matrix_sdk_base/
client.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19    collections::{BTreeMap, BTreeSet, HashMap},
20    fmt,
21    ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27use matrix_sdk_common::timer;
28#[cfg(feature = "e2e-encryption")]
29use matrix_sdk_crypto::{
30    CollectStrategy, DecryptionSettings, EncryptionSettings, OlmError, OlmMachine,
31    TrustRequirement, store::DynCryptoStore, types::requests::ToDeviceRequest,
32};
33#[cfg(doc)]
34use ruma::DeviceId;
35#[cfg(feature = "e2e-encryption")]
36use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
37use ruma::{
38    MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, UserId,
39    api::client::{self as api, sync::sync_events::v5},
40    events::{
41        StateEvent, StateEventType,
42        ignored_user_list::IgnoredUserListEventContent,
43        push_rules::{PushRulesEvent, PushRulesEventContent},
44        room::member::SyncRoomMemberEvent,
45    },
46    push::Ruleset,
47    time::Instant,
48};
49use tokio::sync::{Mutex, broadcast};
50#[cfg(feature = "e2e-encryption")]
51use tokio::sync::{RwLock, RwLockReadGuard};
52use tracing::{Level, debug, enabled, info, instrument, warn};
53
54#[cfg(feature = "e2e-encryption")]
55use crate::RoomMemberships;
56use crate::{
57    InviteAcceptanceDetails, RoomStateFilter, SessionMeta,
58    deserialized_responses::DisplayName,
59    error::{Error, Result},
60    event_cache::store::EventCacheStoreLock,
61    response_processors::{self as processors, Context},
62    room::{
63        Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState,
64    },
65    store::{
66        BaseStateStore, DynStateStore, MemoryStore, Result as StoreResult, RoomLoadSettings,
67        StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt, StoreConfig,
68        ambiguity_map::AmbiguityCache,
69    },
70    sync::{RoomUpdates, SyncResponse},
71};
72
73/// A no (network) IO client implementation.
74///
75/// This client is a state machine that receives responses and events and
76/// accordingly updates its state. It is not designed to be used directly, but
77/// rather through `matrix_sdk::Client`.
78///
79/// ```rust
80/// use matrix_sdk_base::{BaseClient, ThreadingSupport, store::StoreConfig};
81///
82/// let client = BaseClient::new(
83///     StoreConfig::new("cross-process-holder-name".to_owned()),
84///     ThreadingSupport::Disabled,
85/// );
86/// ```
87#[derive(Clone)]
88pub struct BaseClient {
89    /// The state store.
90    pub(crate) state_store: BaseStateStore,
91
92    /// The store used by the event cache.
93    event_cache_store: EventCacheStoreLock,
94
95    /// The store used for encryption.
96    ///
97    /// This field is only meant to be used for `OlmMachine` initialization.
98    /// All operations on it happen inside the `OlmMachine`.
99    #[cfg(feature = "e2e-encryption")]
100    crypto_store: Arc<DynCryptoStore>,
101
102    /// The olm-machine that is created once the
103    /// [`SessionMeta`][crate::session::SessionMeta] is set via
104    /// [`BaseClient::activate`]
105    #[cfg(feature = "e2e-encryption")]
106    olm_machine: Arc<RwLock<Option<OlmMachine>>>,
107
108    /// Observable of when a user is ignored/unignored.
109    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
110
111    /// A sender that is used to communicate changes to room information. Each
112    /// tick contains the room ID and the reasons that have generated this tick.
113    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
114
115    /// The strategy to use for picking recipient devices, when sending an
116    /// encrypted message.
117    #[cfg(feature = "e2e-encryption")]
118    pub room_key_recipient_strategy: CollectStrategy,
119
120    /// The settings to use for decrypting events.
121    #[cfg(feature = "e2e-encryption")]
122    pub decryption_settings: DecryptionSettings,
123
124    /// If the client should handle verification events received when syncing.
125    #[cfg(feature = "e2e-encryption")]
126    pub handle_verification_events: bool,
127
128    /// Whether the client supports threads or not.
129    pub threading_support: ThreadingSupport,
130}
131
132#[cfg(not(tarpaulin_include))]
133impl fmt::Debug for BaseClient {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        f.debug_struct("BaseClient")
136            .field("session_meta", &self.state_store.session_meta())
137            .field("sync_token", &self.state_store.sync_token)
138            .finish_non_exhaustive()
139    }
140}
141
142/// Whether this client instance supports threading or not. Currently used to
143/// determine how the client handles read receipts and unread count computations
144/// on the base SDK level.
145///
146/// Timelines on the other hand have a separate `TimelineFocus`
147/// `hide_threaded_events` associated value that can be used to hide threaded
148/// events but also to enable threaded read receipt sending. This is because
149/// certain timeline instances should ignore threading no matter what's defined
150/// at the client level. One such example are media filtered timelines which
151/// should contain all the room's media no matter what thread its in (unless
152/// explicitly opted into).
153#[derive(Clone, Copy, Debug)]
154pub enum ThreadingSupport {
155    /// Threading enabled.
156    Enabled {
157        /// Enable client-wide thread subscriptions support (MSC4306 / MSC4308).
158        ///
159        /// This may cause filtering out of thread subscriptions, and loading
160        /// the thread subscriptions via the sliding sync extension,
161        /// when the room list service is being used.
162        with_subscriptions: bool,
163    },
164    /// Threading disabled.
165    Disabled,
166}
167
168impl BaseClient {
169    /// Create a new client.
170    ///
171    /// # Arguments
172    ///
173    /// * `config` - the configuration for the stores (state store, event cache
174    ///   store and crypto store).
175    pub fn new(config: StoreConfig, threading_support: ThreadingSupport) -> Self {
176        let store = BaseStateStore::new(config.state_store);
177
178        // Create the channel to receive `RoomInfoNotableUpdate`.
179        //
180        // Let's consider the channel will receive 5 updates for 100 rooms maximum. This
181        // is unrealistic in practise, as the sync mechanism is pretty unlikely to
182        // trigger such amount of updates, it's a safe value.
183        //
184        // Also, note that it must not be
185        // zero, because (i) it will panic, (ii) a new user has no room, but can create
186        // rooms; remember that the channel's capacity is immutable.
187        let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
188            broadcast::channel(500);
189
190        BaseClient {
191            state_store: store,
192            event_cache_store: config.event_cache_store,
193            #[cfg(feature = "e2e-encryption")]
194            crypto_store: config.crypto_store,
195            #[cfg(feature = "e2e-encryption")]
196            olm_machine: Default::default(),
197            ignore_user_list_changes: Default::default(),
198            room_info_notable_update_sender,
199            #[cfg(feature = "e2e-encryption")]
200            room_key_recipient_strategy: Default::default(),
201            #[cfg(feature = "e2e-encryption")]
202            decryption_settings: DecryptionSettings {
203                sender_device_trust_requirement: TrustRequirement::Untrusted,
204            },
205            #[cfg(feature = "e2e-encryption")]
206            handle_verification_events: true,
207            threading_support,
208        }
209    }
210
211    /// Clones the current base client to use the same crypto store but a
212    /// different, in-memory store config, and resets transient state.
213    #[cfg(feature = "e2e-encryption")]
214    pub async fn clone_with_in_memory_state_store(
215        &self,
216        cross_process_store_locks_holder_name: &str,
217        handle_verification_events: bool,
218    ) -> Result<Self> {
219        let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
220            .state_store(MemoryStore::new());
221        let config = config.crypto_store(self.crypto_store.clone());
222
223        let copy = Self {
224            state_store: BaseStateStore::new(config.state_store),
225            event_cache_store: config.event_cache_store,
226            // We copy the crypto store as well as the `OlmMachine` for two reasons:
227            // 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`.
228            // 2. We need to ensure that the parent and child use the same data and caches inside
229            //    the `OlmMachine` so the various ratchets and places where new randomness gets
230            //    introduced don't diverge, i.e. one-time keys that get generated by the Olm Account
231            //    or Olm sessions when they encrypt or decrypt messages.
232            crypto_store: self.crypto_store.clone(),
233            olm_machine: self.olm_machine.clone(),
234            ignore_user_list_changes: Default::default(),
235            room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
236            room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
237            decryption_settings: self.decryption_settings.clone(),
238            handle_verification_events,
239            threading_support: self.threading_support,
240        };
241
242        copy.state_store
243            .derive_from_other(&self.state_store, &copy.room_info_notable_update_sender)
244            .await?;
245
246        Ok(copy)
247    }
248
249    /// Clones the current base client to use the same crypto store but a
250    /// different, in-memory store config, and resets transient state.
251    #[cfg(not(feature = "e2e-encryption"))]
252    #[allow(clippy::unused_async)]
253    pub async fn clone_with_in_memory_state_store(
254        &self,
255        cross_process_store_locks_holder: &str,
256        _handle_verification_events: bool,
257    ) -> Result<Self> {
258        let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
259            .state_store(MemoryStore::new());
260        Ok(Self::new(config, ThreadingSupport::Disabled))
261    }
262
263    /// Get the session meta information.
264    ///
265    /// If the client is currently logged in, this will return a
266    /// [`SessionMeta`] object which contains the user ID and device ID.
267    /// Otherwise it returns `None`.
268    pub fn session_meta(&self) -> Option<&SessionMeta> {
269        self.state_store.session_meta()
270    }
271
272    /// Get all the rooms this client knows about.
273    pub fn rooms(&self) -> Vec<Room> {
274        self.state_store.rooms()
275    }
276
277    /// Get all the rooms this client knows about, filtered by room state.
278    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
279        self.state_store.rooms_filtered(filter)
280    }
281
282    /// Get a stream of all the rooms changes, in addition to the existing
283    /// rooms.
284    pub fn rooms_stream(
285        &self,
286    ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
287        self.state_store.rooms_stream()
288    }
289
290    /// Lookup the Room for the given RoomId, or create one, if it didn't exist
291    /// yet in the store
292    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
293        self.state_store.get_or_create_room(
294            room_id,
295            room_state,
296            self.room_info_notable_update_sender.clone(),
297        )
298    }
299
300    /// Get a reference to the state store.
301    pub fn state_store(&self) -> &DynStateStore {
302        self.state_store.deref()
303    }
304
305    /// Get a reference to the event cache store.
306    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
307        &self.event_cache_store
308    }
309
310    /// Check whether the client has been activated.
311    ///
312    /// See [`BaseClient::activate`] to know what it means.
313    pub fn is_active(&self) -> bool {
314        self.state_store.session_meta().is_some()
315    }
316
317    /// Activate the client.
318    ///
319    /// A client is considered active when:
320    ///
321    /// 1. It has a `SessionMeta` (user ID, device ID and access token),
322    /// 2. Has loaded cached data from storage,
323    /// 3. If encryption is enabled, it also initialized or restored its
324    ///    `OlmMachine`.
325    ///
326    /// # Arguments
327    ///
328    /// * `session_meta` - The meta of a session that the user already has from
329    ///   a previous login call.
330    ///
331    /// * `custom_account` - A custom
332    ///   [`matrix_sdk_crypto::vodozemac::olm::Account`] to be used for the
333    ///   identity and one-time keys of this [`BaseClient`]. If no account is
334    ///   provided, a new default one or one from the store will be used. If an
335    ///   account is provided and one already exists in the store for this
336    ///   [`UserId`]/[`DeviceId`] combination, an error will be raised. This is
337    ///   useful if one wishes to create identity keys before knowing the
338    ///   user/device IDs, e.g., to use the identity key as the device ID.
339    ///
340    /// * `room_load_settings` — Specify how many rooms must be restored; use
341    ///   `::default()` if you don't know which value to pick.
342    ///
343    /// # Panics
344    ///
345    /// This method panics if it is called twice.
346    ///
347    /// [`UserId`]: ruma::UserId
348    pub async fn activate(
349        &self,
350        session_meta: SessionMeta,
351        room_load_settings: RoomLoadSettings,
352        #[cfg(feature = "e2e-encryption")] custom_account: Option<
353            crate::crypto::vodozemac::olm::Account,
354        >,
355    ) -> Result<()> {
356        debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
357
358        self.state_store
359            .load_rooms(
360                &session_meta.user_id,
361                room_load_settings,
362                &self.room_info_notable_update_sender,
363            )
364            .await?;
365        self.state_store.load_sync_token().await?;
366        self.state_store.set_session_meta(session_meta);
367
368        #[cfg(feature = "e2e-encryption")]
369        self.regenerate_olm(custom_account).await?;
370
371        Ok(())
372    }
373
374    /// Recreate an `OlmMachine` from scratch.
375    ///
376    /// In particular, this will clear all its caches.
377    #[cfg(feature = "e2e-encryption")]
378    pub async fn regenerate_olm(
379        &self,
380        custom_account: Option<crate::crypto::vodozemac::olm::Account>,
381    ) -> Result<()> {
382        tracing::debug!("regenerating OlmMachine");
383        let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
384
385        // Recreate the `OlmMachine` and wipe the in-memory cache in the store
386        // because we suspect it has stale data.
387        let olm_machine = OlmMachine::with_store(
388            &session_meta.user_id,
389            &session_meta.device_id,
390            self.crypto_store.clone(),
391            custom_account,
392        )
393        .await
394        .map_err(OlmError::from)?;
395
396        *self.olm_machine.write().await = Some(olm_machine);
397        Ok(())
398    }
399
400    /// Get the current, if any, sync token of the client.
401    /// This will be None if the client didn't sync at least once.
402    pub async fn sync_token(&self) -> Option<String> {
403        self.state_store.sync_token.read().await.clone()
404    }
405
406    /// User has knocked on a room.
407    ///
408    /// Update the internal and cached state accordingly. Return the final Room.
409    pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
410        let room = self.state_store.get_or_create_room(
411            room_id,
412            RoomState::Knocked,
413            self.room_info_notable_update_sender.clone(),
414        );
415
416        if room.state() != RoomState::Knocked {
417            let _sync_lock = self.sync_lock().lock().await;
418
419            let mut room_info = room.clone_info();
420            room_info.mark_as_knocked();
421            room_info.mark_state_partially_synced();
422            room_info.mark_members_missing(); // the own member event changed
423            let mut changes = StateChanges::default();
424            changes.add_room(room_info.clone());
425            self.state_store.save_changes(&changes).await?; // Update the store
426            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
427        }
428
429        Ok(room)
430    }
431
432    /// The user has joined a room using this specific client.
433    ///
434    /// This method should be called if the user accepts an invite or if they
435    /// join a public room.
436    ///
437    /// The method will create a [`Room`] object if one does not exist yet and
438    /// set the state of the [`Room`] to [`RoomState::Joined`]. The [`Room`]
439    /// object will be persisted in the cache. Please note that the [`Room`]
440    /// will be a stub until a sync has been received with the full room
441    /// state using [`BaseClient::receive_sync_response`].
442    ///
443    /// Update the internal and cached state accordingly. Return the final Room.
444    ///
445    /// # Arguments
446    ///
447    /// * `room_id` - The unique ID identifying the joined room.
448    /// * `inviter` - When joining this room in response to an invitation, the
449    ///   inviter should be recorded before sending the join request to the
450    ///   server. Providing the inviter here ensures that the
451    ///   [`InviteAcceptanceDetails`] are stored for this room.
452    ///
453    /// # Examples
454    ///
455    /// ```rust
456    /// # use matrix_sdk_base::{BaseClient, store::StoreConfig, RoomState, ThreadingSupport};
457    /// # use ruma::{OwnedRoomId, OwnedUserId, RoomId};
458    /// # async {
459    /// # let client = BaseClient::new(StoreConfig::new("example".to_owned()), ThreadingSupport::Disabled);
460    /// # async fn send_join_request() -> anyhow::Result<OwnedRoomId> { todo!() }
461    /// # async fn maybe_get_inviter(room_id: &RoomId) -> anyhow::Result<Option<OwnedUserId>> { todo!() }
462    /// # let room_id: &RoomId = todo!();
463    /// let maybe_inviter = maybe_get_inviter(room_id).await?;
464    /// let room_id = send_join_request().await?;
465    /// let room = client.room_joined(&room_id, maybe_inviter).await?;
466    ///
467    /// assert_eq!(room.state(), RoomState::Joined);
468    /// # matrix_sdk_test::TestResult::Ok(()) };
469    /// ```
470    pub async fn room_joined(
471        &self,
472        room_id: &RoomId,
473        inviter: Option<OwnedUserId>,
474    ) -> Result<Room> {
475        let room = self.state_store.get_or_create_room(
476            room_id,
477            RoomState::Joined,
478            self.room_info_notable_update_sender.clone(),
479        );
480
481        // If the state isn't `RoomState::Joined` then this means that we knew about
482        // this room before. Let's modify the existing state now.
483        if room.state() != RoomState::Joined {
484            let _sync_lock = self.sync_lock().lock().await;
485
486            let mut room_info = room.clone_info();
487            let previous_state = room.state();
488
489            room_info.mark_as_joined();
490            room_info.mark_state_partially_synced();
491            room_info.mark_members_missing(); // the own member event changed
492
493            // If our previous state was an invite and we're now in the joined state, this
494            // means that the user has explicitly accepted an invite. Let's
495            // remember some details about the invite.
496            //
497            // This is somewhat of a workaround for our lack of cryptographic membership.
498            // Later on we will decide if historic room keys should be accepted
499            // based on this info. If a user has accepted an invite and we receive a room
500            // key bundle shortly after, we might accept it. If we don't do
501            // this, the homeserver could trick us into accepting any historic room key
502            // bundle.
503            if previous_state == RoomState::Invited
504                && let Some(inviter) = inviter
505            {
506                let details = InviteAcceptanceDetails {
507                    invite_accepted_at: MilliSecondsSinceUnixEpoch::now(),
508                    inviter,
509                };
510                room_info.set_invite_acceptance_details(details);
511            }
512
513            let mut changes = StateChanges::default();
514            changes.add_room(room_info.clone());
515
516            self.state_store.save_changes(&changes).await?; // Update the store
517
518            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
519        }
520
521        Ok(room)
522    }
523
524    /// User has left a room.
525    ///
526    /// Update the internal and cached state accordingly.
527    pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
528        let room = self.state_store.get_or_create_room(
529            room_id,
530            RoomState::Left,
531            self.room_info_notable_update_sender.clone(),
532        );
533
534        if room.state() != RoomState::Left {
535            let _sync_lock = self.sync_lock().lock().await;
536
537            let mut room_info = room.clone_info();
538            room_info.mark_as_left();
539            room_info.mark_state_partially_synced();
540            room_info.mark_members_missing(); // the own member event changed
541            let mut changes = StateChanges::default();
542            changes.add_room(room_info.clone());
543            self.state_store.save_changes(&changes).await?; // Update the store
544            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
545        }
546
547        Ok(())
548    }
549
550    /// Get access to the store's sync lock.
551    pub fn sync_lock(&self) -> &Mutex<()> {
552        self.state_store.sync_lock()
553    }
554
555    /// Receive a response from a sync call.
556    ///
557    /// # Arguments
558    ///
559    /// * `response` - The response that we received after a successful sync.
560    #[instrument(skip_all)]
561    pub async fn receive_sync_response(
562        &self,
563        response: api::sync::sync_events::v3::Response,
564    ) -> Result<SyncResponse> {
565        self.receive_sync_response_with_requested_required_states(
566            response,
567            &RequestedRequiredStates::default(),
568        )
569        .await
570    }
571
572    /// Receive a response from a sync call, with the requested required state
573    /// events.
574    ///
575    /// # Arguments
576    ///
577    /// * `response` - The response that we received after a successful sync.
578    /// * `requested_required_states` - The requested required state events.
579    pub async fn receive_sync_response_with_requested_required_states(
580        &self,
581        response: api::sync::sync_events::v3::Response,
582        requested_required_states: &RequestedRequiredStates,
583    ) -> Result<SyncResponse> {
584        // The server might respond multiple times with the same sync token, in
585        // that case we already received this response and there's nothing to
586        // do.
587        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
588            info!("Got the same sync response twice");
589            return Ok(SyncResponse::default());
590        }
591
592        let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };
593
594        #[cfg(feature = "e2e-encryption")]
595        let olm_machine = self.olm_machine().await;
596
597        let mut context = Context::new(StateChanges::new(response.next_batch.clone()));
598
599        #[cfg(feature = "e2e-encryption")]
600        let to_device = {
601            let processors::e2ee::to_device::Output {
602                processed_to_device_events: to_device,
603                room_key_updates,
604            } = processors::e2ee::to_device::from_sync_v2(
605                &response,
606                olm_machine.as_ref(),
607                &self.decryption_settings,
608            )
609            .await?;
610
611            processors::latest_event::decrypt_from_rooms(
612                &mut context,
613                room_key_updates
614                    .into_iter()
615                    .flatten()
616                    .filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
617                    .collect(),
618                processors::e2ee::E2EE::new(
619                    olm_machine.as_ref(),
620                    &self.decryption_settings,
621                    self.handle_verification_events,
622                ),
623            )
624            .await?;
625
626            to_device
627        };
628
629        #[cfg(not(feature = "e2e-encryption"))]
630        let to_device = response
631            .to_device
632            .events
633            .into_iter()
634            .map(|raw| {
635                use matrix_sdk_common::deserialized_responses::{
636                    ProcessedToDeviceEvent, ToDeviceUnableToDecryptInfo,
637                    ToDeviceUnableToDecryptReason,
638                };
639
640                if let Ok(Some(event_type)) = raw.get_field::<String>("type") {
641                    if event_type == "m.room.encrypted" {
642                        ProcessedToDeviceEvent::UnableToDecrypt {
643                            encrypted_event: raw,
644                            utd_info: ToDeviceUnableToDecryptInfo {
645                                reason: ToDeviceUnableToDecryptReason::EncryptionIsDisabled,
646                            },
647                        }
648                    } else {
649                        ProcessedToDeviceEvent::PlainText(raw)
650                    }
651                } else {
652                    // Exclude events with no type
653                    ProcessedToDeviceEvent::Invalid(raw)
654                }
655            })
656            .collect();
657
658        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
659
660        let global_account_data_processor =
661            processors::account_data::global(&response.account_data.events);
662
663        let push_rules = self.get_push_rules(&global_account_data_processor).await?;
664
665        let mut room_updates = RoomUpdates::default();
666        let mut notifications = Default::default();
667
668        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
669            BTreeMap::new();
670
671        for (room_id, joined_room) in response.rooms.join {
672            let joined_room_update = processors::room::sync_v2::update_joined_room(
673                &mut context,
674                processors::room::RoomCreationData::new(
675                    &room_id,
676                    self.room_info_notable_update_sender.clone(),
677                    requested_required_states,
678                    &mut ambiguity_cache,
679                ),
680                joined_room,
681                &mut updated_members_in_room,
682                processors::notification::Notification::new(
683                    &push_rules,
684                    &mut notifications,
685                    &self.state_store,
686                ),
687                #[cfg(feature = "e2e-encryption")]
688                processors::e2ee::E2EE::new(
689                    olm_machine.as_ref(),
690                    &self.decryption_settings,
691                    self.handle_verification_events,
692                ),
693            )
694            .await?;
695
696            room_updates.joined.insert(room_id, joined_room_update);
697        }
698
699        for (room_id, left_room) in response.rooms.leave {
700            let left_room_update = processors::room::sync_v2::update_left_room(
701                &mut context,
702                processors::room::RoomCreationData::new(
703                    &room_id,
704                    self.room_info_notable_update_sender.clone(),
705                    requested_required_states,
706                    &mut ambiguity_cache,
707                ),
708                left_room,
709                processors::notification::Notification::new(
710                    &push_rules,
711                    &mut notifications,
712                    &self.state_store,
713                ),
714                #[cfg(feature = "e2e-encryption")]
715                processors::e2ee::E2EE::new(
716                    olm_machine.as_ref(),
717                    &self.decryption_settings,
718                    self.handle_verification_events,
719                ),
720            )
721            .await?;
722
723            room_updates.left.insert(room_id, left_room_update);
724        }
725
726        for (room_id, invited_room) in response.rooms.invite {
727            let invited_room_update = processors::room::sync_v2::update_invited_room(
728                &mut context,
729                &room_id,
730                invited_room,
731                self.room_info_notable_update_sender.clone(),
732                processors::notification::Notification::new(
733                    &push_rules,
734                    &mut notifications,
735                    &self.state_store,
736                ),
737            )
738            .await?;
739
740            room_updates.invited.insert(room_id, invited_room_update);
741        }
742
743        for (room_id, knocked_room) in response.rooms.knock {
744            let knocked_room_update = processors::room::sync_v2::update_knocked_room(
745                &mut context,
746                &room_id,
747                knocked_room,
748                self.room_info_notable_update_sender.clone(),
749                processors::notification::Notification::new(
750                    &push_rules,
751                    &mut notifications,
752                    &self.state_store,
753                ),
754            )
755            .await?;
756
757            room_updates.knocked.insert(room_id, knocked_room_update);
758        }
759
760        global_account_data_processor.apply(&mut context, &self.state_store).await;
761
762        context.state_changes.presence = response
763            .presence
764            .events
765            .iter()
766            .filter_map(|e| {
767                let event = e.deserialize().ok()?;
768                Some((event.sender, e.clone()))
769            })
770            .collect();
771
772        context.state_changes.ambiguity_maps = ambiguity_cache.cache;
773
774        {
775            let _sync_lock = self.sync_lock().lock().await;
776
777            processors::changes::save_and_apply(
778                context,
779                &self.state_store,
780                &self.ignore_user_list_changes,
781                Some(response.next_batch.clone()),
782            )
783            .await?;
784        }
785
786        let mut context = Context::default();
787
788        // Now that all the rooms information have been saved, update the display name
789        // of the updated rooms (which relies on information stored in the database).
790        processors::room::display_name::update_for_rooms(
791            &mut context,
792            &room_updates,
793            &self.state_store,
794        )
795        .await;
796
797        // Save the new display name updates if any.
798        processors::changes::save_only(context, &self.state_store).await?;
799
800        for (room_id, member_ids) in updated_members_in_room {
801            if let Some(room) = self.get_room(&room_id) {
802                let _ =
803                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
804            }
805        }
806
807        if enabled!(Level::INFO) {
808            info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
809        }
810
811        let response = SyncResponse {
812            rooms: room_updates,
813            presence: response.presence.events,
814            account_data: response.account_data.events,
815            to_device,
816            notifications,
817        };
818
819        Ok(response)
820    }
821
822    /// Receive a get member events response and convert it to a deserialized
823    /// `MembersResponse`
824    ///
825    /// This client-server request must be made without filters to make sure all
826    /// members are received. Otherwise, an error is returned.
827    ///
828    /// # Arguments
829    ///
830    /// * `room_id` - The room id this response belongs to.
831    ///
832    /// * `response` - The raw response that was received from the server.
833    #[instrument(skip_all, fields(?room_id))]
834    pub async fn receive_all_members(
835        &self,
836        room_id: &RoomId,
837        request: &api::membership::get_member_events::v3::Request,
838        response: &api::membership::get_member_events::v3::Response,
839    ) -> Result<()> {
840        if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
841        {
842            // This function assumes all members are loaded at once to optimise how display
843            // name disambiguation works. Using it with partial member list results
844            // would produce incorrect disambiguated display name entries
845            return Err(Error::InvalidReceiveMembersParameters);
846        }
847
848        let Some(room) = self.state_store.room(room_id) else {
849            // The room is unknown to us: leave early.
850            return Ok(());
851        };
852
853        let mut chunk = Vec::with_capacity(response.chunk.len());
854        let mut context = Context::default();
855
856        #[cfg(feature = "e2e-encryption")]
857        let mut user_ids = BTreeSet::new();
858
859        let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
860
861        for raw_event in &response.chunk {
862            let member = match raw_event.deserialize() {
863                Ok(ev) => ev,
864                Err(e) => {
865                    let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
866                    debug!(event_id, "Failed to deserialize member event: {e}");
867                    continue;
868                }
869            };
870
871            // TODO: All the actions in this loop used to be done only when the membership
872            // event was not in the store before. This was changed with the new room API,
873            // because e.g. leaving a room makes members events outdated and they need to be
874            // fetched by `members`. Therefore, they need to be overwritten here, even
875            // if they exist.
876            // However, this makes a new problem occur where setting the member events here
877            // potentially races with the sync.
878            // See <https://github.com/matrix-org/matrix-rust-sdk/issues/1205>.
879
880            #[cfg(feature = "e2e-encryption")]
881            match member.membership() {
882                MembershipState::Join | MembershipState::Invite => {
883                    user_ids.insert(member.state_key().to_owned());
884                }
885                _ => (),
886            }
887
888            if let StateEvent::Original(e) = &member
889                && let Some(d) = &e.content.displayname
890            {
891                let display_name = DisplayName::new(d);
892                ambiguity_map.entry(display_name).or_default().insert(member.state_key().clone());
893            }
894
895            let sync_member: SyncRoomMemberEvent = member.clone().into();
896            processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
897
898            context
899                .state_changes
900                .state
901                .entry(room_id.to_owned())
902                .or_default()
903                .entry(member.event_type())
904                .or_default()
905                .insert(member.state_key().to_string(), raw_event.clone().cast());
906            chunk.push(member);
907        }
908
909        #[cfg(feature = "e2e-encryption")]
910        processors::e2ee::tracked_users::update(
911            self.olm_machine().await.as_ref(),
912            room.encryption_state(),
913            &user_ids,
914        )
915        .await?;
916
917        context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
918
919        let _sync_lock = self.sync_lock().lock().await;
920        let mut room_info = room.clone_info();
921        room_info.mark_members_synced();
922        context.state_changes.add_room(room_info);
923
924        processors::changes::save_and_apply(
925            context,
926            &self.state_store,
927            &self.ignore_user_list_changes,
928            None,
929        )
930        .await?;
931
932        let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
933
934        Ok(())
935    }
936
937    /// Receive a successful filter upload response, the filter id will be
938    /// stored under the given name in the store.
939    ///
940    /// The filter id can later be retrieved with the [`get_filter`] method.
941    ///
942    ///
943    /// # Arguments
944    ///
945    /// * `filter_name` - The name that should be used to persist the filter id
946    ///   in the store.
947    ///
948    /// * `response` - The successful filter upload response containing the
949    ///   filter id.
950    ///
951    /// [`get_filter`]: #method.get_filter
952    pub async fn receive_filter_upload(
953        &self,
954        filter_name: &str,
955        response: &api::filter::create_filter::v3::Response,
956    ) -> Result<()> {
957        Ok(self
958            .state_store
959            .set_kv_data(
960                StateStoreDataKey::Filter(filter_name),
961                StateStoreDataValue::Filter(response.filter_id.clone()),
962            )
963            .await?)
964    }
965
966    /// Get the filter id of a previously uploaded filter.
967    ///
968    /// *Note*: A filter will first need to be uploaded and persisted using
969    /// [`receive_filter_upload`].
970    ///
971    /// # Arguments
972    ///
973    /// * `filter_name` - The name of the filter that was previously used to
974    ///   persist the filter.
975    ///
976    /// [`receive_filter_upload`]: #method.receive_filter_upload
977    pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
978        let filter = self
979            .state_store
980            .get_kv_data(StateStoreDataKey::Filter(filter_name))
981            .await?
982            .map(|d| d.into_filter().expect("State store data not a filter"));
983
984        Ok(filter)
985    }
986
987    /// Get a to-device request that will share a room key with users in a room.
988    #[cfg(feature = "e2e-encryption")]
989    pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
990        match self.olm_machine().await.as_ref() {
991            Some(o) => {
992                let Some(room) = self.get_room(room_id) else {
993                    return Err(Error::InsufficientData);
994                };
995
996                let history_visibility = room.history_visibility_or_default();
997                let Some(room_encryption_event) = room.encryption_settings() else {
998                    return Err(Error::EncryptionNotEnabled);
999                };
1000
1001                // Don't share the group session with members that are invited
1002                // if the history visibility is set to `Joined`
1003                let filter = if history_visibility == HistoryVisibility::Joined {
1004                    RoomMemberships::JOIN
1005                } else {
1006                    RoomMemberships::ACTIVE
1007                };
1008
1009                let members = self.state_store.get_user_ids(room_id, filter).await?;
1010
1011                let settings = EncryptionSettings::new(
1012                    room_encryption_event,
1013                    history_visibility,
1014                    self.room_key_recipient_strategy.clone(),
1015                );
1016
1017                Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
1018            }
1019            None => panic!("Olm machine wasn't started"),
1020        }
1021    }
1022
1023    /// Get the room with the given room id.
1024    ///
1025    /// # Arguments
1026    ///
1027    /// * `room_id` - The id of the room that should be fetched.
1028    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1029        self.state_store.room(room_id)
1030    }
1031
1032    /// Forget the room with the given room ID.
1033    ///
1034    /// The room will be dropped from the room list and the store.
1035    ///
1036    /// # Arguments
1037    ///
1038    /// * `room_id` - The id of the room that should be forgotten.
1039    pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1040        // Forget the room in the state store.
1041        self.state_store.forget_room(room_id).await?;
1042
1043        // Remove the room in the event cache store too.
1044        self.event_cache_store().lock().await?.remove_room(room_id).await?;
1045
1046        Ok(())
1047    }
1048
1049    /// Get the olm machine.
1050    #[cfg(feature = "e2e-encryption")]
1051    pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
1052        self.olm_machine.read().await
1053    }
1054
1055    /// Get the push rules.
1056    ///
1057    /// Gets the push rules previously processed, otherwise get them from the
1058    /// store. As a fallback, uses [`Ruleset::server_default`] if the user
1059    /// is logged in.
1060    pub(crate) async fn get_push_rules(
1061        &self,
1062        global_account_data_processor: &processors::account_data::Global,
1063    ) -> Result<Ruleset> {
1064        let _timer = timer!(Level::TRACE, "get_push_rules");
1065        if let Some(event) = global_account_data_processor
1066            .push_rules()
1067            .and_then(|ev| ev.deserialize_as_unchecked::<PushRulesEvent>().ok())
1068        {
1069            Ok(event.content.global)
1070        } else if let Some(event) = self
1071            .state_store
1072            .get_account_data_event_static::<PushRulesEventContent>()
1073            .await?
1074            .and_then(|ev| ev.deserialize().ok())
1075        {
1076            Ok(event.content.global)
1077        } else if let Some(session_meta) = self.state_store.session_meta() {
1078            Ok(Ruleset::server_default(&session_meta.user_id))
1079        } else {
1080            Ok(Ruleset::new())
1081        }
1082    }
1083
1084    /// Returns a subscriber that publishes an event every time the ignore user
1085    /// list changes
1086    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
1087        self.ignore_user_list_changes.subscribe()
1088    }
1089
1090    /// Returns a new receiver that gets future room info notable updates.
1091    ///
1092    /// Learn more by reading the [`RoomInfoNotableUpdate`] type.
1093    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
1094        self.room_info_notable_update_sender.subscribe()
1095    }
1096
1097    /// Checks whether the provided `user_id` belongs to an ignored user.
1098    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
1099        match self.state_store.get_account_data_event_static::<IgnoredUserListEventContent>().await
1100        {
1101            Ok(Some(raw_ignored_user_list)) => match raw_ignored_user_list.deserialize() {
1102                Ok(current_ignored_user_list) => {
1103                    current_ignored_user_list.content.ignored_users.contains_key(user_id)
1104                }
1105                Err(error) => {
1106                    warn!(?error, "Failed to deserialize the ignored user list event");
1107                    false
1108                }
1109            },
1110            Ok(None) => false,
1111            Err(error) => {
1112                warn!(?error, "Could not get the ignored user list from the state store");
1113                false
1114            }
1115        }
1116    }
1117}
1118
1119/// Represent the `required_state` values sent by a sync request.
1120///
1121/// This is useful to track what state events have been requested when handling
1122/// a response.
1123///
1124/// For example, if a sync requests the `m.room.encryption` state event, and the
1125/// server replies with nothing, if means the room **is not** encrypted. Without
1126/// knowing which state event was required by the sync, it is impossible to
1127/// interpret the absence of state event from the server as _the room's
1128/// encryption state is **not encrypted**_ or _the room's encryption state is
1129/// **unknown**_.
1130#[derive(Debug, Default)]
1131pub struct RequestedRequiredStates {
1132    default: Vec<(StateEventType, String)>,
1133    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1134}
1135
1136impl RequestedRequiredStates {
1137    /// Create a new `RequestedRequiredStates`.
1138    ///
1139    /// `default` represents the `required_state` value for all rooms.
1140    /// `for_rooms` is the `required_state` per room.
1141    pub fn new(
1142        default: Vec<(StateEventType, String)>,
1143        for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1144    ) -> Self {
1145        Self { default, for_rooms }
1146    }
1147
1148    /// Get the `required_state` value for a specific room.
1149    pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1150        self.for_rooms.get(room_id).unwrap_or(&self.default)
1151    }
1152}
1153
1154impl From<&v5::Request> for RequestedRequiredStates {
1155    fn from(request: &v5::Request) -> Self {
1156        // The following information is missing in the MSC4186 at the time of writing
1157        // (2025-03-12) but: the `required_state`s from all lists and from all room
1158        // subscriptions are combined by doing an union.
1159        //
1160        // Thus, we can do the same here, put the union in `default` and keep
1161        // `for_rooms` empty. The `Self::for_room` will automatically do the fallback.
1162        let mut default = BTreeSet::new();
1163
1164        for list in request.lists.values() {
1165            default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1166        }
1167
1168        for room_subscription in request.room_subscriptions.values() {
1169            default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1170        }
1171
1172        Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1173    }
1174}
1175
1176#[cfg(test)]
1177mod tests {
1178    use std::collections::HashMap;
1179
1180    use assert_matches2::{assert_let, assert_matches};
1181    use futures_util::FutureExt as _;
1182    use matrix_sdk_test::{
1183        BOB, InvitedRoomBuilder, LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent,
1184        SyncResponseBuilder, async_test, event_factory::EventFactory, ruma_response_from_json,
1185    };
1186    use ruma::{
1187        api::client::{self as api, sync::sync_events::v5},
1188        event_id,
1189        events::{StateEventType, room::member::MembershipState},
1190        room_id,
1191        serde::Raw,
1192        user_id,
1193    };
1194    use serde_json::{json, value::to_raw_value};
1195
1196    use super::{BaseClient, RequestedRequiredStates};
1197    use crate::{
1198        RoomDisplayName, RoomState, SessionMeta,
1199        client::ThreadingSupport,
1200        store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1201        test_utils::logged_in_base_client,
1202    };
1203
1204    #[test]
1205    fn test_requested_required_states() {
1206        let room_id_0 = room_id!("!r0");
1207        let room_id_1 = room_id!("!r1");
1208
1209        let requested_required_states = RequestedRequiredStates::new(
1210            vec![(StateEventType::RoomAvatar, "".to_owned())],
1211            HashMap::from([(
1212                room_id_0.to_owned(),
1213                vec![
1214                    (StateEventType::RoomMember, "foo".to_owned()),
1215                    (StateEventType::RoomEncryption, "".to_owned()),
1216                ],
1217            )]),
1218        );
1219
1220        // A special set of state events exists for `room_id_0`.
1221        assert_eq!(
1222            requested_required_states.for_room(room_id_0),
1223            &[
1224                (StateEventType::RoomMember, "foo".to_owned()),
1225                (StateEventType::RoomEncryption, "".to_owned()),
1226            ]
1227        );
1228
1229        // No special list for `room_id_1`, it should return the defaults.
1230        assert_eq!(
1231            requested_required_states.for_room(room_id_1),
1232            &[(StateEventType::RoomAvatar, "".to_owned()),]
1233        );
1234    }
1235
1236    #[test]
1237    fn test_requested_required_states_from_sync_v5_request() {
1238        let room_id_0 = room_id!("!r0");
1239        let room_id_1 = room_id!("!r1");
1240
1241        // Empty request.
1242        let mut request = v5::Request::new();
1243
1244        {
1245            let requested_required_states = RequestedRequiredStates::from(&request);
1246
1247            assert!(requested_required_states.default.is_empty());
1248            assert!(requested_required_states.for_rooms.is_empty());
1249        }
1250
1251        // One list.
1252        request.lists.insert("foo".to_owned(), {
1253            let mut list = v5::request::List::default();
1254            list.room_details.required_state = vec![
1255                (StateEventType::RoomAvatar, "".to_owned()),
1256                (StateEventType::RoomEncryption, "".to_owned()),
1257            ];
1258
1259            list
1260        });
1261
1262        {
1263            let requested_required_states = RequestedRequiredStates::from(&request);
1264
1265            assert_eq!(
1266                requested_required_states.default,
1267                &[
1268                    (StateEventType::RoomAvatar, "".to_owned()),
1269                    (StateEventType::RoomEncryption, "".to_owned())
1270                ]
1271            );
1272            assert!(requested_required_states.for_rooms.is_empty());
1273        }
1274
1275        // Two lists.
1276        request.lists.insert("bar".to_owned(), {
1277            let mut list = v5::request::List::default();
1278            list.room_details.required_state = vec![
1279                (StateEventType::RoomEncryption, "".to_owned()),
1280                (StateEventType::RoomName, "".to_owned()),
1281            ];
1282
1283            list
1284        });
1285
1286        {
1287            let requested_required_states = RequestedRequiredStates::from(&request);
1288
1289            // Union of the state events.
1290            assert_eq!(
1291                requested_required_states.default,
1292                &[
1293                    (StateEventType::RoomAvatar, "".to_owned()),
1294                    (StateEventType::RoomEncryption, "".to_owned()),
1295                    (StateEventType::RoomName, "".to_owned()),
1296                ]
1297            );
1298            assert!(requested_required_states.for_rooms.is_empty());
1299        }
1300
1301        // One room subscription.
1302        request.room_subscriptions.insert(room_id_0.to_owned(), {
1303            let mut room_subscription = v5::request::RoomSubscription::default();
1304
1305            room_subscription.required_state = vec![
1306                (StateEventType::RoomJoinRules, "".to_owned()),
1307                (StateEventType::RoomEncryption, "".to_owned()),
1308            ];
1309
1310            room_subscription
1311        });
1312
1313        {
1314            let requested_required_states = RequestedRequiredStates::from(&request);
1315
1316            // Union of state events, all in `default`, still nothing in `for_rooms`.
1317            assert_eq!(
1318                requested_required_states.default,
1319                &[
1320                    (StateEventType::RoomAvatar, "".to_owned()),
1321                    (StateEventType::RoomEncryption, "".to_owned()),
1322                    (StateEventType::RoomJoinRules, "".to_owned()),
1323                    (StateEventType::RoomName, "".to_owned()),
1324                ]
1325            );
1326            assert!(requested_required_states.for_rooms.is_empty());
1327        }
1328
1329        // Two room subscriptions.
1330        request.room_subscriptions.insert(room_id_1.to_owned(), {
1331            let mut room_subscription = v5::request::RoomSubscription::default();
1332
1333            room_subscription.required_state = vec![
1334                (StateEventType::RoomName, "".to_owned()),
1335                (StateEventType::RoomTopic, "".to_owned()),
1336            ];
1337
1338            room_subscription
1339        });
1340
1341        {
1342            let requested_required_states = RequestedRequiredStates::from(&request);
1343
1344            // Union of state events, all in `default`, still nothing in `for_rooms`.
1345            assert_eq!(
1346                requested_required_states.default,
1347                &[
1348                    (StateEventType::RoomAvatar, "".to_owned()),
1349                    (StateEventType::RoomEncryption, "".to_owned()),
1350                    (StateEventType::RoomJoinRules, "".to_owned()),
1351                    (StateEventType::RoomName, "".to_owned()),
1352                    (StateEventType::RoomTopic, "".to_owned()),
1353                ]
1354            );
1355        }
1356    }
1357
1358    #[async_test]
1359    async fn test_invite_after_leaving() {
1360        let user_id = user_id!("@alice:example.org");
1361        let room_id = room_id!("!test:example.org");
1362
1363        let client = logged_in_base_client(Some(user_id)).await;
1364
1365        let mut sync_builder = SyncResponseBuilder::new();
1366
1367        let response = sync_builder
1368            .add_left_room(
1369                LeftRoomBuilder::new(room_id).add_timeline_event(
1370                    EventFactory::new()
1371                        .member(user_id)
1372                        .membership(MembershipState::Leave)
1373                        .display_name("Alice")
1374                        .event_id(event_id!("$994173582443PhrSn:example.org")),
1375                ),
1376            )
1377            .build_sync_response();
1378        client.receive_sync_response(response).await.unwrap();
1379        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1380
1381        let response = sync_builder
1382            .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
1383                StrippedStateTestEvent::Custom(json!({
1384                    "content": {
1385                        "displayname": "Alice",
1386                        "membership": "invite",
1387                    },
1388                    "event_id": "$143273582443PhrSn:example.org",
1389                    "origin_server_ts": 1432735824653u64,
1390                    "sender": "@example:example.org",
1391                    "state_key": user_id,
1392                    "type": "m.room.member",
1393                })),
1394            ))
1395            .build_sync_response();
1396        client.receive_sync_response(response).await.unwrap();
1397        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1398    }
1399
1400    #[async_test]
1401    async fn test_invite_displayname() {
1402        let user_id = user_id!("@alice:example.org");
1403        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1404
1405        let client = logged_in_base_client(Some(user_id)).await;
1406
1407        let response = ruma_response_from_json(&json!({
1408            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1409            "device_one_time_keys_count": {
1410                "signed_curve25519": 50u64
1411            },
1412            "device_unused_fallback_key_types": [
1413                "signed_curve25519"
1414            ],
1415            "rooms": {
1416                "invite": {
1417                    "!ithpyNKDtmhneaTQja:example.org": {
1418                        "invite_state": {
1419                            "events": [
1420                                {
1421                                    "content": {
1422                                        "creator": "@test:example.org",
1423                                        "room_version": "9"
1424                                    },
1425                                    "sender": "@test:example.org",
1426                                    "state_key": "",
1427                                    "type": "m.room.create"
1428                                },
1429                                {
1430                                    "content": {
1431                                        "join_rule": "invite"
1432                                    },
1433                                    "sender": "@test:example.org",
1434                                    "state_key": "",
1435                                    "type": "m.room.join_rules"
1436                                },
1437                                {
1438                                    "content": {
1439                                        "algorithm": "m.megolm.v1.aes-sha2"
1440                                    },
1441                                    "sender": "@test:example.org",
1442                                    "state_key": "",
1443                                    "type": "m.room.encryption"
1444                                },
1445                                {
1446                                    "content": {
1447                                        "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1448                                        "displayname": "Kyra",
1449                                        "membership": "join"
1450                                    },
1451                                    "sender": "@test:example.org",
1452                                    "state_key": "@test:example.org",
1453                                    "type": "m.room.member"
1454                                },
1455                                {
1456                                    "content": {
1457                                        "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1458                                        "displayname": "alice",
1459                                        "is_direct": true,
1460                                        "membership": "invite"
1461                                    },
1462                                    "origin_server_ts": 1650878657984u64,
1463                                    "sender": "@test:example.org",
1464                                    "state_key": "@alice:example.org",
1465                                    "type": "m.room.member",
1466                                    "unsigned": {
1467                                        "age": 14u64
1468                                    },
1469                                    "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1470                                }
1471                            ]
1472                        }
1473                    }
1474                }
1475            }
1476        }));
1477
1478        client.receive_sync_response(response).await.unwrap();
1479
1480        let room = client.get_room(room_id).expect("Room not found");
1481        assert_eq!(room.state(), RoomState::Invited);
1482        assert_eq!(
1483            room.compute_display_name().await.expect("fetching display name failed").into_inner(),
1484            RoomDisplayName::Calculated("Kyra".to_owned())
1485        );
1486    }
1487
1488    #[async_test]
1489    async fn test_deserialization_failure() {
1490        let user_id = user_id!("@alice:example.org");
1491        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1492
1493        let client = BaseClient::new(
1494            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1495            ThreadingSupport::Disabled,
1496        );
1497        client
1498            .activate(
1499                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1500                RoomLoadSettings::default(),
1501                #[cfg(feature = "e2e-encryption")]
1502                None,
1503            )
1504            .await
1505            .unwrap();
1506
1507        let response = ruma_response_from_json(&json!({
1508            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1509            "rooms": {
1510                "join": {
1511                    "!ithpyNKDtmhneaTQja:example.org": {
1512                        "state": {
1513                            "events": [
1514                                {
1515                                    "invalid": "invalid",
1516                                },
1517                                {
1518                                    "content": {
1519                                        "name": "The room name"
1520                                    },
1521                                    "event_id": "$143273582443PhrSn:example.org",
1522                                    "origin_server_ts": 1432735824653u64,
1523                                    "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1524                                    "sender": "@example:example.org",
1525                                    "state_key": "",
1526                                    "type": "m.room.name",
1527                                    "unsigned": {
1528                                        "age": 1234
1529                                    }
1530                                },
1531                            ]
1532                        }
1533                    }
1534                }
1535            }
1536        }));
1537
1538        client.receive_sync_response(response).await.unwrap();
1539        client
1540            .state_store()
1541            .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1542            .await
1543            .expect("Failed to fetch state event")
1544            .expect("State event not found")
1545            .deserialize()
1546            .expect("Failed to deserialize state event");
1547    }
1548
1549    #[async_test]
1550    async fn test_invited_members_arent_ignored() {
1551        let user_id = user_id!("@alice:example.org");
1552        let inviter_user_id = user_id!("@bob:example.org");
1553        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1554
1555        let client = BaseClient::new(
1556            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1557            ThreadingSupport::Disabled,
1558        );
1559        client
1560            .activate(
1561                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1562                RoomLoadSettings::default(),
1563                #[cfg(feature = "e2e-encryption")]
1564                None,
1565            )
1566            .await
1567            .unwrap();
1568
1569        // Preamble: let the SDK know about the room.
1570        let mut sync_builder = SyncResponseBuilder::new();
1571        let response = sync_builder
1572            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1573            .build_sync_response();
1574        client.receive_sync_response(response).await.unwrap();
1575
1576        // When I process the result of a /members request that only contains an invited
1577        // member,
1578        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1579
1580        let raw_member_event = json!({
1581            "content": {
1582                "avatar_url": "mxc://localhost/fewjilfewjil42",
1583                "displayname": "Invited Alice",
1584                "membership": "invite"
1585            },
1586            "event_id": "$151800140517rfvjc:localhost",
1587            "origin_server_ts": 151800140,
1588            "room_id": room_id,
1589            "sender": inviter_user_id,
1590            "state_key": user_id,
1591            "type": "m.room.member",
1592            "unsigned": {
1593                "age": 13374242,
1594            }
1595        });
1596        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1597            to_raw_value(&raw_member_event).unwrap(),
1598        )]);
1599
1600        // It's correctly processed,
1601        client.receive_all_members(room_id, &request, &response).await.unwrap();
1602
1603        let room = client.get_room(room_id).unwrap();
1604
1605        // And I can get the invited member display name and avatar.
1606        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1607
1608        assert_eq!(member.user_id(), user_id);
1609        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1610        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1611    }
1612
1613    #[async_test]
1614    async fn test_reinvited_members_get_a_display_name() {
1615        let user_id = user_id!("@alice:example.org");
1616        let inviter_user_id = user_id!("@bob:example.org");
1617        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1618
1619        let client = BaseClient::new(
1620            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1621            ThreadingSupport::Disabled,
1622        );
1623        client
1624            .activate(
1625                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1626                RoomLoadSettings::default(),
1627                #[cfg(feature = "e2e-encryption")]
1628                None,
1629            )
1630            .await
1631            .unwrap();
1632
1633        // Preamble: let the SDK know about the room, and that the invited user left it.
1634        let mut sync_builder = SyncResponseBuilder::new();
1635        let response = sync_builder
1636            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
1637                StateTestEvent::Custom(json!({
1638                    "content": {
1639                        "avatar_url": null,
1640                        "displayname": null,
1641                        "membership": "leave"
1642                    },
1643                    "event_id": "$151803140217rkvjc:localhost",
1644                    "origin_server_ts": 151800139,
1645                    "room_id": room_id,
1646                    "sender": user_id,
1647                    "state_key": user_id,
1648                    "type": "m.room.member",
1649                })),
1650            ))
1651            .build_sync_response();
1652        client.receive_sync_response(response).await.unwrap();
1653
1654        // Now, say that the user has been re-invited.
1655        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1656
1657        let raw_member_event = json!({
1658            "content": {
1659                "avatar_url": "mxc://localhost/fewjilfewjil42",
1660                "displayname": "Invited Alice",
1661                "membership": "invite"
1662            },
1663            "event_id": "$151800140517rfvjc:localhost",
1664            "origin_server_ts": 151800140,
1665            "room_id": room_id,
1666            "sender": inviter_user_id,
1667            "state_key": user_id,
1668            "type": "m.room.member",
1669            "unsigned": {
1670                "age": 13374242,
1671            }
1672        });
1673        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1674            to_raw_value(&raw_member_event).unwrap(),
1675        )]);
1676
1677        // It's correctly processed,
1678        client.receive_all_members(room_id, &request, &response).await.unwrap();
1679
1680        let room = client.get_room(room_id).unwrap();
1681
1682        // And I can get the invited member display name and avatar.
1683        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1684
1685        assert_eq!(member.user_id(), user_id);
1686        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1687        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1688    }
1689
1690    #[async_test]
1691    async fn test_ignored_user_list_changes() {
1692        let user_id = user_id!("@alice:example.org");
1693        let client = BaseClient::new(
1694            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1695            ThreadingSupport::Disabled,
1696        );
1697
1698        client
1699            .activate(
1700                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1701                RoomLoadSettings::default(),
1702                #[cfg(feature = "e2e-encryption")]
1703                None,
1704            )
1705            .await
1706            .unwrap();
1707
1708        let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1709        assert!(subscriber.next().now_or_never().is_none());
1710
1711        let f = EventFactory::new();
1712        let mut sync_builder = SyncResponseBuilder::new();
1713        let response = sync_builder
1714            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1715            .build_sync_response();
1716        client.receive_sync_response(response).await.unwrap();
1717
1718        assert_let!(Some(ignored) = subscriber.next().await);
1719        assert_eq!(ignored, [BOB.to_string()]);
1720
1721        // Receive the same response.
1722        let response = sync_builder
1723            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1724            .build_sync_response();
1725        client.receive_sync_response(response).await.unwrap();
1726
1727        // No changes in the ignored list.
1728        assert!(subscriber.next().now_or_never().is_none());
1729
1730        // Now remove Bob from the ignored list.
1731        let response =
1732            sync_builder.add_global_account_data(f.ignored_user_list([])).build_sync_response();
1733        client.receive_sync_response(response).await.unwrap();
1734
1735        assert_let!(Some(ignored) = subscriber.next().await);
1736        assert!(ignored.is_empty());
1737    }
1738
1739    #[async_test]
1740    async fn test_is_user_ignored() {
1741        let ignored_user_id = user_id!("@alice:example.org");
1742        let client = logged_in_base_client(None).await;
1743
1744        let mut sync_builder = SyncResponseBuilder::new();
1745        let f = EventFactory::new();
1746        let response = sync_builder
1747            .add_global_account_data(f.ignored_user_list([ignored_user_id.to_owned()]))
1748            .build_sync_response();
1749        client.receive_sync_response(response).await.unwrap();
1750
1751        assert!(client.is_user_ignored(ignored_user_id).await);
1752    }
1753
1754    #[async_test]
1755    async fn test_invite_details_are_set() {
1756        let user_id = user_id!("@alice:localhost");
1757        let client = logged_in_base_client(Some(user_id)).await;
1758        let invited_room_id = room_id!("!invited:localhost");
1759        let unknown_room_id = room_id!("!unknown:localhost");
1760
1761        let mut sync_builder = SyncResponseBuilder::new();
1762        let response = sync_builder
1763            .add_invited_room(InvitedRoomBuilder::new(invited_room_id))
1764            .build_sync_response();
1765        client.receive_sync_response(response).await.unwrap();
1766
1767        // Let us first check the initial state, we should have a room in the invite
1768        // state.
1769        let invited_room = client
1770            .get_room(invited_room_id)
1771            .expect("The sync should have created a room in the invited state");
1772
1773        assert_eq!(invited_room.state(), RoomState::Invited);
1774        assert!(invited_room.invite_acceptance_details().is_none());
1775
1776        // Now we join the room.
1777        let joined_room = client
1778            .room_joined(invited_room_id, Some(user_id.to_owned()))
1779            .await
1780            .expect("We should be able to mark a room as joined");
1781
1782        // Yup, we now have some invite details.
1783        assert_eq!(joined_room.state(), RoomState::Joined);
1784        assert_matches!(joined_room.invite_acceptance_details(), Some(details));
1785        assert_eq!(details.inviter, user_id);
1786
1787        // If we didn't know about the room before the join, we assume that there wasn't
1788        // an invite and we don't record the timestamp.
1789        assert!(client.get_room(unknown_room_id).is_none());
1790        let unknown_room = client
1791            .room_joined(unknown_room_id, Some(user_id.to_owned()))
1792            .await
1793            .expect("We should be able to mark a room as joined");
1794
1795        assert_eq!(unknown_room.state(), RoomState::Joined);
1796        assert!(unknown_room.invite_acceptance_details().is_none());
1797
1798        sync_builder.clear();
1799        let response =
1800            sync_builder.add_left_room(LeftRoomBuilder::new(invited_room_id)).build_sync_response();
1801        client.receive_sync_response(response).await.unwrap();
1802
1803        // Now that we left the room, we shouldn't have any details anymore.
1804        let left_room = client
1805            .get_room(invited_room_id)
1806            .expect("The sync should have created a room in the invited state");
1807
1808        assert_eq!(left_room.state(), RoomState::Left);
1809        assert!(left_room.invite_acceptance_details().is_none());
1810    }
1811}