matrix_sdk_base/
client.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19    collections::{BTreeMap, BTreeSet, HashMap},
20    fmt,
21    ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27use matrix_sdk_common::timer;
28#[cfg(feature = "e2e-encryption")]
29use matrix_sdk_crypto::{
30    CollectStrategy, DecryptionSettings, EncryptionSettings, OlmError, OlmMachine,
31    TrustRequirement, store::DynCryptoStore, types::requests::ToDeviceRequest,
32};
33#[cfg(doc)]
34use ruma::DeviceId;
35#[cfg(feature = "e2e-encryption")]
36use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
37use ruma::{
38    MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, UserId,
39    api::client::{self as api, sync::sync_events::v5},
40    events::{
41        StateEvent, StateEventType,
42        ignored_user_list::IgnoredUserListEventContent,
43        push_rules::{PushRulesEvent, PushRulesEventContent},
44        room::member::SyncRoomMemberEvent,
45    },
46    push::Ruleset,
47    time::Instant,
48};
49use tokio::sync::{Mutex, broadcast};
50#[cfg(feature = "e2e-encryption")]
51use tokio::sync::{RwLock, RwLockReadGuard};
52use tracing::{Level, debug, enabled, info, instrument, warn};
53
54#[cfg(feature = "e2e-encryption")]
55use crate::RoomMemberships;
56use crate::{
57    InviteAcceptanceDetails, RoomStateFilter, SessionMeta,
58    deserialized_responses::DisplayName,
59    error::{Error, Result},
60    event_cache::store::{EventCacheStoreLock, EventCacheStoreLockState},
61    media::store::MediaStoreLock,
62    response_processors::{self as processors, Context},
63    room::{
64        Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState,
65    },
66    store::{
67        BaseStateStore, DynStateStore, MemoryStore, Result as StoreResult, RoomLoadSettings,
68        StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt, StoreConfig,
69        ambiguity_map::AmbiguityCache,
70    },
71    sync::{RoomUpdates, SyncResponse},
72};
73
74/// A no (network) IO client implementation.
75///
76/// This client is a state machine that receives responses and events and
77/// accordingly updates its state. It is not designed to be used directly, but
78/// rather through `matrix_sdk::Client`.
79///
80/// ```rust
81/// use matrix_sdk_base::{BaseClient, ThreadingSupport, store::StoreConfig};
82///
83/// let client = BaseClient::new(
84///     StoreConfig::new("cross-process-holder-name".to_owned()),
85///     ThreadingSupport::Disabled,
86/// );
87/// ```
88#[derive(Clone)]
89pub struct BaseClient {
90    /// The state store.
91    pub(crate) state_store: BaseStateStore,
92
93    /// The store used by the event cache.
94    event_cache_store: EventCacheStoreLock,
95
96    /// The store used by the media cache.
97    media_store: MediaStoreLock,
98
99    /// The store used for encryption.
100    ///
101    /// This field is only meant to be used for `OlmMachine` initialization.
102    /// All operations on it happen inside the `OlmMachine`.
103    #[cfg(feature = "e2e-encryption")]
104    crypto_store: Arc<DynCryptoStore>,
105
106    /// The olm-machine that is created once the
107    /// [`SessionMeta`][crate::session::SessionMeta] is set via
108    /// [`BaseClient::activate`]
109    #[cfg(feature = "e2e-encryption")]
110    olm_machine: Arc<RwLock<Option<OlmMachine>>>,
111
112    /// Observable of when a user is ignored/unignored.
113    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
114
115    /// A sender that is used to communicate changes to room information. Each
116    /// tick contains the room ID and the reasons that have generated this tick.
117    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
118
119    /// The strategy to use for picking recipient devices, when sending an
120    /// encrypted message.
121    #[cfg(feature = "e2e-encryption")]
122    pub room_key_recipient_strategy: CollectStrategy,
123
124    /// The settings to use for decrypting events.
125    #[cfg(feature = "e2e-encryption")]
126    pub decryption_settings: DecryptionSettings,
127
128    /// If the client should handle verification events received when syncing.
129    #[cfg(feature = "e2e-encryption")]
130    pub handle_verification_events: bool,
131
132    /// Whether the client supports threads or not.
133    pub threading_support: ThreadingSupport,
134}
135
136#[cfg(not(tarpaulin_include))]
137impl fmt::Debug for BaseClient {
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        f.debug_struct("BaseClient")
140            .field("session_meta", &self.state_store.session_meta())
141            .field("sync_token", &self.state_store.sync_token)
142            .finish_non_exhaustive()
143    }
144}
145
/// Tells whether a client instance supports threading. At the base SDK level
/// it is currently used to decide how read receipts and unread counts are
/// handled.
///
/// Timelines carry their own, separate setting: the `TimelineFocus`
/// `hide_threaded_events` associated value, which can hide threaded events
/// and also enables sending threaded read receipts. The split exists because
/// some timeline instances must ignore threading whatever the client-level
/// setting is. Media filtered timelines are one example: they should contain
/// all of the room's media no matter which thread it is in (unless explicitly
/// opted into).
#[derive(Clone, Copy, Debug)]
pub enum ThreadingSupport {
    /// Threading is enabled.
    Enabled {
        /// Enables client-wide support for thread subscriptions (MSC4306 /
        /// MSC4308).
        ///
        /// When the room list service is in use, this may lead to thread
        /// subscriptions being filtered out, and to thread subscriptions
        /// being loaded through the sliding sync extension.
        with_subscriptions: bool,
    },
    /// Threading is disabled.
    Disabled,
}
171
172impl BaseClient {
173    /// Create a new client.
174    ///
175    /// # Arguments
176    ///
177    /// * `config` - the configuration for the stores (state store, event cache
178    ///   store and crypto store).
179    pub fn new(config: StoreConfig, threading_support: ThreadingSupport) -> Self {
180        let store = BaseStateStore::new(config.state_store);
181
182        // Create the channel to receive `RoomInfoNotableUpdate`.
183        //
184        // Let's consider the channel will receive 5 updates for 100 rooms maximum. This
185        // is unrealistic in practise, as the sync mechanism is pretty unlikely to
186        // trigger such amount of updates, it's a safe value.
187        //
188        // Also, note that it must not be
189        // zero, because (i) it will panic, (ii) a new user has no room, but can create
190        // rooms; remember that the channel's capacity is immutable.
191        let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
192            broadcast::channel(500);
193
194        BaseClient {
195            state_store: store,
196            event_cache_store: config.event_cache_store,
197            media_store: config.media_store,
198            #[cfg(feature = "e2e-encryption")]
199            crypto_store: config.crypto_store,
200            #[cfg(feature = "e2e-encryption")]
201            olm_machine: Default::default(),
202            ignore_user_list_changes: Default::default(),
203            room_info_notable_update_sender,
204            #[cfg(feature = "e2e-encryption")]
205            room_key_recipient_strategy: Default::default(),
206            #[cfg(feature = "e2e-encryption")]
207            decryption_settings: DecryptionSettings {
208                sender_device_trust_requirement: TrustRequirement::Untrusted,
209            },
210            #[cfg(feature = "e2e-encryption")]
211            handle_verification_events: true,
212            threading_support,
213        }
214    }
215
216    /// Clones the current base client to use the same crypto store but a
217    /// different, in-memory store config, and resets transient state.
218    #[cfg(feature = "e2e-encryption")]
219    pub async fn clone_with_in_memory_state_store(
220        &self,
221        cross_process_store_locks_holder_name: &str,
222        handle_verification_events: bool,
223    ) -> Result<Self> {
224        let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
225            .state_store(MemoryStore::new());
226        let config = config.crypto_store(self.crypto_store.clone());
227
228        let copy = Self {
229            state_store: BaseStateStore::new(config.state_store),
230            event_cache_store: config.event_cache_store,
231            media_store: config.media_store,
232            // We copy the crypto store as well as the `OlmMachine` for two reasons:
233            // 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`.
234            // 2. We need to ensure that the parent and child use the same data and caches inside
235            //    the `OlmMachine` so the various ratchets and places where new randomness gets
236            //    introduced don't diverge, i.e. one-time keys that get generated by the Olm Account
237            //    or Olm sessions when they encrypt or decrypt messages.
238            crypto_store: self.crypto_store.clone(),
239            olm_machine: self.olm_machine.clone(),
240            ignore_user_list_changes: Default::default(),
241            room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
242            room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
243            decryption_settings: self.decryption_settings.clone(),
244            handle_verification_events,
245            threading_support: self.threading_support,
246        };
247
248        copy.state_store
249            .derive_from_other(&self.state_store, &copy.room_info_notable_update_sender)
250            .await?;
251
252        Ok(copy)
253    }
254
255    /// Clones the current base client to use the same crypto store but a
256    /// different, in-memory store config, and resets transient state.
257    #[cfg(not(feature = "e2e-encryption"))]
258    #[allow(clippy::unused_async)]
259    pub async fn clone_with_in_memory_state_store(
260        &self,
261        cross_process_store_locks_holder: &str,
262        _handle_verification_events: bool,
263    ) -> Result<Self> {
264        let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
265            .state_store(MemoryStore::new());
266        Ok(Self::new(config, ThreadingSupport::Disabled))
267    }
268
269    /// Get the session meta information.
270    ///
271    /// If the client is currently logged in, this will return a
272    /// [`SessionMeta`] object which contains the user ID and device ID.
273    /// Otherwise it returns `None`.
274    pub fn session_meta(&self) -> Option<&SessionMeta> {
275        self.state_store.session_meta()
276    }
277
278    /// Get all the rooms this client knows about.
279    pub fn rooms(&self) -> Vec<Room> {
280        self.state_store.rooms()
281    }
282
283    /// Get all the rooms this client knows about, filtered by room state.
284    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
285        self.state_store.rooms_filtered(filter)
286    }
287
288    /// Get a stream of all the rooms changes, in addition to the existing
289    /// rooms.
290    pub fn rooms_stream(
291        &self,
292    ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
293        self.state_store.rooms_stream()
294    }
295
296    /// Lookup the Room for the given RoomId, or create one, if it didn't exist
297    /// yet in the store
298    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
299        self.state_store.get_or_create_room(
300            room_id,
301            room_state,
302            self.room_info_notable_update_sender.clone(),
303        )
304    }
305
306    /// Get a reference to the state store.
307    pub fn state_store(&self) -> &DynStateStore {
308        self.state_store.deref()
309    }
310
311    /// Get a reference to the event cache store.
312    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
313        &self.event_cache_store
314    }
315
316    /// Get a reference to the media store.
317    pub fn media_store(&self) -> &MediaStoreLock {
318        &self.media_store
319    }
320
321    /// Check whether the client has been activated.
322    ///
323    /// See [`BaseClient::activate`] to know what it means.
324    pub fn is_active(&self) -> bool {
325        self.state_store.session_meta().is_some()
326    }
327
328    /// Activate the client.
329    ///
330    /// A client is considered active when:
331    ///
332    /// 1. It has a `SessionMeta` (user ID, device ID and access token),
333    /// 2. Has loaded cached data from storage,
334    /// 3. If encryption is enabled, it also initialized or restored its
335    ///    `OlmMachine`.
336    ///
337    /// # Arguments
338    ///
339    /// * `session_meta` - The meta of a session that the user already has from
340    ///   a previous login call.
341    ///
342    /// * `custom_account` - A custom
343    ///   [`matrix_sdk_crypto::vodozemac::olm::Account`] to be used for the
344    ///   identity and one-time keys of this [`BaseClient`]. If no account is
345    ///   provided, a new default one or one from the store will be used. If an
346    ///   account is provided and one already exists in the store for this
347    ///   [`UserId`]/[`DeviceId`] combination, an error will be raised. This is
348    ///   useful if one wishes to create identity keys before knowing the
349    ///   user/device IDs, e.g., to use the identity key as the device ID.
350    ///
351    /// * `room_load_settings` — Specify how many rooms must be restored; use
352    ///   `::default()` if you don't know which value to pick.
353    ///
354    /// # Panics
355    ///
356    /// This method panics if it is called twice.
357    ///
358    /// [`UserId`]: ruma::UserId
359    pub async fn activate(
360        &self,
361        session_meta: SessionMeta,
362        room_load_settings: RoomLoadSettings,
363        #[cfg(feature = "e2e-encryption")] custom_account: Option<
364            crate::crypto::vodozemac::olm::Account,
365        >,
366    ) -> Result<()> {
367        debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
368
369        self.state_store
370            .load_rooms(
371                &session_meta.user_id,
372                room_load_settings,
373                &self.room_info_notable_update_sender,
374            )
375            .await?;
376        self.state_store.load_sync_token().await?;
377        self.state_store.set_session_meta(session_meta);
378
379        #[cfg(feature = "e2e-encryption")]
380        self.regenerate_olm(custom_account).await?;
381
382        Ok(())
383    }
384
385    /// Recreate an `OlmMachine` from scratch.
386    ///
387    /// In particular, this will clear all its caches.
388    #[cfg(feature = "e2e-encryption")]
389    pub async fn regenerate_olm(
390        &self,
391        custom_account: Option<crate::crypto::vodozemac::olm::Account>,
392    ) -> Result<()> {
393        tracing::debug!("regenerating OlmMachine");
394        let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
395
396        // Recreate the `OlmMachine` and wipe the in-memory cache in the store
397        // because we suspect it has stale data.
398        let olm_machine = OlmMachine::with_store(
399            &session_meta.user_id,
400            &session_meta.device_id,
401            self.crypto_store.clone(),
402            custom_account,
403        )
404        .await
405        .map_err(OlmError::from)?;
406
407        *self.olm_machine.write().await = Some(olm_machine);
408        Ok(())
409    }
410
411    /// Get the current, if any, sync token of the client.
412    /// This will be None if the client didn't sync at least once.
413    pub async fn sync_token(&self) -> Option<String> {
414        self.state_store.sync_token.read().await.clone()
415    }
416
417    /// User has knocked on a room.
418    ///
419    /// Update the internal and cached state accordingly. Return the final Room.
420    pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
421        let room = self.state_store.get_or_create_room(
422            room_id,
423            RoomState::Knocked,
424            self.room_info_notable_update_sender.clone(),
425        );
426
427        if room.state() != RoomState::Knocked {
428            let _state_store_lock = self.state_store_lock().lock().await;
429
430            let mut room_info = room.clone_info();
431            room_info.mark_as_knocked();
432            room_info.mark_state_partially_synced();
433            room_info.mark_members_missing(); // the own member event changed
434            let mut changes = StateChanges::default();
435            changes.add_room(room_info.clone());
436            self.state_store.save_changes(&changes).await?; // Update the store
437            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
438        }
439
440        Ok(room)
441    }
442
443    /// The user has joined a room using this specific client.
444    ///
445    /// This method should be called if the user accepts an invite or if they
446    /// join a public room.
447    ///
448    /// The method will create a [`Room`] object if one does not exist yet and
449    /// set the state of the [`Room`] to [`RoomState::Joined`]. The [`Room`]
450    /// object will be persisted in the cache. Please note that the [`Room`]
451    /// will be a stub until a sync has been received with the full room
452    /// state using [`BaseClient::receive_sync_response`].
453    ///
454    /// Update the internal and cached state accordingly. Return the final Room.
455    ///
456    /// # Arguments
457    ///
458    /// * `room_id` - The unique ID identifying the joined room.
459    /// * `inviter` - When joining this room in response to an invitation, the
460    ///   inviter should be recorded before sending the join request to the
461    ///   server. Providing the inviter here ensures that the
462    ///   [`InviteAcceptanceDetails`] are stored for this room.
463    ///
464    /// # Examples
465    ///
466    /// ```rust
467    /// # use matrix_sdk_base::{BaseClient, store::StoreConfig, RoomState, ThreadingSupport};
468    /// # use ruma::{OwnedRoomId, OwnedUserId, RoomId};
469    /// # async {
470    /// # let client = BaseClient::new(StoreConfig::new("example".to_owned()), ThreadingSupport::Disabled);
471    /// # async fn send_join_request() -> anyhow::Result<OwnedRoomId> { todo!() }
472    /// # async fn maybe_get_inviter(room_id: &RoomId) -> anyhow::Result<Option<OwnedUserId>> { todo!() }
473    /// # let room_id: &RoomId = todo!();
474    /// let maybe_inviter = maybe_get_inviter(room_id).await?;
475    /// let room_id = send_join_request().await?;
476    /// let room = client.room_joined(&room_id, maybe_inviter).await?;
477    ///
478    /// assert_eq!(room.state(), RoomState::Joined);
479    /// # matrix_sdk_test::TestResult::Ok(()) };
480    /// ```
481    pub async fn room_joined(
482        &self,
483        room_id: &RoomId,
484        inviter: Option<OwnedUserId>,
485    ) -> Result<Room> {
486        let room = self.state_store.get_or_create_room(
487            room_id,
488            RoomState::Joined,
489            self.room_info_notable_update_sender.clone(),
490        );
491
492        // If the state isn't `RoomState::Joined` then this means that we knew about
493        // this room before. Let's modify the existing state now.
494        if room.state() != RoomState::Joined {
495            let _state_store_lock = self.state_store_lock().lock().await;
496
497            let mut room_info = room.clone_info();
498            let previous_state = room.state();
499
500            room_info.mark_as_joined();
501            room_info.mark_state_partially_synced();
502            room_info.mark_members_missing(); // the own member event changed
503
504            // If our previous state was an invite and we're now in the joined state, this
505            // means that the user has explicitly accepted an invite. Let's
506            // remember some details about the invite.
507            //
508            // This is somewhat of a workaround for our lack of cryptographic membership.
509            // Later on we will decide if historic room keys should be accepted
510            // based on this info. If a user has accepted an invite and we receive a room
511            // key bundle shortly after, we might accept it. If we don't do
512            // this, the homeserver could trick us into accepting any historic room key
513            // bundle.
514            if previous_state == RoomState::Invited
515                && let Some(inviter) = inviter
516            {
517                let details = InviteAcceptanceDetails {
518                    invite_accepted_at: MilliSecondsSinceUnixEpoch::now(),
519                    inviter,
520                };
521                room_info.set_invite_acceptance_details(details);
522            }
523
524            let mut changes = StateChanges::default();
525            changes.add_room(room_info.clone());
526
527            self.state_store.save_changes(&changes).await?; // Update the store
528
529            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
530        }
531
532        Ok(room)
533    }
534
535    /// User has left a room.
536    ///
537    /// Update the internal and cached state accordingly.
538    pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
539        let room = self.state_store.get_or_create_room(
540            room_id,
541            RoomState::Left,
542            self.room_info_notable_update_sender.clone(),
543        );
544
545        if room.state() != RoomState::Left {
546            let _state_store_lock = self.state_store_lock().lock().await;
547
548            let mut room_info = room.clone_info();
549            room_info.mark_as_left();
550            room_info.mark_state_partially_synced();
551            room_info.mark_members_missing(); // the own member event changed
552            let mut changes = StateChanges::default();
553            changes.add_room(room_info.clone());
554            self.state_store.save_changes(&changes).await?; // Update the store
555            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
556        }
557
558        Ok(())
559    }
560
561    /// Get a lock to the state store, with an exclusive access.
562    ///
563    /// It doesn't give an access to the state store itself. It's rather a lock
564    /// to synchronise all accesses to the state store.
565    pub fn state_store_lock(&self) -> &Mutex<()> {
566        self.state_store.lock()
567    }
568
569    /// Receive a response from a sync call.
570    ///
571    /// # Arguments
572    ///
573    /// * `response` - The response that we received after a successful sync.
574    #[instrument(skip_all)]
575    pub async fn receive_sync_response(
576        &self,
577        response: api::sync::sync_events::v3::Response,
578    ) -> Result<SyncResponse> {
579        self.receive_sync_response_with_requested_required_states(
580            response,
581            &RequestedRequiredStates::default(),
582        )
583        .await
584    }
585
586    /// Receive a response from a sync call, with the requested required state
587    /// events.
588    ///
589    /// # Arguments
590    ///
591    /// * `response` - The response that we received after a successful sync.
592    /// * `requested_required_states` - The requested required state events.
593    pub async fn receive_sync_response_with_requested_required_states(
594        &self,
595        response: api::sync::sync_events::v3::Response,
596        requested_required_states: &RequestedRequiredStates,
597    ) -> Result<SyncResponse> {
598        // The server might respond multiple times with the same sync token, in
599        // that case we already received this response and there's nothing to
600        // do.
601        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
602            info!("Got the same sync response twice");
603            return Ok(SyncResponse::default());
604        }
605
606        let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };
607
608        #[cfg(feature = "e2e-encryption")]
609        let olm_machine = self.olm_machine().await;
610
611        let mut context = Context::new(StateChanges::new(response.next_batch.clone()));
612
613        #[cfg(feature = "e2e-encryption")]
614        let to_device = {
615            let processors::e2ee::to_device::Output {
616                processed_to_device_events: to_device,
617                room_key_updates,
618            } = processors::e2ee::to_device::from_sync_v2(
619                &response,
620                olm_machine.as_ref(),
621                &self.decryption_settings,
622            )
623            .await?;
624
625            processors::latest_event::decrypt_from_rooms(
626                &mut context,
627                room_key_updates
628                    .into_iter()
629                    .flatten()
630                    .filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
631                    .collect(),
632                processors::e2ee::E2EE::new(
633                    olm_machine.as_ref(),
634                    &self.decryption_settings,
635                    self.handle_verification_events,
636                ),
637            )
638            .await?;
639
640            to_device
641        };
642
643        #[cfg(not(feature = "e2e-encryption"))]
644        let to_device = response
645            .to_device
646            .events
647            .into_iter()
648            .map(|raw| {
649                use matrix_sdk_common::deserialized_responses::{
650                    ProcessedToDeviceEvent, ToDeviceUnableToDecryptInfo,
651                    ToDeviceUnableToDecryptReason,
652                };
653
654                if let Ok(Some(event_type)) = raw.get_field::<String>("type") {
655                    if event_type == "m.room.encrypted" {
656                        ProcessedToDeviceEvent::UnableToDecrypt {
657                            encrypted_event: raw,
658                            utd_info: ToDeviceUnableToDecryptInfo {
659                                reason: ToDeviceUnableToDecryptReason::EncryptionIsDisabled,
660                            },
661                        }
662                    } else {
663                        ProcessedToDeviceEvent::PlainText(raw)
664                    }
665                } else {
666                    // Exclude events with no type
667                    ProcessedToDeviceEvent::Invalid(raw)
668                }
669            })
670            .collect();
671
672        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
673
674        let global_account_data_processor =
675            processors::account_data::global(&response.account_data.events);
676
677        let push_rules = self.get_push_rules(&global_account_data_processor).await?;
678
679        let mut room_updates = RoomUpdates::default();
680        let mut notifications = Default::default();
681
682        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
683            BTreeMap::new();
684
685        for (room_id, joined_room) in response.rooms.join {
686            let joined_room_update = processors::room::sync_v2::update_joined_room(
687                &mut context,
688                processors::room::RoomCreationData::new(
689                    &room_id,
690                    self.room_info_notable_update_sender.clone(),
691                    requested_required_states,
692                    &mut ambiguity_cache,
693                ),
694                joined_room,
695                &mut updated_members_in_room,
696                processors::notification::Notification::new(
697                    &push_rules,
698                    &mut notifications,
699                    &self.state_store,
700                ),
701                #[cfg(feature = "e2e-encryption")]
702                processors::e2ee::E2EE::new(
703                    olm_machine.as_ref(),
704                    &self.decryption_settings,
705                    self.handle_verification_events,
706                ),
707            )
708            .await?;
709
710            room_updates.joined.insert(room_id, joined_room_update);
711        }
712
713        for (room_id, left_room) in response.rooms.leave {
714            let left_room_update = processors::room::sync_v2::update_left_room(
715                &mut context,
716                processors::room::RoomCreationData::new(
717                    &room_id,
718                    self.room_info_notable_update_sender.clone(),
719                    requested_required_states,
720                    &mut ambiguity_cache,
721                ),
722                left_room,
723                processors::notification::Notification::new(
724                    &push_rules,
725                    &mut notifications,
726                    &self.state_store,
727                ),
728                #[cfg(feature = "e2e-encryption")]
729                processors::e2ee::E2EE::new(
730                    olm_machine.as_ref(),
731                    &self.decryption_settings,
732                    self.handle_verification_events,
733                ),
734            )
735            .await?;
736
737            room_updates.left.insert(room_id, left_room_update);
738        }
739
740        for (room_id, invited_room) in response.rooms.invite {
741            let invited_room_update = processors::room::sync_v2::update_invited_room(
742                &mut context,
743                &room_id,
744                invited_room,
745                self.room_info_notable_update_sender.clone(),
746                processors::notification::Notification::new(
747                    &push_rules,
748                    &mut notifications,
749                    &self.state_store,
750                ),
751            )
752            .await?;
753
754            room_updates.invited.insert(room_id, invited_room_update);
755        }
756
757        for (room_id, knocked_room) in response.rooms.knock {
758            let knocked_room_update = processors::room::sync_v2::update_knocked_room(
759                &mut context,
760                &room_id,
761                knocked_room,
762                self.room_info_notable_update_sender.clone(),
763                processors::notification::Notification::new(
764                    &push_rules,
765                    &mut notifications,
766                    &self.state_store,
767                ),
768            )
769            .await?;
770
771            room_updates.knocked.insert(room_id, knocked_room_update);
772        }
773
774        global_account_data_processor.apply(&mut context, &self.state_store).await;
775
776        context.state_changes.presence = response
777            .presence
778            .events
779            .iter()
780            .filter_map(|e| {
781                let event = e.deserialize().ok()?;
782                Some((event.sender, e.clone()))
783            })
784            .collect();
785
786        context.state_changes.ambiguity_maps = ambiguity_cache.cache;
787
788        {
789            let _state_store_lock = self.state_store_lock().lock().await;
790
791            processors::changes::save_and_apply(
792                context,
793                &self.state_store,
794                &self.ignore_user_list_changes,
795                Some(response.next_batch.clone()),
796            )
797            .await?;
798        }
799
800        let mut context = Context::default();
801
802        // Now that all the rooms information have been saved, update the display name
803        // of the updated rooms (which relies on information stored in the database).
804        processors::room::display_name::update_for_rooms(
805            &mut context,
806            &room_updates,
807            &self.state_store,
808        )
809        .await;
810
811        // Save the new display name updates if any.
812        {
813            let _state_store_lock = self.state_store_lock().lock().await;
814
815            processors::changes::save_only(context, &self.state_store).await?;
816        }
817
818        for (room_id, member_ids) in updated_members_in_room {
819            if let Some(room) = self.get_room(&room_id) {
820                let _ =
821                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
822            }
823        }
824
825        if enabled!(Level::INFO) {
826            info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
827        }
828
829        let response = SyncResponse {
830            rooms: room_updates,
831            presence: response.presence.events,
832            account_data: response.account_data.events,
833            to_device,
834            notifications,
835        };
836
837        Ok(response)
838    }
839
840    /// Receive a get member events response and convert it to a deserialized
841    /// `MembersResponse`
842    ///
843    /// This client-server request must be made without filters to make sure all
844    /// members are received. Otherwise, an error is returned.
845    ///
846    /// # Arguments
847    ///
848    /// * `room_id` - The room id this response belongs to.
849    ///
850    /// * `response` - The raw response that was received from the server.
851    #[instrument(skip_all, fields(?room_id))]
852    pub async fn receive_all_members(
853        &self,
854        room_id: &RoomId,
855        request: &api::membership::get_member_events::v3::Request,
856        response: &api::membership::get_member_events::v3::Response,
857    ) -> Result<()> {
858        if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
859        {
860            // This function assumes all members are loaded at once to optimise how display
861            // name disambiguation works. Using it with partial member list results
862            // would produce incorrect disambiguated display name entries
863            return Err(Error::InvalidReceiveMembersParameters);
864        }
865
866        let Some(room) = self.state_store.room(room_id) else {
867            // The room is unknown to us: leave early.
868            return Ok(());
869        };
870
871        let mut chunk = Vec::with_capacity(response.chunk.len());
872        let mut context = Context::default();
873
874        #[cfg(feature = "e2e-encryption")]
875        let mut user_ids = BTreeSet::new();
876
877        let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
878
879        for raw_event in &response.chunk {
880            let member = match raw_event.deserialize() {
881                Ok(ev) => ev,
882                Err(e) => {
883                    let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
884                    debug!(event_id, "Failed to deserialize member event: {e}");
885                    continue;
886                }
887            };
888
889            // TODO: All the actions in this loop used to be done only when the membership
890            // event was not in the store before. This was changed with the new room API,
891            // because e.g. leaving a room makes members events outdated and they need to be
892            // fetched by `members`. Therefore, they need to be overwritten here, even
893            // if they exist.
894            // However, this makes a new problem occur where setting the member events here
895            // potentially races with the sync.
896            // See <https://github.com/matrix-org/matrix-rust-sdk/issues/1205>.
897
898            #[cfg(feature = "e2e-encryption")]
899            match member.membership() {
900                MembershipState::Join | MembershipState::Invite => {
901                    user_ids.insert(member.state_key().to_owned());
902                }
903                _ => (),
904            }
905
906            if let StateEvent::Original(e) = &member
907                && let Some(d) = &e.content.displayname
908            {
909                let display_name = DisplayName::new(d);
910                ambiguity_map.entry(display_name).or_default().insert(member.state_key().clone());
911            }
912
913            let sync_member: SyncRoomMemberEvent = member.clone().into();
914            processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
915
916            context
917                .state_changes
918                .state
919                .entry(room_id.to_owned())
920                .or_default()
921                .entry(member.event_type())
922                .or_default()
923                .insert(member.state_key().to_string(), raw_event.clone().cast());
924            chunk.push(member);
925        }
926
927        #[cfg(feature = "e2e-encryption")]
928        processors::e2ee::tracked_users::update(
929            self.olm_machine().await.as_ref(),
930            room.encryption_state(),
931            &user_ids,
932        )
933        .await?;
934
935        context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
936
937        {
938            let _state_store_lock = self.state_store_lock().lock().await;
939
940            let mut room_info = room.clone_info();
941            room_info.mark_members_synced();
942            context.state_changes.add_room(room_info);
943
944            processors::changes::save_and_apply(
945                context,
946                &self.state_store,
947                &self.ignore_user_list_changes,
948                None,
949            )
950            .await?;
951        }
952
953        let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
954
955        Ok(())
956    }
957
958    /// Receive a successful filter upload response, the filter id will be
959    /// stored under the given name in the store.
960    ///
961    /// The filter id can later be retrieved with the [`get_filter`] method.
962    ///
963    ///
964    /// # Arguments
965    ///
966    /// * `filter_name` - The name that should be used to persist the filter id
967    ///   in the store.
968    ///
969    /// * `response` - The successful filter upload response containing the
970    ///   filter id.
971    ///
972    /// [`get_filter`]: #method.get_filter
973    pub async fn receive_filter_upload(
974        &self,
975        filter_name: &str,
976        response: &api::filter::create_filter::v3::Response,
977    ) -> Result<()> {
978        Ok(self
979            .state_store
980            .set_kv_data(
981                StateStoreDataKey::Filter(filter_name),
982                StateStoreDataValue::Filter(response.filter_id.clone()),
983            )
984            .await?)
985    }
986
987    /// Get the filter id of a previously uploaded filter.
988    ///
989    /// *Note*: A filter will first need to be uploaded and persisted using
990    /// [`receive_filter_upload`].
991    ///
992    /// # Arguments
993    ///
994    /// * `filter_name` - The name of the filter that was previously used to
995    ///   persist the filter.
996    ///
997    /// [`receive_filter_upload`]: #method.receive_filter_upload
998    pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
999        let filter = self
1000            .state_store
1001            .get_kv_data(StateStoreDataKey::Filter(filter_name))
1002            .await?
1003            .map(|d| d.into_filter().expect("State store data not a filter"));
1004
1005        Ok(filter)
1006    }
1007
/// Get the to-device requests that will share a room key with users in a
/// room.
///
/// The recipients are the room members read from the state store: joined
/// members only when the room's history visibility is `Joined`, active
/// members otherwise.
///
/// # Errors
///
/// Returns [`Error::InsufficientData`] if the room is unknown, and
/// [`Error::EncryptionNotEnabled`] if the room has no encryption settings.
/// Errors from the state store and from the Olm machine are propagated.
///
/// # Panics
///
/// Panics if there is no Olm machine.
#[cfg(feature = "e2e-encryption")]
pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
    // The read guard is kept alive for the whole operation.
    let olm_guard = self.olm_machine().await;
    let Some(olm) = olm_guard.as_ref() else {
        panic!("Olm machine wasn't started");
    };

    let room = self.get_room(room_id).ok_or(Error::InsufficientData)?;

    let history_visibility = room.history_visibility_or_default();
    let room_encryption_event =
        room.encryption_settings().ok_or(Error::EncryptionNotEnabled)?;

    // Don't share the group session with members that are invited
    // if the history visibility is set to `Joined`
    let memberships = match history_visibility {
        HistoryVisibility::Joined => RoomMemberships::JOIN,
        _ => RoomMemberships::ACTIVE,
    };

    let recipients = self.state_store.get_user_ids(room_id, memberships).await?;

    let settings = EncryptionSettings::new(
        room_encryption_event,
        history_visibility,
        self.room_key_recipient_strategy.clone(),
    );

    let requests =
        olm.share_room_key(room_id, recipients.iter().map(Deref::deref), settings).await?;

    Ok(requests)
}
1043
/// Get the room with the given room id.
///
/// The lookup is delegated to the state store's `room` method, and its
/// result is returned unchanged.
///
/// # Arguments
///
/// * `room_id` - The id of the room that should be fetched.
pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
    self.state_store.room(room_id)
}
1052
1053    /// Forget the room with the given room ID.
1054    ///
1055    /// The room will be dropped from the room list and the store.
1056    ///
1057    /// # Arguments
1058    ///
1059    /// * `room_id` - The id of the room that should be forgotten.
1060    pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1061        // Forget the room in the state store.
1062        self.state_store.forget_room(room_id).await?;
1063
1064        // Remove the room in the event cache store too.
1065        match self.event_cache_store().lock().await? {
1066            // If the lock is clear, we can do the operation as expected.
1067            // If the lock is dirty, we can ignore to refresh the state, we just need to remove a
1068            // room. Also, we must not mark the lock as non-dirty because other operations may be
1069            // critical and may need to refresh the `EventCache`' state.
1070            EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
1071                guard.remove_room(room_id).await?
1072            }
1073        }
1074
1075        Ok(())
1076    }
1077
/// Get the olm machine.
///
/// Waits for a read lock on the `olm_machine` field and returns the guard.
/// The guarded value is an `Option`, so callers have to handle the `None`
/// case themselves.
#[cfg(feature = "e2e-encryption")]
pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
    self.olm_machine.read().await
}
1083
1084    /// Get the push rules.
1085    ///
1086    /// Gets the push rules previously processed, otherwise get them from the
1087    /// store. As a fallback, uses [`Ruleset::server_default`] if the user
1088    /// is logged in.
1089    pub(crate) async fn get_push_rules(
1090        &self,
1091        global_account_data_processor: &processors::account_data::Global,
1092    ) -> Result<Ruleset> {
1093        let _timer = timer!(Level::TRACE, "get_push_rules");
1094        if let Some(event) = global_account_data_processor
1095            .push_rules()
1096            .and_then(|ev| ev.deserialize_as_unchecked::<PushRulesEvent>().ok())
1097        {
1098            Ok(event.content.global)
1099        } else if let Some(event) = self
1100            .state_store
1101            .get_account_data_event_static::<PushRulesEventContent>()
1102            .await?
1103            .and_then(|ev| ev.deserialize().ok())
1104        {
1105            Ok(event.content.global)
1106        } else if let Some(session_meta) = self.state_store.session_meta() {
1107            Ok(Ruleset::server_default(&session_meta.user_id))
1108        } else {
1109            Ok(Ruleset::new())
1110        }
1111    }
1112
/// Returns a subscriber that publishes an event every time the ignore user
/// list changes.
///
/// The subscriber is created from the `ignore_user_list_changes`
/// observable of this client.
pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
    self.ignore_user_list_changes.subscribe()
}
1118
/// Returns a new receiver that gets future room info notable updates.
///
/// The receiver is created by subscribing to this client's
/// `room_info_notable_update_sender` broadcast sender.
///
/// Learn more by reading the [`RoomInfoNotableUpdate`] type.
pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
    self.room_info_notable_update_sender.subscribe()
}
1125
1126    /// Checks whether the provided `user_id` belongs to an ignored user.
1127    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
1128        match self.state_store.get_account_data_event_static::<IgnoredUserListEventContent>().await
1129        {
1130            Ok(Some(raw_ignored_user_list)) => match raw_ignored_user_list.deserialize() {
1131                Ok(current_ignored_user_list) => {
1132                    current_ignored_user_list.content.ignored_users.contains_key(user_id)
1133                }
1134                Err(error) => {
1135                    warn!(?error, "Failed to deserialize the ignored user list event");
1136                    false
1137                }
1138            },
1139            Ok(None) => false,
1140            Err(error) => {
1141                warn!(?error, "Could not get the ignored user list from the state store");
1142                false
1143            }
1144        }
1145    }
1146}
1147
/// Represent the `required_state` values sent by a sync request.
///
/// This is useful to track what state events have been requested when handling
/// a response.
///
/// For example, if a sync requests the `m.room.encryption` state event, and the
/// server replies with nothing, it means the room **is not** encrypted. Without
/// knowing which state event was required by the sync, it is impossible to
/// interpret the absence of state event from the server as _the room's
/// encryption state is **not encrypted**_ or _the room's encryption state is
/// **unknown**_.
#[derive(Debug, Default)]
pub struct RequestedRequiredStates {
    /// The `required_state` value used for every room that has no entry in
    /// `for_rooms`.
    default: Vec<(StateEventType, String)>,
    /// The `required_state` values of specific rooms, keyed by room id.
    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
}
1164
1165impl RequestedRequiredStates {
1166    /// Create a new `RequestedRequiredStates`.
1167    ///
1168    /// `default` represents the `required_state` value for all rooms.
1169    /// `for_rooms` is the `required_state` per room.
1170    pub fn new(
1171        default: Vec<(StateEventType, String)>,
1172        for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1173    ) -> Self {
1174        Self { default, for_rooms }
1175    }
1176
1177    /// Get the `required_state` value for a specific room.
1178    pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1179        self.for_rooms.get(room_id).unwrap_or(&self.default)
1180    }
1181}
1182
1183impl From<&v5::Request> for RequestedRequiredStates {
1184    fn from(request: &v5::Request) -> Self {
1185        // The following information is missing in the MSC4186 at the time of writing
1186        // (2025-03-12) but: the `required_state`s from all lists and from all room
1187        // subscriptions are combined by doing an union.
1188        //
1189        // Thus, we can do the same here, put the union in `default` and keep
1190        // `for_rooms` empty. The `Self::for_room` will automatically do the fallback.
1191        let mut default = BTreeSet::new();
1192
1193        for list in request.lists.values() {
1194            default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1195        }
1196
1197        for room_subscription in request.room_subscriptions.values() {
1198            default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1199        }
1200
1201        Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1202    }
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207    use std::collections::HashMap;
1208
1209    use assert_matches2::{assert_let, assert_matches};
1210    use futures_util::FutureExt as _;
1211    use matrix_sdk_test::{
1212        BOB, InvitedRoomBuilder, LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent,
1213        SyncResponseBuilder, async_test, event_factory::EventFactory, ruma_response_from_json,
1214    };
1215    use ruma::{
1216        api::client::{self as api, sync::sync_events::v5},
1217        event_id,
1218        events::{StateEventType, room::member::MembershipState},
1219        room_id,
1220        serde::Raw,
1221        user_id,
1222    };
1223    use serde_json::{json, value::to_raw_value};
1224
1225    use super::{BaseClient, RequestedRequiredStates};
1226    use crate::{
1227        RoomDisplayName, RoomState, SessionMeta,
1228        client::ThreadingSupport,
1229        store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1230        test_utils::logged_in_base_client,
1231    };
1232
/// `for_room` returns the room's own `required_state` when it has one, and
/// the default one otherwise.
#[test]
fn test_requested_required_states() {
    let room_with_own_states = room_id!("!r0");
    let room_without_own_states = room_id!("!r1");

    let default_states = vec![(StateEventType::RoomAvatar, "".to_owned())];
    let own_states = vec![
        (StateEventType::RoomMember, "foo".to_owned()),
        (StateEventType::RoomEncryption, "".to_owned()),
    ];

    let requested_required_states = RequestedRequiredStates::new(
        default_states.clone(),
        HashMap::from([(room_with_own_states.to_owned(), own_states.clone())]),
    );

    // A special set of state events exists for the first room.
    assert_eq!(requested_required_states.for_room(room_with_own_states), own_states.as_slice());

    // No special list for the second room, it should return the defaults.
    assert_eq!(
        requested_required_states.for_room(room_without_own_states),
        default_states.as_slice()
    );
}
1264
/// Converting a sync v5 request puts the union of all the `required_state`s
/// in `default`, and leaves `for_rooms` empty.
#[test]
fn test_requested_required_states_from_sync_v5_request() {
    /// A required state for `event_type` with an empty state key.
    fn state(event_type: StateEventType) -> (StateEventType, String) {
        (event_type, "".to_owned())
    }

    /// A default list with the given `required_state`.
    fn list_requiring(required_state: Vec<(StateEventType, String)>) -> v5::request::List {
        let mut list = v5::request::List::default();
        list.room_details.required_state = required_state;

        list
    }

    /// A default room subscription with the given `required_state`.
    fn subscription_requiring(
        required_state: Vec<(StateEventType, String)>,
    ) -> v5::request::RoomSubscription {
        let mut room_subscription = v5::request::RoomSubscription::default();
        room_subscription.required_state = required_state;

        room_subscription
    }

    let first_room_id = room_id!("!r0");
    let second_room_id = room_id!("!r1");

    // Empty request.
    let mut request = v5::Request::new();

    let converted = RequestedRequiredStates::from(&request);
    assert!(converted.default.is_empty());
    assert!(converted.for_rooms.is_empty());

    // One list.
    request.lists.insert(
        "foo".to_owned(),
        list_requiring(vec![
            state(StateEventType::RoomAvatar),
            state(StateEventType::RoomEncryption),
        ]),
    );

    let converted = RequestedRequiredStates::from(&request);
    assert_eq!(
        converted.default,
        &[state(StateEventType::RoomAvatar), state(StateEventType::RoomEncryption)]
    );
    assert!(converted.for_rooms.is_empty());

    // Two lists.
    request.lists.insert(
        "bar".to_owned(),
        list_requiring(vec![
            state(StateEventType::RoomEncryption),
            state(StateEventType::RoomName),
        ]),
    );

    let converted = RequestedRequiredStates::from(&request);
    // Union of the state events.
    assert_eq!(
        converted.default,
        &[
            state(StateEventType::RoomAvatar),
            state(StateEventType::RoomEncryption),
            state(StateEventType::RoomName),
        ]
    );
    assert!(converted.for_rooms.is_empty());

    // One room subscription.
    request.room_subscriptions.insert(
        first_room_id.to_owned(),
        subscription_requiring(vec![
            state(StateEventType::RoomJoinRules),
            state(StateEventType::RoomEncryption),
        ]),
    );

    let converted = RequestedRequiredStates::from(&request);
    // Union of state events, all in `default`, still nothing in `for_rooms`.
    assert_eq!(
        converted.default,
        &[
            state(StateEventType::RoomAvatar),
            state(StateEventType::RoomEncryption),
            state(StateEventType::RoomJoinRules),
            state(StateEventType::RoomName),
        ]
    );
    assert!(converted.for_rooms.is_empty());

    // Two room subscriptions.
    request.room_subscriptions.insert(
        second_room_id.to_owned(),
        subscription_requiring(vec![
            state(StateEventType::RoomName),
            state(StateEventType::RoomTopic),
        ]),
    );

    let converted = RequestedRequiredStates::from(&request);
    // Union of state events, all in `default`.
    assert_eq!(
        converted.default,
        &[
            state(StateEventType::RoomAvatar),
            state(StateEventType::RoomEncryption),
            state(StateEventType::RoomJoinRules),
            state(StateEventType::RoomName),
            state(StateEventType::RoomTopic),
        ]
    );
}
1386
1387    #[async_test]
1388    async fn test_invite_after_leaving() {
1389        let user_id = user_id!("@alice:example.org");
1390        let room_id = room_id!("!test:example.org");
1391
1392        let client = logged_in_base_client(Some(user_id)).await;
1393
1394        let mut sync_builder = SyncResponseBuilder::new();
1395
1396        let response = sync_builder
1397            .add_left_room(
1398                LeftRoomBuilder::new(room_id).add_timeline_event(
1399                    EventFactory::new()
1400                        .member(user_id)
1401                        .membership(MembershipState::Leave)
1402                        .display_name("Alice")
1403                        .event_id(event_id!("$994173582443PhrSn:example.org")),
1404                ),
1405            )
1406            .build_sync_response();
1407        client.receive_sync_response(response).await.unwrap();
1408        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1409
1410        let response = sync_builder
1411            .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
1412                StrippedStateTestEvent::Custom(json!({
1413                    "content": {
1414                        "displayname": "Alice",
1415                        "membership": "invite",
1416                    },
1417                    "event_id": "$143273582443PhrSn:example.org",
1418                    "origin_server_ts": 1432735824653u64,
1419                    "sender": "@example:example.org",
1420                    "state_key": user_id,
1421                    "type": "m.room.member",
1422                })),
1423            ))
1424            .build_sync_response();
1425        client.receive_sync_response(response).await.unwrap();
1426        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1427    }
1428
1429    #[async_test]
1430    async fn test_invite_displayname() {
1431        let user_id = user_id!("@alice:example.org");
1432        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1433
1434        let client = logged_in_base_client(Some(user_id)).await;
1435
1436        let response = ruma_response_from_json(&json!({
1437            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1438            "device_one_time_keys_count": {
1439                "signed_curve25519": 50u64
1440            },
1441            "device_unused_fallback_key_types": [
1442                "signed_curve25519"
1443            ],
1444            "rooms": {
1445                "invite": {
1446                    "!ithpyNKDtmhneaTQja:example.org": {
1447                        "invite_state": {
1448                            "events": [
1449                                {
1450                                    "content": {
1451                                        "creator": "@test:example.org",
1452                                        "room_version": "9"
1453                                    },
1454                                    "sender": "@test:example.org",
1455                                    "state_key": "",
1456                                    "type": "m.room.create"
1457                                },
1458                                {
1459                                    "content": {
1460                                        "join_rule": "invite"
1461                                    },
1462                                    "sender": "@test:example.org",
1463                                    "state_key": "",
1464                                    "type": "m.room.join_rules"
1465                                },
1466                                {
1467                                    "content": {
1468                                        "algorithm": "m.megolm.v1.aes-sha2"
1469                                    },
1470                                    "sender": "@test:example.org",
1471                                    "state_key": "",
1472                                    "type": "m.room.encryption"
1473                                },
1474                                {
1475                                    "content": {
1476                                        "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1477                                        "displayname": "Kyra",
1478                                        "membership": "join"
1479                                    },
1480                                    "sender": "@test:example.org",
1481                                    "state_key": "@test:example.org",
1482                                    "type": "m.room.member"
1483                                },
1484                                {
1485                                    "content": {
1486                                        "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1487                                        "displayname": "alice",
1488                                        "is_direct": true,
1489                                        "membership": "invite"
1490                                    },
1491                                    "origin_server_ts": 1650878657984u64,
1492                                    "sender": "@test:example.org",
1493                                    "state_key": "@alice:example.org",
1494                                    "type": "m.room.member",
1495                                    "unsigned": {
1496                                        "age": 14u64
1497                                    },
1498                                    "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1499                                }
1500                            ]
1501                        }
1502                    }
1503                }
1504            }
1505        }));
1506
1507        client.receive_sync_response(response).await.unwrap();
1508
1509        let room = client.get_room(room_id).expect("Room not found");
1510        assert_eq!(room.state(), RoomState::Invited);
1511        assert_eq!(
1512            room.compute_display_name().await.expect("fetching display name failed").into_inner(),
1513            RoomDisplayName::Calculated("Kyra".to_owned())
1514        );
1515    }
1516
1517    #[async_test]
1518    async fn test_deserialization_failure() {
1519        let user_id = user_id!("@alice:example.org");
1520        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1521
1522        let client = BaseClient::new(
1523            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1524            ThreadingSupport::Disabled,
1525        );
1526        client
1527            .activate(
1528                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1529                RoomLoadSettings::default(),
1530                #[cfg(feature = "e2e-encryption")]
1531                None,
1532            )
1533            .await
1534            .unwrap();
1535
1536        let response = ruma_response_from_json(&json!({
1537            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1538            "rooms": {
1539                "join": {
1540                    "!ithpyNKDtmhneaTQja:example.org": {
1541                        "state": {
1542                            "events": [
1543                                {
1544                                    "invalid": "invalid",
1545                                },
1546                                {
1547                                    "content": {
1548                                        "name": "The room name"
1549                                    },
1550                                    "event_id": "$143273582443PhrSn:example.org",
1551                                    "origin_server_ts": 1432735824653u64,
1552                                    "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1553                                    "sender": "@example:example.org",
1554                                    "state_key": "",
1555                                    "type": "m.room.name",
1556                                    "unsigned": {
1557                                        "age": 1234
1558                                    }
1559                                },
1560                            ]
1561                        }
1562                    }
1563                }
1564            }
1565        }));
1566
1567        client.receive_sync_response(response).await.unwrap();
1568        client
1569            .state_store()
1570            .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1571            .await
1572            .expect("Failed to fetch state event")
1573            .expect("State event not found")
1574            .deserialize()
1575            .expect("Failed to deserialize state event");
1576    }
1577
1578    #[async_test]
1579    async fn test_invited_members_arent_ignored() {
1580        let user_id = user_id!("@alice:example.org");
1581        let inviter_user_id = user_id!("@bob:example.org");
1582        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1583
1584        let client = BaseClient::new(
1585            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1586            ThreadingSupport::Disabled,
1587        );
1588        client
1589            .activate(
1590                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1591                RoomLoadSettings::default(),
1592                #[cfg(feature = "e2e-encryption")]
1593                None,
1594            )
1595            .await
1596            .unwrap();
1597
1598        // Preamble: let the SDK know about the room.
1599        let mut sync_builder = SyncResponseBuilder::new();
1600        let response = sync_builder
1601            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1602            .build_sync_response();
1603        client.receive_sync_response(response).await.unwrap();
1604
1605        // When I process the result of a /members request that only contains an invited
1606        // member,
1607        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1608
1609        let raw_member_event = json!({
1610            "content": {
1611                "avatar_url": "mxc://localhost/fewjilfewjil42",
1612                "displayname": "Invited Alice",
1613                "membership": "invite"
1614            },
1615            "event_id": "$151800140517rfvjc:localhost",
1616            "origin_server_ts": 151800140,
1617            "room_id": room_id,
1618            "sender": inviter_user_id,
1619            "state_key": user_id,
1620            "type": "m.room.member",
1621            "unsigned": {
1622                "age": 13374242,
1623            }
1624        });
1625        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1626            to_raw_value(&raw_member_event).unwrap(),
1627        )]);
1628
1629        // It's correctly processed,
1630        client.receive_all_members(room_id, &request, &response).await.unwrap();
1631
1632        let room = client.get_room(room_id).unwrap();
1633
1634        // And I can get the invited member display name and avatar.
1635        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1636
1637        assert_eq!(member.user_id(), user_id);
1638        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1639        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1640    }
1641
1642    #[async_test]
1643    async fn test_reinvited_members_get_a_display_name() {
1644        let user_id = user_id!("@alice:example.org");
1645        let inviter_user_id = user_id!("@bob:example.org");
1646        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1647
1648        let client = BaseClient::new(
1649            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1650            ThreadingSupport::Disabled,
1651        );
1652        client
1653            .activate(
1654                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1655                RoomLoadSettings::default(),
1656                #[cfg(feature = "e2e-encryption")]
1657                None,
1658            )
1659            .await
1660            .unwrap();
1661
1662        // Preamble: let the SDK know about the room, and that the invited user left it.
1663        let mut sync_builder = SyncResponseBuilder::new();
1664        let response = sync_builder
1665            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
1666                StateTestEvent::Custom(json!({
1667                    "content": {
1668                        "avatar_url": null,
1669                        "displayname": null,
1670                        "membership": "leave"
1671                    },
1672                    "event_id": "$151803140217rkvjc:localhost",
1673                    "origin_server_ts": 151800139,
1674                    "room_id": room_id,
1675                    "sender": user_id,
1676                    "state_key": user_id,
1677                    "type": "m.room.member",
1678                })),
1679            ))
1680            .build_sync_response();
1681        client.receive_sync_response(response).await.unwrap();
1682
1683        // Now, say that the user has been re-invited.
1684        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1685
1686        let raw_member_event = json!({
1687            "content": {
1688                "avatar_url": "mxc://localhost/fewjilfewjil42",
1689                "displayname": "Invited Alice",
1690                "membership": "invite"
1691            },
1692            "event_id": "$151800140517rfvjc:localhost",
1693            "origin_server_ts": 151800140,
1694            "room_id": room_id,
1695            "sender": inviter_user_id,
1696            "state_key": user_id,
1697            "type": "m.room.member",
1698            "unsigned": {
1699                "age": 13374242,
1700            }
1701        });
1702        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1703            to_raw_value(&raw_member_event).unwrap(),
1704        )]);
1705
1706        // It's correctly processed,
1707        client.receive_all_members(room_id, &request, &response).await.unwrap();
1708
1709        let room = client.get_room(room_id).unwrap();
1710
1711        // And I can get the invited member display name and avatar.
1712        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1713
1714        assert_eq!(member.user_id(), user_id);
1715        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1716        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1717    }
1718
1719    #[async_test]
1720    async fn test_ignored_user_list_changes() {
1721        let user_id = user_id!("@alice:example.org");
1722        let client = BaseClient::new(
1723            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1724            ThreadingSupport::Disabled,
1725        );
1726
1727        client
1728            .activate(
1729                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1730                RoomLoadSettings::default(),
1731                #[cfg(feature = "e2e-encryption")]
1732                None,
1733            )
1734            .await
1735            .unwrap();
1736
1737        let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1738        assert!(subscriber.next().now_or_never().is_none());
1739
1740        let f = EventFactory::new();
1741        let mut sync_builder = SyncResponseBuilder::new();
1742        let response = sync_builder
1743            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1744            .build_sync_response();
1745        client.receive_sync_response(response).await.unwrap();
1746
1747        assert_let!(Some(ignored) = subscriber.next().await);
1748        assert_eq!(ignored, [BOB.to_string()]);
1749
1750        // Receive the same response.
1751        let response = sync_builder
1752            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1753            .build_sync_response();
1754        client.receive_sync_response(response).await.unwrap();
1755
1756        // No changes in the ignored list.
1757        assert!(subscriber.next().now_or_never().is_none());
1758
1759        // Now remove Bob from the ignored list.
1760        let response =
1761            sync_builder.add_global_account_data(f.ignored_user_list([])).build_sync_response();
1762        client.receive_sync_response(response).await.unwrap();
1763
1764        assert_let!(Some(ignored) = subscriber.next().await);
1765        assert!(ignored.is_empty());
1766    }
1767
1768    #[async_test]
1769    async fn test_is_user_ignored() {
1770        let ignored_user_id = user_id!("@alice:example.org");
1771        let client = logged_in_base_client(None).await;
1772
1773        let mut sync_builder = SyncResponseBuilder::new();
1774        let f = EventFactory::new();
1775        let response = sync_builder
1776            .add_global_account_data(f.ignored_user_list([ignored_user_id.to_owned()]))
1777            .build_sync_response();
1778        client.receive_sync_response(response).await.unwrap();
1779
1780        assert!(client.is_user_ignored(ignored_user_id).await);
1781    }
1782
1783    #[async_test]
1784    async fn test_invite_details_are_set() {
1785        let user_id = user_id!("@alice:localhost");
1786        let client = logged_in_base_client(Some(user_id)).await;
1787        let invited_room_id = room_id!("!invited:localhost");
1788        let unknown_room_id = room_id!("!unknown:localhost");
1789
1790        let mut sync_builder = SyncResponseBuilder::new();
1791        let response = sync_builder
1792            .add_invited_room(InvitedRoomBuilder::new(invited_room_id))
1793            .build_sync_response();
1794        client.receive_sync_response(response).await.unwrap();
1795
1796        // Let us first check the initial state, we should have a room in the invite
1797        // state.
1798        let invited_room = client
1799            .get_room(invited_room_id)
1800            .expect("The sync should have created a room in the invited state");
1801
1802        assert_eq!(invited_room.state(), RoomState::Invited);
1803        assert!(invited_room.invite_acceptance_details().is_none());
1804
1805        // Now we join the room.
1806        let joined_room = client
1807            .room_joined(invited_room_id, Some(user_id.to_owned()))
1808            .await
1809            .expect("We should be able to mark a room as joined");
1810
1811        // Yup, we now have some invite details.
1812        assert_eq!(joined_room.state(), RoomState::Joined);
1813        assert_matches!(joined_room.invite_acceptance_details(), Some(details));
1814        assert_eq!(details.inviter, user_id);
1815
1816        // If we didn't know about the room before the join, we assume that there wasn't
1817        // an invite and we don't record the timestamp.
1818        assert!(client.get_room(unknown_room_id).is_none());
1819        let unknown_room = client
1820            .room_joined(unknown_room_id, Some(user_id.to_owned()))
1821            .await
1822            .expect("We should be able to mark a room as joined");
1823
1824        assert_eq!(unknown_room.state(), RoomState::Joined);
1825        assert!(unknown_room.invite_acceptance_details().is_none());
1826
1827        sync_builder.clear();
1828        let response =
1829            sync_builder.add_left_room(LeftRoomBuilder::new(invited_room_id)).build_sync_response();
1830        client.receive_sync_response(response).await.unwrap();
1831
1832        // Now that we left the room, we shouldn't have any details anymore.
1833        let left_room = client
1834            .get_room(invited_room_id)
1835            .expect("The sync should have created a room in the invited state");
1836
1837        assert_eq!(left_room.state(), RoomState::Left);
1838        assert!(left_room.invite_acceptance_details().is_none());
1839    }
1840}