matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
32use matrix_sdk_base::{
33    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
34    StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
35    event_cache::store::EventCacheStoreLock,
36    media::store::MediaStoreLock,
37    store::{
38        DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
39        WellKnownResponse,
40    },
41    sync::{Notification, RoomUpdates},
42};
43use matrix_sdk_common::ttl_cache::TtlCache;
44#[cfg(feature = "e2e-encryption")]
45use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
46use ruma::{
47    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
48    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
49    api::{
50        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
51        auth_scheme::{AuthScheme, SendAccessToken},
52        client::{
53            account::whoami,
54            alias::{create_alias, delete_alias, get_alias},
55            authenticated_media,
56            device::{self, delete_devices, get_devices, update_device},
57            directory::{get_public_rooms, get_public_rooms_filtered},
58            discovery::{
59                discover_homeserver::{self, RtcFocusInfo},
60                get_capabilities::{self, v3::Capabilities},
61                get_supported_versions,
62            },
63            error::ErrorKind,
64            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
65            knock::knock_room,
66            media,
67            membership::{join_room_by_id, join_room_by_id_or_alias},
68            room::create_room,
69            session::login::v3::DiscoveryInfo,
70            sync::sync_events,
71            threads::get_thread_subscriptions_changes,
72            uiaa,
73            user_directory::search_users,
74        },
75        error::FromHttpResponseError,
76        path_builder::PathBuilder,
77    },
78    assign,
79    events::direct::DirectUserIdentifier,
80    push::Ruleset,
81    time::Instant,
82};
83use serde::de::DeserializeOwned;
84use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
85use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
86use url::Url;
87
88use self::{
89    caches::{CachedValue, ClientCaches},
90    futures::SendRequest,
91};
92use crate::{
93    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
94    Room, SessionTokens, TransmissionProgress,
95    authentication::{
96        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
97        oauth::OAuth,
98    },
99    client::thread_subscriptions::ThreadSubscriptionCatchup,
100    config::{RequestConfig, SyncToken},
101    deduplicating_handler::DeduplicatingHandler,
102    error::HttpResult,
103    event_cache::EventCache,
104    event_handler::{
105        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
106        EventHandlerStore, ObservableEventHandler, SyncEvent,
107    },
108    http_client::{HttpClient, SupportedPathBuilder},
109    latest_events::LatestEvents,
110    media::MediaError,
111    notification_settings::NotificationSettings,
112    room::RoomMember,
113    room_preview::RoomPreview,
114    send_queue::{SendQueue, SendQueueData},
115    sliding_sync::Version as SlidingSyncVersion,
116    sync::{RoomUpdate, SyncResponse},
117};
118#[cfg(feature = "e2e-encryption")]
119use crate::{
120    cross_process_lock::CrossProcessLock,
121    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
122};
123
124mod builder;
125pub(crate) mod caches;
126pub(crate) mod futures;
127pub(crate) mod thread_subscriptions;
128
129pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
130#[cfg(feature = "experimental-search")]
131use crate::search_index::SearchIndex;
132
/// Boxed, pinned future returned by a notification handler.
///
/// On non-wasm targets the future must also be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future returned by a notification handler (wasm variant,
/// without the `Send` bound).
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

/// Boxed notification handler callback.
///
/// It receives the [`Notification`], the [`Room`] and the [`Client`], and
/// returns a [`NotificationHandlerFut`]. On non-wasm targets the callback must
/// also be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler callback (wasm variant, without the
/// `Send + Sync` bounds).
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
143
/// Enum controlling whether a loop running callbacks should continue or
/// abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
157
/// Represents changes that can occur to a `Client`'s session.
///
/// Equality is total (the only payload is a `bool`), so the type implements
/// `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
169
/// Information about the server vendor obtained from the federation API.
///
/// Both fields are plain strings; `Client::server_vendor_info` fills them
/// with `"unknown"` when the server response omits the corresponding value.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The server name, i.e. the name of the server software.
    pub server_name: String,
    /// The server version.
    pub version: String,
}
179
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely:
/// a clone only copies the pointer to the shared [`ClientInner`].
#[derive(Clone)]
pub struct Client {
    /// The shared state backing every clone of this client.
    pub(crate) inner: Arc<ClientInner>,
}
187
/// Collection of locks and request-deduplicating handlers shared by the
/// methods of a [`Client`].
///
/// Every member is default-constructible, so the whole collection is created
/// through `Default`.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    // NOTE(review): `Self` is `ClientLocks` here, which declares no
    // `send_single_receipt` method; the link presumably targets a method
    // defined on another type — confirm and adjust the path.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock over the crypto store.
    ///
    /// Held in a `OnceCell`, so it starts out unset and is initialized at most
    /// once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
266
/// The shared state behind a [`Client`].
///
/// A `Client` only holds an `Arc<ClientInner>`; everything the client needs
/// (authentication context, HTTP client, caches, locks, handlers, …) lives
/// here.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    /// Handler for [`RoomIndex`]'s of each room.
    #[cfg(feature = "experimental-search")]
    search_index: SearchIndex,
}
387
388impl ClientInner {
389    /// Create a new `ClientInner`.
390    ///
391    /// All the fields passed as parameters here are those that must be cloned
392    /// upon instantiation of a sub-client, e.g. a client specialized for
393    /// notifications.
394    #[allow(clippy::too_many_arguments)]
395    async fn new(
396        auth_ctx: Arc<AuthCtx>,
397        server: Option<Url>,
398        homeserver: Url,
399        sliding_sync_version: SlidingSyncVersion,
400        http_client: HttpClient,
401        base_client: BaseClient,
402        supported_versions: CachedValue<SupportedVersions>,
403        well_known: CachedValue<Option<WellKnownResponse>>,
404        respect_login_well_known: bool,
405        event_cache: OnceCell<EventCache>,
406        send_queue: Arc<SendQueueData>,
407        latest_events: OnceCell<LatestEvents>,
408        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
409        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
410        cross_process_store_locks_holder_name: String,
411        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
412        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
413    ) -> Arc<Self> {
414        let caches = ClientCaches {
415            supported_versions: supported_versions.into(),
416            well_known: well_known.into(),
417            server_metadata: Mutex::new(TtlCache::new()),
418        };
419
420        let client = Self {
421            server,
422            homeserver: StdRwLock::new(homeserver),
423            auth_ctx,
424            sliding_sync_version: StdRwLock::new(sliding_sync_version),
425            http_client,
426            base_client,
427            caches,
428            locks: Default::default(),
429            cross_process_store_locks_holder_name,
430            typing_notice_times: Default::default(),
431            event_handlers: Default::default(),
432            notification_handlers: Default::default(),
433            room_update_channels: Default::default(),
434            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
435            // ballast for all observers to catch up.
436            room_updates_sender: broadcast::Sender::new(32),
437            respect_login_well_known,
438            sync_beat: event_listener::Event::new(),
439            event_cache,
440            send_queue_data: send_queue,
441            latest_events,
442            #[cfg(feature = "e2e-encryption")]
443            e2ee: EncryptionData::new(encryption_settings),
444            #[cfg(feature = "e2e-encryption")]
445            verification_state: SharedObservable::new(VerificationState::Unknown),
446            #[cfg(feature = "e2e-encryption")]
447            enable_share_history_on_invite,
448            server_max_upload_size: Mutex::new(OnceCell::new()),
449            #[cfg(feature = "experimental-search")]
450            search_index: search_index_handler,
451            thread_subscription_catchup,
452        };
453
454        #[allow(clippy::let_and_return)]
455        let client = Arc::new(client);
456
457        #[cfg(feature = "e2e-encryption")]
458        client.e2ee.initialize_tasks(&client);
459
460        let _ = client
461            .event_cache
462            .get_or_init(|| async {
463                EventCache::new(
464                    WeakClient::from_inner(&client),
465                    client.base_client.event_cache_store().clone(),
466                )
467            })
468            .await;
469
470        let _ = client
471            .thread_subscription_catchup
472            .get_or_init(|| async {
473                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
474            })
475            .await;
476
477        client
478    }
479}
480
481#[cfg(not(tarpaulin_include))]
482impl Debug for Client {
483    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
484        write!(fmt, "Client")
485    }
486}
487
488impl Client {
    /// Create a new [`Client`] that will use the given homeserver.
    ///
    /// This is a shorthand for [`Client::builder()`], setting the homeserver
    /// URL on the builder and building it with every other option left
    /// untouched.
    ///
    /// # Arguments
    ///
    /// * `homeserver_url` - The homeserver that the client should connect to.
    ///
    /// # Errors
    ///
    /// Returns a [`ClientBuildError`] if the builder fails to build the
    /// client.
    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
        Self::builder().homeserver_url(homeserver_url).build().await
    }
497
    /// Returns a subscriber that publishes an event every time the ignore user
    /// list changes.
    ///
    /// The subscription is obtained from the underlying [`BaseClient`].
    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
        self.inner.base_client.subscribe_to_ignore_user_list_changes()
    }
503
    /// Create a new [`ClientBuilder`].
    ///
    /// Equivalent to calling `ClientBuilder::new()`.
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
508
    /// Borrow the underlying [`BaseClient`].
    pub(crate) fn base_client(&self) -> &BaseClient {
        &self.inner.base_client
    }
512
    /// The underlying HTTP client.
    ///
    /// This borrows the `reqwest::Client` wrapped by the SDK's internal HTTP
    /// client.
    pub fn http_client(&self) -> &reqwest::Client {
        &self.inner.http_client.inner
    }
517
    /// Borrow the collection of locks and deduplicating handlers shared by
    /// the client's methods.
    pub(crate) fn locks(&self) -> &ClientLocks {
        &self.inner.locks
    }
521
    /// Borrow the authentication context, i.e. all the data related to
    /// authentication and authorization.
    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
        &self.inner.auth_ctx
    }
525
    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this value
    /// MUST be different for each `Client`.
    pub fn cross_process_store_locks_holder_name(&self) -> &str {
        &self.inner.cross_process_store_locks_holder_name
    }
535
    /// Change the homeserver URL used by this client.
    ///
    /// The new URL replaces the stored one for every clone of this client,
    /// since the value lives in the shared inner state.
    ///
    /// # Arguments
    ///
    /// * `homeserver_url` - The new URL to use.
    ///
    /// # Panics
    ///
    /// Panics if the lock protecting the homeserver URL is poisoned.
    fn set_homeserver(&self, homeserver_url: Url) {
        *self.inner.homeserver.write().unwrap() = homeserver_url;
    }
544
    /// Change to a different homeserver and re-resolve well-known.
    ///
    /// The steps are, in order:
    ///
    /// 1. the homeserver is set to `homeserver_url`,
    /// 2. the well-known information is reset, then loaded or fetched again,
    /// 3. if well-known information is available, the homeserver is set once
    ///    more, this time to the base URL it advertises.
    ///
    /// # Errors
    ///
    /// Returns an error if resetting or loading the well-known information
    /// fails, or if the advertised base URL cannot be parsed. In all of these
    /// cases the homeserver has already been switched to `homeserver_url` by
    /// step 1.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
        &self,
        homeserver_url: Url,
    ) -> Result<()> {
        self.set_homeserver(homeserver_url);
        self.reset_well_known().await?;
        if let Some(well_known) = self.load_or_fetch_well_known().await? {
            // Prefer the URL advertised by the well-known document.
            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
        }
        Ok(())
    }
558
559    /// Get the capabilities of the homeserver.
560    ///
561    /// This method should be used to check what features are supported by the
562    /// homeserver.
563    ///
564    /// # Examples
565    ///
566    /// ```no_run
567    /// # use matrix_sdk::Client;
568    /// # use url::Url;
569    /// # async {
570    /// # let homeserver = Url::parse("http://example.com")?;
571    /// let client = Client::new(homeserver).await?;
572    ///
573    /// let capabilities = client.get_capabilities().await?;
574    ///
575    /// if capabilities.change_password.enabled {
576    ///     // Change password
577    /// }
578    /// # anyhow::Ok(()) };
579    /// ```
580    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
581        let res = self.send(get_capabilities::v3::Request::new()).await?;
582        Ok(res.capabilities)
583    }
584
585    /// Get the server vendor information from the federation API.
586    ///
587    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
588    /// both the server's software name and version.
589    ///
590    /// # Examples
591    ///
592    /// ```no_run
593    /// # use matrix_sdk::Client;
594    /// # use url::Url;
595    /// # async {
596    /// # let homeserver = Url::parse("http://example.com")?;
597    /// let client = Client::new(homeserver).await?;
598    ///
599    /// let server_info = client.server_vendor_info(None).await?;
600    /// println!(
601    ///     "Server: {}, Version: {}",
602    ///     server_info.server_name, server_info.version
603    /// );
604    /// # anyhow::Ok(()) };
605    /// ```
606    #[cfg(feature = "federation-api")]
607    pub async fn server_vendor_info(
608        &self,
609        request_config: Option<RequestConfig>,
610    ) -> HttpResult<ServerVendorInfo> {
611        use ruma::api::federation::discovery::get_server_version;
612
613        let res = self
614            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
615            .await?;
616
617        // Extract server info, using defaults if fields are missing.
618        let server = res.server.unwrap_or_default();
619        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
620        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
621
622        Ok(ServerVendorInfo { server_name: server_name_str, version })
623    }
624
    /// Get a copy of the default request config.
    ///
    /// The default request config is what's used when sending requests if no
    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
    /// function with such a parameter.
    ///
    /// If the default request config was not customized through
    /// [`ClientBuilder`] when creating this `Client`, the returned value will
    /// be equivalent to [`RequestConfig::default()`].
    ///
    /// The value is returned by copy, so changing it does not affect the
    /// client.
    pub fn request_config(&self) -> RequestConfig {
        self.inner.http_client.request_config
    }
637
    /// Check whether the client has been activated.
    ///
    /// A client is considered active when:
    ///
    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
    ///    is logged in,
    /// 2. Has loaded cached data from storage,
    /// 3. If encryption is enabled, it also initialized or restored its
    ///    `OlmMachine`.
    ///
    /// The check itself is delegated to the underlying `BaseClient`.
    pub fn is_active(&self) -> bool {
        self.inner.base_client.is_active()
    }
650
    /// The server used by the client.
    ///
    /// The server is usually the server part of a user ID, which is not
    /// necessarily the same as the [homeserver](Self::homeserver) URL.
    ///
    /// Returns `None` if the `Client` has been built from a homeserver URL
    /// directly, as the server is not known in that case.
    pub fn server(&self) -> Option<&Url> {
        self.inner.server.as_ref()
    }
657
    /// The homeserver of the client.
    ///
    /// This is the URL used for the client-server Matrix API. A clone of the
    /// currently stored URL is returned.
    ///
    /// # Panics
    ///
    /// Panics if the lock protecting the homeserver URL is poisoned.
    pub fn homeserver(&self) -> Url {
        self.inner.homeserver.read().unwrap().clone()
    }
662
    /// Get the sliding sync version.
    ///
    /// A clone of the currently stored version is returned.
    ///
    /// # Panics
    ///
    /// Panics if the lock protecting the sliding sync version is poisoned.
    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
        self.inner.sliding_sync_version.read().unwrap().clone()
    }
667
668    /// Override the sliding sync version.
669    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
670        let mut lock = self.inner.sliding_sync_version.write().unwrap();
671        *lock = version;
672    }
673
    /// Get the Matrix user session meta information.
    ///
    /// If the client is currently logged in, this will return a
    /// [`SessionMeta`] object which contains the user ID and device ID.
    /// Otherwise it returns `None`.
    ///
    /// The value is borrowed from the underlying `BaseClient`.
    pub fn session_meta(&self) -> Option<&SessionMeta> {
        self.base_client().session_meta()
    }
682
    /// Returns a receiver that gets events for each room info update. To watch
    /// for new events, use `receiver.resubscribe()`.
    ///
    /// The receiver is obtained from the underlying `BaseClient`.
    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
        self.base_client().room_info_notable_update_receiver()
    }
688
    /// Performs a search for users in the [user directory].
    ///
    /// The search is performed case-insensitively on user IDs and display
    /// names.
    ///
    /// # Arguments
    ///
    /// * `search_term` - The search term for the search
    /// * `limit` - The maximum number of results to return. If the value
    ///   cannot be represented as a [`UInt`], it is ignored and the request's
    ///   default limit is kept (10, per the original documentation of this
    ///   method).
    ///
    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
    pub async fn search_users(
        &self,
        search_term: &str,
        limit: u64,
    ) -> HttpResult<search_users::v3::Response> {
        let mut request = search_users::v3::Request::new(search_term.to_owned());

        // Only override the request's default limit when the conversion
        // succeeds.
        if let Some(limit) = UInt::new(limit) {
            request.limit = limit;
        }

        self.send(request).await
    }
711
712    /// Get the user id of the current owner of the client.
713    pub fn user_id(&self) -> Option<&UserId> {
714        self.session_meta().map(|s| s.user_id.as_ref())
715    }
716
717    /// Get the device ID that identifies the current session.
718    pub fn device_id(&self) -> Option<&DeviceId> {
719        self.session_meta().map(|s| s.device_id.as_ref())
720    }
721
    /// Get the current access token for this session.
    ///
    /// The token is read from the client's authentication context and
    /// returned as an owned `String`.
    ///
    /// Will be `None` if the client has not been logged in.
    pub fn access_token(&self) -> Option<String> {
        self.auth_ctx().access_token()
    }
728
    /// Get the current tokens for this session.
    ///
    /// The tokens are read from the client's authentication context.
    ///
    /// To be notified of changes in the session tokens, use
    /// [`Client::subscribe_to_session_changes()`] or
    /// [`Client::set_session_callbacks()`].
    ///
    /// Returns `None` if the client has not been logged in.
    pub fn session_tokens(&self) -> Option<SessionTokens> {
        self.auth_ctx().session_tokens()
    }
739
740    /// Access the authentication API used to log in this client.
741    ///
742    /// Will be `None` if the client has not been logged in.
743    pub fn auth_api(&self) -> Option<AuthApi> {
744        match self.auth_ctx().auth_data.get()? {
745            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
746            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
747        }
748    }
749
750    /// Get the whole session info of this client.
751    ///
752    /// Will be `None` if the client has not been logged in.
753    ///
754    /// Can be used with [`Client::restore_session`] to restore a previously
755    /// logged-in session.
756    pub fn session(&self) -> Option<AuthSession> {
757        match self.auth_api()? {
758            AuthApi::Matrix(api) => api.session().map(Into::into),
759            AuthApi::OAuth(api) => api.full_session().map(Into::into),
760        }
761    }
762
    /// Get a reference to the state store.
    ///
    /// The store is borrowed from the underlying `BaseClient`.
    pub fn state_store(&self) -> &DynStateStore {
        self.base_client().state_store()
    }
767
    /// Get a reference to the event cache store.
    ///
    /// The store lock is borrowed from the underlying `BaseClient`.
    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
        self.base_client().event_cache_store()
    }
772
    /// Get a reference to the media store.
    ///
    /// The store lock is borrowed from the underlying `BaseClient`.
    pub fn media_store(&self) -> &MediaStoreLock {
        self.base_client().media_store()
    }
777
    /// Access the native Matrix authentication API with this client.
    ///
    /// The returned [`MatrixAuth`] is built from a clone of this client.
    pub fn matrix_auth(&self) -> MatrixAuth {
        MatrixAuth::new(self.clone())
    }
782
    /// Get the account of the current owner of the client.
    ///
    /// The returned [`Account`] is built from a clone of this client.
    pub fn account(&self) -> Account {
        Account::new(self.clone())
    }
787
    /// Get the encryption manager of the client.
    ///
    /// The returned [`Encryption`] is built from a clone of this client. Only
    /// available when the `e2e-encryption` feature is enabled.
    #[cfg(feature = "e2e-encryption")]
    pub fn encryption(&self) -> Encryption {
        Encryption::new(self.clone())
    }
793
    /// Get the media manager of the client.
    ///
    /// The returned [`Media`] is built from a clone of this client.
    pub fn media(&self) -> Media {
        Media::new(self.clone())
    }
798
    /// Get the pusher manager of the client.
    ///
    /// The returned [`Pusher`] is built from a clone of this client.
    pub fn pusher(&self) -> Pusher {
        Pusher::new(self.clone())
    }
803
804    /// Access the OAuth 2.0 API of the client.
805    pub fn oauth(&self) -> OAuth {
806        OAuth::new(self.clone())
807    }
808
809    /// Register a handler for a specific event type.
810    ///
811    /// The handler is a function or closure with one or more arguments. The
812    /// first argument is the event itself. All additional arguments are
813    /// "context" arguments: They have to implement [`EventHandlerContext`].
814    /// This trait is named that way because most of the types implementing it
815    /// give additional context about an event: The room it was in, its raw form
816    /// and other similar things. As two exceptions to this,
817    /// [`Client`] and [`EventHandlerHandle`] also implement the
818    /// `EventHandlerContext` trait so you don't have to clone your client
819    /// into the event handler manually and a handler can decide to remove
820    /// itself.
821    ///
822    /// Some context arguments are not universally applicable. A context
823    /// argument that isn't available for the given event type will result in
824    /// the event handler being skipped and an error being logged. The following
825    /// context argument types are only available for a subset of event types:
826    ///
827    /// * [`Room`] is only available for room-specific events, i.e. not for
828    ///   events like global account data events or presence events.
829    ///
830    /// You can provide custom context via
831    /// [`add_event_handler_context`](Client::add_event_handler_context) and
832    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
833    /// into the event handler.
834    ///
835    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
836    ///
837    /// # Examples
838    ///
839    /// ```no_run
840    /// use matrix_sdk::{
841    ///     deserialized_responses::EncryptionInfo,
842    ///     event_handler::Ctx,
843    ///     ruma::{
844    ///         events::{
845    ///             macros::EventContent,
846    ///             push_rules::PushRulesEvent,
847    ///             room::{
848    ///                 message::SyncRoomMessageEvent,
849    ///                 topic::SyncRoomTopicEvent,
850    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
851    ///             },
852    ///         },
853    ///         push::Action,
854    ///         Int, MilliSecondsSinceUnixEpoch,
855    ///     },
856    ///     Client, Room,
857    /// };
858    /// use serde::{Deserialize, Serialize};
859    ///
860    /// # async fn example(client: Client) {
861    /// client.add_event_handler(
862    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
863    ///         // Common usage: Room event plus room and client.
864    ///     },
865    /// );
866    /// client.add_event_handler(
867    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
868    ///         async move {
869    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
870    ///             // unencrypted events and events that were decrypted by the SDK.
871    ///         }
872    ///     },
873    /// );
874    /// client.add_event_handler(
875    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
876    ///         async move {
877    ///             // A `Vec<Action>` parameter allows you to know which push actions
878    ///             // are applicable for an event. For example, an event with
879    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
880    ///             // in the timeline.
881    ///         }
882    ///     },
883    /// );
884    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
885    ///     // You can omit any or all arguments after the first.
886    /// });
887    ///
888    /// // Registering a temporary event handler:
889    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
890    ///     /* Event handler */
891    /// });
892    /// client.remove_event_handler(handle);
893    ///
894    /// // Registering custom event handler context:
895    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
896    /// struct MyContext {
897    ///     number: usize,
898    /// }
899    /// client.add_event_handler_context(MyContext { number: 5 });
900    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
901    ///     // Use the context
902    /// });
903    ///
904    /// // This will handle membership events in joined rooms. Invites are special, see below.
905    /// client.add_event_handler(
906    ///     |ev: SyncRoomMemberEvent| async move {},
907    /// );
908    ///
909    /// // To handle state events in invited rooms (including invite membership events),
910    /// // `StrippedRoomMemberEvent` should be used.
911    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
912    /// client.add_event_handler(
913    ///     |ev: StrippedRoomMemberEvent| async move {},
914    /// );
915    ///
916    /// // Custom events work exactly the same way, you just need to declare
917    /// // the content struct and use the EventContent derive macro on it.
918    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
919    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
920    /// struct TokenEventContent {
921    ///     token: String,
922    ///     #[serde(rename = "exp")]
923    ///     expires_at: MilliSecondsSinceUnixEpoch,
924    /// }
925    ///
926    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
927    ///     todo!("Display the token");
928    /// });
929    ///
930    /// // Event handler closures can also capture local variables.
931    /// // Make sure they are cheap to clone though, because they will be cloned
932    /// // every time the closure is called.
933    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
934    ///
935    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
936    ///     println!("Calling the handler with identifier {data}");
937    /// });
938    /// # }
939    /// ```
940    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
941    where
942        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
943        H: EventHandler<Ev, Ctx>,
944    {
945        self.add_event_handler_impl(handler, None)
946    }
947
948    /// Register a handler for a specific room, and event type.
949    ///
950    /// This method works the same way as
951    /// [`add_event_handler`][Self::add_event_handler], except that the handler
952    /// will only be called for events in the room with the specified ID. See
953    /// that method for more details on event handler functions.
954    ///
955    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
956    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
957    /// your use case.
958    pub fn add_room_event_handler<Ev, Ctx, H>(
959        &self,
960        room_id: &RoomId,
961        handler: H,
962    ) -> EventHandlerHandle
963    where
964        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
965        H: EventHandler<Ev, Ctx>,
966    {
967        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
968    }
969
970    /// Observe a specific event type.
971    ///
972    /// `Ev` represents the kind of event that will be observed. `Ctx`
973    /// represents the context that will come with the event. It relies on the
974    /// same mechanism as [`Client::add_event_handler`]. The main difference is
975    /// that it returns an [`ObservableEventHandler`] and doesn't require a
976    /// user-defined closure. It is possible to subscribe to the
977    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
978    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
979    /// Ctx)`.
980    ///
981    /// Be careful that only the most recent value can be observed. Subscribers
982    /// are notified when a new value is sent, but there is no guarantee
983    /// that they will see all values.
984    ///
985    /// # Example
986    ///
987    /// Let's see a classical usage:
988    ///
989    /// ```
990    /// use futures_util::StreamExt as _;
991    /// use matrix_sdk::{
992    ///     Client, Room,
993    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
994    /// };
995    ///
996    /// # async fn example(client: Client) -> Option<()> {
997    /// let observer =
998    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
999    ///
1000    /// let mut subscriber = observer.subscribe();
1001    ///
1002    /// let (event, (room, push_actions)) = subscriber.next().await?;
1003    /// # Some(())
1004    /// # }
1005    /// ```
1006    ///
1007    /// Now let's see how to get several contexts that can be useful for you:
1008    ///
1009    /// ```
1010    /// use matrix_sdk::{
1011    ///     Client, Room,
1012    ///     deserialized_responses::EncryptionInfo,
1013    ///     ruma::{
1014    ///         events::room::{
1015    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1016    ///         },
1017    ///         push::Action,
1018    ///     },
1019    /// };
1020    ///
1021    /// # async fn example(client: Client) {
1022    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1023    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1024    ///
1025    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1026    /// // to distinguish between unencrypted events and events that were decrypted
1027    /// // by the SDK.
1028    /// let _ = client
1029    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1030    ///     );
1031    ///
1032    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1033    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1034    /// // should be highlighted in the timeline.
1035    /// let _ =
1036    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1037    ///
1038    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1039    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1040    /// # }
1041    /// ```
1042    ///
1043    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1044    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1045    where
1046        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1047        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1048    {
1049        self.observe_room_events_impl(None)
1050    }
1051
1052    /// Observe a specific room, and event type.
1053    ///
1054    /// This method works the same way as [`Client::observe_events`], except
1055    /// that the observability will only be applied for events in the room with
1056    /// the specified ID. See that method for more details.
1057    ///
1058    /// Be careful that only the most recent value can be observed. Subscribers
1059    /// are notified when a new value is sent, but there is no guarantee
1060    /// that they will see all values.
1061    pub fn observe_room_events<Ev, Ctx>(
1062        &self,
1063        room_id: &RoomId,
1064    ) -> ObservableEventHandler<(Ev, Ctx)>
1065    where
1066        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1067        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1068    {
1069        self.observe_room_events_impl(Some(room_id.to_owned()))
1070    }
1071
1072    /// Shared implementation for `Client::observe_events` and
1073    /// `Client::observe_room_events`.
1074    fn observe_room_events_impl<Ev, Ctx>(
1075        &self,
1076        room_id: Option<OwnedRoomId>,
1077    ) -> ObservableEventHandler<(Ev, Ctx)>
1078    where
1079        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1080        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1081    {
1082        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1083        // new value.
1084        let shared_observable = SharedObservable::new(None);
1085
1086        ObservableEventHandler::new(
1087            shared_observable.clone(),
1088            self.event_handler_drop_guard(self.add_event_handler_impl(
1089                move |event: Ev, context: Ctx| {
1090                    shared_observable.set(Some((event, context)));
1091
1092                    ready(())
1093                },
1094                room_id,
1095            )),
1096        )
1097    }
1098
1099    /// Remove the event handler associated with the handle.
1100    ///
1101    /// Note that you **must not** call `remove_event_handler` from the
1102    /// non-async part of an event handler, that is:
1103    ///
1104    /// ```ignore
1105    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
    ///     // ⚠ this will cause a deadlock ⚠
1107    ///     client.remove_event_handler(handle);
1108    ///
1109    ///     async move {
1110    ///         // removing the event handler here is fine
1111    ///         client.remove_event_handler(handle);
1112    ///     }
1113    /// })
1114    /// ```
1115    ///
1116    /// Note also that handlers that remove themselves will still execute with
1117    /// events received in the same sync cycle.
1118    ///
1119    /// # Arguments
1120    ///
1121    /// `handle` - The [`EventHandlerHandle`] that is returned when
1122    /// registering the event handler with [`Client::add_event_handler`].
1123    ///
1124    /// # Examples
1125    ///
1126    /// ```no_run
1127    /// # use url::Url;
1128    /// # use tokio::sync::mpsc;
1129    /// #
1130    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1131    /// #
1132    /// use matrix_sdk::{
1133    ///     Client, event_handler::EventHandlerHandle,
1134    ///     ruma::events::room::member::SyncRoomMemberEvent,
1135    /// };
1136    /// #
1137    /// # futures_executor::block_on(async {
1138    /// # let client = matrix_sdk::Client::builder()
1139    /// #     .homeserver_url(homeserver)
1140    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1141    /// #     .build()
1142    /// #     .await
1143    /// #     .unwrap();
1144    ///
1145    /// client.add_event_handler(
1146    ///     |ev: SyncRoomMemberEvent,
1147    ///      client: Client,
1148    ///      handle: EventHandlerHandle| async move {
1149    ///         // Common usage: Check arriving Event is the expected one
1150    ///         println!("Expected RoomMemberEvent received!");
1151    ///         client.remove_event_handler(handle);
1152    ///     },
1153    /// );
1154    /// # });
1155    /// ```
1156    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1157        self.inner.event_handlers.remove(handle);
1158    }
1159
1160    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1161    /// the given handle.
1162    ///
1163    /// When the returned value is dropped, the event handler will be removed.
1164    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1165        EventHandlerDropGuard::new(handle, self.clone())
1166    }
1167
1168    /// Add an arbitrary value for use as event handler context.
1169    ///
1170    /// The value can be obtained in an event handler by adding an argument of
1171    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1172    ///
1173    /// If a value of the same type has been added before, it will be
1174    /// overwritten.
1175    ///
1176    /// # Examples
1177    ///
1178    /// ```no_run
1179    /// use matrix_sdk::{
1180    ///     Room, event_handler::Ctx,
1181    ///     ruma::events::room::message::SyncRoomMessageEvent,
1182    /// };
1183    /// # #[derive(Clone)]
1184    /// # struct SomeType;
1185    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1186    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1187    /// # futures_executor::block_on(async {
1188    /// # let client = matrix_sdk::Client::builder()
1189    /// #     .homeserver_url(homeserver)
1190    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1191    /// #     .build()
1192    /// #     .await
1193    /// #     .unwrap();
1194    ///
1195    /// // Handle used to send messages to the UI part of the app
1196    /// let my_gui_handle: SomeType = obtain_gui_handle();
1197    ///
1198    /// client.add_event_handler_context(my_gui_handle.clone());
1199    /// client.add_event_handler(
1200    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1201    ///         async move {
1202    ///             // gui_handle.send(DisplayMessage { message: ev });
1203    ///         }
1204    ///     },
1205    /// );
1206    /// # });
1207    /// ```
1208    pub fn add_event_handler_context<T>(&self, ctx: T)
1209    where
1210        T: Clone + Send + Sync + 'static,
1211    {
1212        self.inner.event_handlers.add_context(ctx);
1213    }
1214
1215    /// Register a handler for a notification.
1216    ///
1217    /// Similar to [`Client::add_event_handler`], but only allows functions
1218    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1219    /// [`Client`] for now.
1220    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1221    where
1222        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1223        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1224    {
1225        self.inner.notification_handlers.write().await.push(Box::new(
1226            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1227        ));
1228
1229        self
1230    }
1231
1232    /// Subscribe to all updates for the room with the given ID.
1233    ///
1234    /// The returned receiver will receive a new message for each sync response
1235    /// that contains updates for that room.
1236    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1237        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1238            btree_map::Entry::Vacant(entry) => {
1239                let (tx, rx) = broadcast::channel(8);
1240                entry.insert(tx);
1241                rx
1242            }
1243            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1244        }
1245    }
1246
1247    /// Subscribe to all updates to all rooms, whenever any has been received in
1248    /// a sync response.
1249    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1250        self.inner.room_updates_sender.subscribe()
1251    }
1252
1253    pub(crate) async fn notification_handlers(
1254        &self,
1255    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1256        self.inner.notification_handlers.read().await
1257    }
1258
1259    /// Get all the rooms the client knows about.
1260    ///
1261    /// This will return the list of joined, invited, and left rooms.
1262    pub fn rooms(&self) -> Vec<Room> {
1263        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1264    }
1265
1266    /// Get all the rooms the client knows about, filtered by room state.
1267    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1268        self.base_client()
1269            .rooms_filtered(filter)
1270            .into_iter()
1271            .map(|room| Room::new(self.clone(), room))
1272            .collect()
1273    }
1274
1275    /// Get a stream of all the rooms, in addition to the existing rooms.
1276    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1277        let (rooms, stream) = self.base_client().rooms_stream();
1278
1279        let map_room = |room| Room::new(self.clone(), room);
1280
1281        (
1282            rooms.into_iter().map(map_room).collect(),
1283            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1284        )
1285    }
1286
1287    /// Returns the joined rooms this client knows about.
1288    pub fn joined_rooms(&self) -> Vec<Room> {
1289        self.rooms_filtered(RoomStateFilter::JOINED)
1290    }
1291
1292    /// Returns the invited rooms this client knows about.
1293    pub fn invited_rooms(&self) -> Vec<Room> {
1294        self.rooms_filtered(RoomStateFilter::INVITED)
1295    }
1296
1297    /// Returns the left rooms this client knows about.
1298    pub fn left_rooms(&self) -> Vec<Room> {
1299        self.rooms_filtered(RoomStateFilter::LEFT)
1300    }
1301
1302    /// Returns the joined space rooms this client knows about.
1303    pub fn joined_space_rooms(&self) -> Vec<Room> {
1304        self.base_client()
1305            .rooms_filtered(RoomStateFilter::JOINED)
1306            .into_iter()
1307            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1308            .collect()
1309    }
1310
1311    /// Get a room with the given room id.
1312    ///
1313    /// # Arguments
1314    ///
1315    /// `room_id` - The unique id of the room that should be fetched.
1316    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1317        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1318    }
1319
1320    /// Gets the preview of a room, whether the current user has joined it or
1321    /// not.
1322    pub async fn get_room_preview(
1323        &self,
1324        room_or_alias_id: &RoomOrAliasId,
1325        via: Vec<OwnedServerName>,
1326    ) -> Result<RoomPreview> {
1327        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1328            Ok(room_id) => room_id.to_owned(),
1329            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1330        };
1331
1332        if let Some(room) = self.get_room(&room_id) {
1333            // The cached data can only be trusted if the room state is joined or
1334            // banned: for invite and knock rooms, no updates will be received
1335            // for the rooms after the invite/knock action took place so we may
1336            // have very out to date data for important fields such as
1337            // `join_rule`. For left rooms, the homeserver should return the latest info.
1338            match room.state() {
1339                RoomState::Joined | RoomState::Banned => {
1340                    return Ok(RoomPreview::from_known_room(&room).await);
1341                }
1342                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1343            }
1344        }
1345
1346        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1347    }
1348
1349    /// Resolve a room alias to a room id and a list of servers which know
1350    /// about it.
1351    ///
1352    /// # Arguments
1353    ///
1354    /// `room_alias` - The room alias to be resolved.
1355    pub async fn resolve_room_alias(
1356        &self,
1357        room_alias: &RoomAliasId,
1358    ) -> HttpResult<get_alias::v3::Response> {
1359        let request = get_alias::v3::Request::new(room_alias.to_owned());
1360        self.send(request).await
1361    }
1362
1363    /// Checks if a room alias is not in use yet.
1364    ///
1365    /// Returns:
1366    /// - `Ok(true)` if the room alias is available.
1367    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1368    ///   status code).
1369    /// - An `Err` otherwise.
1370    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1371        match self.resolve_room_alias(alias).await {
1372            // The room alias was resolved, so it's already in use.
1373            Ok(_) => Ok(false),
1374            Err(error) => {
1375                match error.client_api_error_kind() {
1376                    // The room alias wasn't found, so it's available.
1377                    Some(ErrorKind::NotFound) => Ok(true),
1378                    _ => Err(error),
1379                }
1380            }
1381        }
1382    }
1383
1384    /// Adds a new room alias associated with a room to the room directory.
1385    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1386        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1387        self.send(request).await?;
1388        Ok(())
1389    }
1390
1391    /// Removes a room alias from the room directory.
1392    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1393        let request = delete_alias::v3::Request::new(alias.to_owned());
1394        self.send(request).await?;
1395        Ok(())
1396    }
1397
1398    /// Update the homeserver from the login response well-known if needed.
1399    ///
1400    /// # Arguments
1401    ///
1402    /// * `login_well_known` - The `well_known` field from a successful login
1403    ///   response.
1404    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1405        if self.inner.respect_login_well_known
1406            && let Some(well_known) = login_well_known
1407            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1408        {
1409            self.set_homeserver(homeserver);
1410        }
1411    }
1412
1413    /// Similar to [`Client::restore_session_with`], with
1414    /// [`RoomLoadSettings::default()`].
1415    ///
1416    /// # Panics
1417    ///
1418    /// Panics if a session was already restored or logged in.
1419    #[instrument(skip_all)]
1420    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1421        self.restore_session_with(session, RoomLoadSettings::default()).await
1422    }
1423
1424    /// Restore a session previously logged-in using one of the available
1425    /// authentication APIs. The number of rooms to restore is controlled by
1426    /// [`RoomLoadSettings`].
1427    ///
1428    /// See the documentation of the corresponding authentication API's
1429    /// `restore_session` method for more information.
1430    ///
1431    /// # Panics
1432    ///
1433    /// Panics if a session was already restored or logged in.
1434    #[instrument(skip_all)]
1435    pub async fn restore_session_with(
1436        &self,
1437        session: impl Into<AuthSession>,
1438        room_load_settings: RoomLoadSettings,
1439    ) -> Result<()> {
1440        let session = session.into();
1441        match session {
1442            AuthSession::Matrix(session) => {
1443                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1444            }
1445            AuthSession::OAuth(session) => {
1446                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1447            }
1448        }
1449    }
1450
1451    /// Refresh the access token using the authentication API used to log into
1452    /// this session.
1453    ///
1454    /// See the documentation of the authentication API's `refresh_access_token`
1455    /// method for more information.
1456    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1457        let Some(auth_api) = self.auth_api() else {
1458            return Err(RefreshTokenError::RefreshTokenRequired);
1459        };
1460
1461        match auth_api {
1462            AuthApi::Matrix(api) => {
1463                trace!("Token refresh: Using the homeserver.");
1464                Box::pin(api.refresh_access_token()).await?;
1465            }
1466            AuthApi::OAuth(api) => {
1467                trace!("Token refresh: Using OAuth 2.0.");
1468                Box::pin(api.refresh_access_token()).await?;
1469            }
1470        }
1471
1472        Ok(())
1473    }
1474
1475    /// Log out the current session using the proper authentication API.
1476    ///
1477    /// # Errors
1478    ///
1479    /// Returns an error if the session is not authenticated or if an error
1480    /// occurred while making the request to the server.
1481    pub async fn logout(&self) -> Result<(), Error> {
1482        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1483        match auth_api {
1484            AuthApi::Matrix(matrix_auth) => {
1485                matrix_auth.logout().await?;
1486                Ok(())
1487            }
1488            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1489        }
1490    }
1491
1492    /// Get or upload a sync filter.
1493    ///
1494    /// This method will either get a filter ID from the store or upload the
1495    /// filter definition to the homeserver and return the new filter ID.
1496    ///
1497    /// # Arguments
1498    ///
1499    /// * `filter_name` - The unique name of the filter, this name will be used
1500    /// locally to store and identify the filter ID returned by the server.
1501    ///
1502    /// * `definition` - The filter definition that should be uploaded to the
1503    /// server if no filter ID can be found in the store.
1504    ///
1505    /// # Examples
1506    ///
1507    /// ```no_run
1508    /// # use matrix_sdk::{
1509    /// #    Client, config::SyncSettings,
1510    /// #    ruma::api::client::{
1511    /// #        filter::{
1512    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1513    /// #        },
1514    /// #        sync::sync_events::v3::Filter,
1515    /// #    }
1516    /// # };
1517    /// # use url::Url;
1518    /// # async {
1519    /// # let homeserver = Url::parse("http://example.com").unwrap();
1520    /// # let client = Client::new(homeserver).await.unwrap();
1521    /// let mut filter = FilterDefinition::default();
1522    ///
1523    /// // Let's enable member lazy loading.
1524    /// filter.room.state.lazy_load_options =
1525    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1526    ///
1527    /// let filter_id = client
1528    ///     .get_or_upload_filter("sync", filter)
1529    ///     .await
1530    ///     .unwrap();
1531    ///
1532    /// let sync_settings = SyncSettings::new()
1533    ///     .filter(Filter::FilterId(filter_id));
1534    ///
1535    /// let response = client.sync_once(sync_settings).await.unwrap();
1536    /// # };
1537    #[instrument(skip(self, definition))]
1538    pub async fn get_or_upload_filter(
1539        &self,
1540        filter_name: &str,
1541        definition: FilterDefinition,
1542    ) -> Result<String> {
1543        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1544            debug!("Found filter locally");
1545            Ok(filter)
1546        } else {
1547            debug!("Didn't find filter locally");
1548            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1549            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1550            let response = self.send(request).await?;
1551
1552            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1553
1554            Ok(response.filter_id)
1555        }
1556    }
1557
1558    /// Prepare to join a room by ID, by getting the current details about it
1559    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1560        let room = self.get_room(room_id)?;
1561
1562        let inviter = match room.invite_details().await {
1563            Ok(details) => details.inviter,
1564            Err(Error::WrongRoomState(_)) => None,
1565            Err(e) => {
1566                warn!("Error fetching invite details for room: {e:?}");
1567                None
1568            }
1569        };
1570
1571        Some(PreJoinRoomInfo { inviter })
1572    }
1573
1574    /// Finish joining a room.
1575    ///
1576    /// If the room was an invite that should be marked as a DM, will include it
1577    /// in the DM event after creating the joined room.
1578    ///
1579    /// If encrypted history sharing is enabled, will check to see if we have a
1580    /// key bundle, and import it if so.
1581    ///
1582    /// # Arguments
1583    ///
1584    /// * `room_id` - The `RoomId` of the room that was joined.
1585    /// * `pre_join_room_info` - Information about the room before we joined.
1586    async fn finish_join_room(
1587        &self,
1588        room_id: &RoomId,
1589        pre_join_room_info: Option<PreJoinRoomInfo>,
1590    ) -> Result<Room> {
1591        info!(?room_id, ?pre_join_room_info, "Completing room join");
1592        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1593            room.state() == RoomState::Invited
1594                && room.is_direct().await.unwrap_or_else(|e| {
1595                    warn!(%room_id, "is_direct() failed: {e}");
1596                    false
1597                })
1598        } else {
1599            false
1600        };
1601
1602        let base_room = self
1603            .base_client()
1604            .room_joined(
1605                room_id,
1606                pre_join_room_info
1607                    .as_ref()
1608                    .and_then(|info| info.inviter.as_ref())
1609                    .map(|i| i.user_id().to_owned()),
1610            )
1611            .await?;
1612        let room = Room::new(self.clone(), base_room);
1613
1614        if mark_as_dm {
1615            room.set_is_direct(true).await?;
1616        }
1617
1618        #[cfg(feature = "e2e-encryption")]
1619        if self.inner.enable_share_history_on_invite
1620            && let Some(inviter) =
1621                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1622        {
1623            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1624                .await?;
1625        }
1626
1627        // Suppress "unused variable" and "unused field" lints
1628        #[cfg(not(feature = "e2e-encryption"))]
1629        let _ = pre_join_room_info.map(|i| i.inviter);
1630
1631        Ok(room)
1632    }
1633
1634    /// Join a room by `RoomId`.
1635    ///
1636    /// Returns the `Room` in the joined state.
1637    ///
1638    /// # Arguments
1639    ///
1640    /// * `room_id` - The `RoomId` of the room to be joined.
1641    #[instrument(skip(self))]
1642    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1643        // See who invited us to this room, if anyone. Note we have to do this before
1644        // making the `/join` request, otherwise we could race against the sync.
1645        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1646
1647        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1648        let response = self.send(request).await?;
1649        self.finish_join_room(&response.room_id, pre_join_info).await
1650    }
1651
1652    /// Join a room by `RoomOrAliasId`.
1653    ///
1654    /// Returns the `Room` in the joined state.
1655    ///
1656    /// # Arguments
1657    ///
1658    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1659    ///   alias looks like `#name:example.com`.
1660    /// * `server_names` - The server names to be used for resolving the alias,
1661    ///   if needs be.
1662    #[instrument(skip(self))]
1663    pub async fn join_room_by_id_or_alias(
1664        &self,
1665        alias: &RoomOrAliasId,
1666        server_names: &[OwnedServerName],
1667    ) -> Result<Room> {
1668        let pre_join_info = {
1669            match alias.try_into() {
1670                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1671                Err(_) => {
1672                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1673                    // responding to an invitation to the room, and therefore don't need to handle
1674                    // things that happen as a result of invites.
1675                    None
1676                }
1677            }
1678        };
1679        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1680            via: server_names.to_owned(),
1681        });
1682        let response = self.send(request).await?;
1683        self.finish_join_room(&response.room_id, pre_join_info).await
1684    }
1685
1686    /// Search the homeserver's directory of public rooms.
1687    ///
1688    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1689    /// a `get_public_rooms::Response`.
1690    ///
1691    /// # Arguments
1692    ///
1693    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1694    ///
1695    /// * `since` - Pagination token from a previous request.
1696    ///
1697    /// * `server` - The name of the server, if `None` the requested server is
1698    ///   used.
1699    ///
1700    /// # Examples
1701    /// ```no_run
1702    /// use matrix_sdk::Client;
1703    /// # use url::Url;
1704    /// # let homeserver = Url::parse("http://example.com").unwrap();
1705    /// # let limit = Some(10);
1706    /// # let since = Some("since token");
1707    /// # let server = Some("servername.com".try_into().unwrap());
1708    /// # async {
1709    /// let mut client = Client::new(homeserver).await.unwrap();
1710    ///
1711    /// client.public_rooms(limit, since, server).await;
1712    /// # };
1713    /// ```
1714    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1715    pub async fn public_rooms(
1716        &self,
1717        limit: Option<u32>,
1718        since: Option<&str>,
1719        server: Option<&ServerName>,
1720    ) -> HttpResult<get_public_rooms::v3::Response> {
1721        let limit = limit.map(UInt::from);
1722
1723        let request = assign!(get_public_rooms::v3::Request::new(), {
1724            limit,
1725            since: since.map(ToOwned::to_owned),
1726            server: server.map(ToOwned::to_owned),
1727        });
1728        self.send(request).await
1729    }
1730
1731    /// Create a room with the given parameters.
1732    ///
1733    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1734    /// created room.
1735    ///
1736    /// If you want to create a direct message with one specific user, you can
1737    /// use [`create_dm`][Self::create_dm], which is more convenient than
1738    /// assembling the [`create_room::v3::Request`] yourself.
1739    ///
1740    /// If the `is_direct` field of the request is set to `true` and at least
1741    /// one user is invited, the room will be automatically added to the direct
1742    /// rooms in the account data.
1743    ///
1744    /// # Examples
1745    ///
1746    /// ```no_run
1747    /// use matrix_sdk::{
1748    ///     Client,
1749    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1750    /// };
1751    /// # use url::Url;
1752    /// #
1753    /// # async {
1754    /// # let homeserver = Url::parse("http://example.com").unwrap();
1755    /// let request = CreateRoomRequest::new();
1756    /// let client = Client::new(homeserver).await.unwrap();
1757    /// assert!(client.create_room(request).await.is_ok());
1758    /// # };
1759    /// ```
1760    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1761        let invite = request.invite.clone();
1762        let is_direct_room = request.is_direct;
1763        let response = self.send(request).await?;
1764
1765        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1766
1767        let joined_room = Room::new(self.clone(), base_room);
1768
1769        if is_direct_room
1770            && !invite.is_empty()
1771            && let Err(error) =
1772                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1773        {
1774            // FIXME: Retry in the background
1775            error!("Failed to mark room as DM: {error}");
1776        }
1777
1778        Ok(joined_room)
1779    }
1780
1781    /// Create a DM room.
1782    ///
1783    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1784    /// given user being invited, the room marked `is_direct` and both the
1785    /// creator and invitee getting the default maximum power level.
1786    ///
1787    /// If the `e2e-encryption` feature is enabled, the room will also be
1788    /// encrypted.
1789    ///
1790    /// # Arguments
1791    ///
1792    /// * `user_id` - The ID of the user to create a DM for.
1793    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1794        #[cfg(feature = "e2e-encryption")]
1795        let initial_state = vec![
1796            InitialStateEvent::with_empty_state_key(
1797                RoomEncryptionEventContent::with_recommended_defaults(),
1798            )
1799            .to_raw_any(),
1800        ];
1801
1802        #[cfg(not(feature = "e2e-encryption"))]
1803        let initial_state = vec![];
1804
1805        let request = assign!(create_room::v3::Request::new(), {
1806            invite: vec![user_id.to_owned()],
1807            is_direct: true,
1808            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1809            initial_state,
1810        });
1811
1812        self.create_room(request).await
1813    }
1814
1815    /// Get the existing DM room with the given user, if any.
1816    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1817        let rooms = self.joined_rooms();
1818
1819        // Find the room we share with the `user_id` and only with `user_id`
1820        let room = rooms.into_iter().find(|r| {
1821            let targets = r.direct_targets();
1822            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1823        });
1824
1825        trace!(?user_id, ?room, "Found DM room with user");
1826        room
1827    }
1828
1829    /// Search the homeserver's directory for public rooms with a filter.
1830    ///
1831    /// # Arguments
1832    ///
1833    /// * `room_search` - The easiest way to create this request is using the
1834    ///   `get_public_rooms_filtered::Request` itself.
1835    ///
1836    /// # Examples
1837    ///
1838    /// ```no_run
1839    /// # use url::Url;
1840    /// # use matrix_sdk::Client;
1841    /// # async {
1842    /// # let homeserver = Url::parse("http://example.com")?;
1843    /// use matrix_sdk::ruma::{
1844    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1845    /// };
1846    /// # let mut client = Client::new(homeserver).await?;
1847    ///
1848    /// let mut filter = Filter::new();
1849    /// filter.generic_search_term = Some("rust".to_owned());
1850    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1851    /// request.filter = filter;
1852    ///
1853    /// let response = client.public_rooms_filtered(request).await?;
1854    ///
1855    /// for room in response.chunk {
1856    ///     println!("Found room {room:?}");
1857    /// }
1858    /// # anyhow::Ok(()) };
1859    /// ```
1860    pub async fn public_rooms_filtered(
1861        &self,
1862        request: get_public_rooms_filtered::v3::Request,
1863    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1864        self.send(request).await
1865    }
1866
1867    /// Send an arbitrary request to the server, without updating client state.
1868    ///
1869    /// **Warning:** Because this method *does not* update the client state, it
1870    /// is important to make sure that you account for this yourself, and
1871    /// use wrapper methods where available.  This method should *only* be
1872    /// used if a wrapper method for the endpoint you'd like to use is not
1873    /// available.
1874    ///
1875    /// # Arguments
1876    ///
1877    /// * `request` - A filled out and valid request for the endpoint to be hit
1878    ///
1879    /// * `timeout` - An optional request timeout setting, this overrides the
1880    ///   default request setting if one was set.
1881    ///
1882    /// # Examples
1883    ///
1884    /// ```no_run
1885    /// # use matrix_sdk::{Client, config::SyncSettings};
1886    /// # use url::Url;
1887    /// # async {
1888    /// # let homeserver = Url::parse("http://localhost:8080")?;
1889    /// # let mut client = Client::new(homeserver).await?;
1890    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1891    ///
1892    /// // First construct the request you want to make
1893    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1894    /// // for all available Endpoints
1895    /// let user_id = user_id!("@example:localhost").to_owned();
1896    /// let request = profile::get_profile::v3::Request::new(user_id);
1897    ///
1898    /// // Start the request using Client::send()
1899    /// let response = client.send(request).await?;
1900    ///
1901    /// // Check the corresponding Response struct to find out what types are
1902    /// // returned
1903    /// # anyhow::Ok(()) };
1904    /// ```
1905    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1906    where
1907        Request: OutgoingRequest + Clone + Debug,
1908        for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1909        Request::PathBuilder: SupportedPathBuilder,
1910        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1911        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1912    {
1913        SendRequest {
1914            client: self.clone(),
1915            request,
1916            config: None,
1917            send_progress: Default::default(),
1918        }
1919    }
1920
1921    pub(crate) async fn send_inner<Request>(
1922        &self,
1923        request: Request,
1924        config: Option<RequestConfig>,
1925        send_progress: SharedObservable<TransmissionProgress>,
1926    ) -> HttpResult<Request::IncomingResponse>
1927    where
1928        Request: OutgoingRequest + Debug,
1929        for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1930        Request::PathBuilder: SupportedPathBuilder,
1931        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1932        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1933    {
1934        let homeserver = self.homeserver().to_string();
1935        let access_token = self.access_token();
1936        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1937
1938        let path_builder_input =
1939            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1940
1941        let result = self
1942            .inner
1943            .http_client
1944            .send(
1945                request,
1946                config,
1947                homeserver,
1948                access_token.as_deref(),
1949                path_builder_input,
1950                send_progress,
1951            )
1952            .await;
1953
1954        if let Err(Some(ErrorKind::UnknownToken { .. })) =
1955            result.as_ref().map_err(HttpError::client_api_error_kind)
1956            && let Some(access_token) = &access_token
1957        {
1958            // Mark the access token as expired.
1959            self.auth_ctx().set_access_token_expired(access_token);
1960        }
1961
1962        result
1963    }
1964
1965    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1966        _ = self
1967            .inner
1968            .auth_ctx
1969            .session_change_sender
1970            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1971    }
1972
1973    /// Fetches server versions from network; no caching.
1974    pub async fn fetch_server_versions(
1975        &self,
1976        request_config: Option<RequestConfig>,
1977    ) -> HttpResult<get_supported_versions::Response> {
1978        // Since this was called by the user, try to refresh the access token if
1979        // necessary.
1980        self.fetch_server_versions_inner(false, request_config).await
1981    }
1982
1983    /// Fetches server versions from network; no caching.
1984    ///
1985    /// If the access token is expired and `failsafe` is `false`, this will
1986    /// attempt to refresh the access token, otherwise this will try to make an
1987    /// unauthenticated request instead.
1988    pub(crate) async fn fetch_server_versions_inner(
1989        &self,
1990        failsafe: bool,
1991        request_config: Option<RequestConfig>,
1992    ) -> HttpResult<get_supported_versions::Response> {
1993        if !failsafe {
1994            // `Client::send()` handles refreshing access tokens.
1995            return self
1996                .send(get_supported_versions::Request::new())
1997                .with_request_config(request_config)
1998                .await;
1999        }
2000
2001        let homeserver = self.homeserver().to_string();
2002
2003        // If we have a fresh access token, try with it first.
2004        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2005            && self.auth_ctx().has_valid_access_token()
2006            && let Some(access_token) = self.access_token()
2007        {
2008            let result = self
2009                .inner
2010                .http_client
2011                .send(
2012                    get_supported_versions::Request::new(),
2013                    request_config,
2014                    homeserver.clone(),
2015                    Some(&access_token),
2016                    (),
2017                    Default::default(),
2018                )
2019                .await;
2020
2021            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2022                result.as_ref().map_err(HttpError::client_api_error_kind)
2023            {
2024                // If the access token is actually expired, mark it as expired and fallback to
2025                // the unauthenticated request below.
2026                self.auth_ctx().set_access_token_expired(&access_token);
2027            } else {
2028                // If the request succeeded or it's an other error, just stop now.
2029                return result;
2030            }
2031        }
2032
2033        // Try without authentication.
2034        self.inner
2035            .http_client
2036            .send(
2037                get_supported_versions::Request::new(),
2038                request_config,
2039                homeserver.clone(),
2040                None,
2041                (),
2042                Default::default(),
2043            )
2044            .await
2045    }
2046
2047    /// Fetches client well_known from network; no caching.
2048    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2049        let server_url_string = self
2050            .server()
2051            .unwrap_or(
2052                // Sometimes people configure their well-known directly on the homeserver so use
2053                // this as a fallback when the server name is unknown.
2054                &self.homeserver(),
2055            )
2056            .to_string();
2057
2058        let well_known = self
2059            .inner
2060            .http_client
2061            .send(
2062                discover_homeserver::Request::new(),
2063                Some(RequestConfig::short_retry()),
2064                server_url_string,
2065                None,
2066                (),
2067                Default::default(),
2068            )
2069            .await;
2070
2071        match well_known {
2072            Ok(well_known) => Some(well_known),
2073            Err(http_error) => {
2074                // It is perfectly valid to not have a well-known file.
2075                // Maybe we should check for a specific error code to be sure?
2076                warn!("Failed to fetch client well-known: {http_error}");
2077                None
2078            }
2079        }
2080    }
2081
2082    /// Load supported versions from storage, or fetch them from network and
2083    /// cache them.
2084    ///
2085    /// If `failsafe` is true, this will try to minimize side effects to avoid
2086    /// possible deadlocks.
2087    async fn load_or_fetch_supported_versions(
2088        &self,
2089        failsafe: bool,
2090    ) -> HttpResult<SupportedVersionsResponse> {
2091        match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2092            Ok(Some(stored)) => {
2093                if let Some(supported_versions) =
2094                    stored.into_supported_versions().and_then(|value| value.into_data())
2095                {
2096                    return Ok(supported_versions);
2097                }
2098            }
2099            Ok(None) => {
2100                // fallthrough: cache is empty
2101            }
2102            Err(err) => {
2103                warn!("error when loading cached supported versions: {err}");
2104                // fallthrough to network.
2105            }
2106        }
2107
2108        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2109        let supported_versions = SupportedVersionsResponse {
2110            versions: server_versions.versions,
2111            unstable_features: server_versions.unstable_features,
2112        };
2113
2114        // Only attempt to cache the result in storage if the request was authenticated.
2115        if self.auth_ctx().has_valid_access_token()
2116            && let Err(err) = self
2117                .state_store()
2118                .set_kv_data(
2119                    StateStoreDataKey::SupportedVersions,
2120                    StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2121                        supported_versions.clone(),
2122                    )),
2123                )
2124                .await
2125        {
2126            warn!("error when caching supported versions: {err}");
2127        }
2128
2129        Ok(supported_versions)
2130    }
2131
2132    pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2133        let supported_versions = &self.inner.caches.supported_versions;
2134
2135        if let CachedValue::Cached(val) = &*supported_versions.read().await {
2136            Some(val.clone())
2137        } else {
2138            None
2139        }
2140    }
2141
2142    /// Get the Matrix versions and features supported by the homeserver by
2143    /// fetching them from the server or the cache.
2144    ///
2145    /// This is equivalent to calling both [`Client::server_versions()`] and
2146    /// [`Client::unstable_features()`]. To always fetch the result from the
2147    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2148    /// and then `.as_supported_versions()` on the response.
2149    ///
2150    /// # Examples
2151    ///
2152    /// ```no_run
2153    /// use ruma::api::{FeatureFlag, MatrixVersion};
2154    /// # use matrix_sdk::{Client, config::SyncSettings};
2155    /// # use url::Url;
2156    /// # async {
2157    /// # let homeserver = Url::parse("http://localhost:8080")?;
2158    /// # let mut client = Client::new(homeserver).await?;
2159    ///
2160    /// let supported = client.supported_versions().await?;
2161    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2162    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2163    ///
2164    /// let msc_x_feature = FeatureFlag::from("msc_x");
2165    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2166    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2167    /// # anyhow::Ok(()) };
2168    /// ```
2169    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2170        self.supported_versions_inner(false).await
2171    }
2172
2173    /// Get the Matrix versions and features supported by the homeserver by
2174    /// fetching them from the server or the cache.
2175    ///
2176    /// If `failsafe` is true, this will try to minimize side effects to avoid
2177    /// possible deadlocks.
2178    pub(crate) async fn supported_versions_inner(
2179        &self,
2180        failsafe: bool,
2181    ) -> HttpResult<SupportedVersions> {
2182        let cached_supported_versions = &self.inner.caches.supported_versions;
2183        if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
2184            return Ok(val.clone());
2185        }
2186
2187        let mut guarded_supported_versions = cached_supported_versions.write().await;
2188        if let CachedValue::Cached(val) = &*guarded_supported_versions {
2189            return Ok(val.clone());
2190        }
2191
2192        let supported = self.load_or_fetch_supported_versions(failsafe).await?;
2193
2194        // Fill both unstable features and server versions at once.
2195        let mut supported_versions = supported.supported_versions();
2196        if supported_versions.versions.is_empty() {
2197            supported_versions.versions = [MatrixVersion::V1_0].into();
2198        }
2199
2200        // Only cache the result if the request was authenticated.
2201        if self.auth_ctx().has_valid_access_token() {
2202            *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
2203        }
2204
2205        Ok(supported_versions)
2206    }
2207
2208    /// Get the Matrix versions and features supported by the homeserver by
2209    /// fetching them from the cache.
2210    ///
2211    /// For a version of this function that fetches the supported versions and
2212    /// features from the homeserver if the [`SupportedVersions`] aren't
2213    /// found in the cache, take a look at the [`Client::server_versions()`]
2214    /// method.
2215    ///
2216    /// # Examples
2217    ///
2218    /// ```no_run
2219    /// use ruma::api::{FeatureFlag, MatrixVersion};
2220    /// # use matrix_sdk::{Client, config::SyncSettings};
2221    /// # use url::Url;
2222    /// # async {
2223    /// # let homeserver = Url::parse("http://localhost:8080")?;
2224    /// # let mut client = Client::new(homeserver).await?;
2225    ///
2226    /// let supported =
2227    ///     if let Some(supported) = client.supported_versions_cached().await? {
2228    ///         supported
2229    ///     } else {
2230    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2231    ///     };
2232    ///
2233    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2234    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2235    ///
2236    /// let msc_x_feature = FeatureFlag::from("msc_x");
2237    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2238    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2239    /// # anyhow::Ok(()) };
2240    /// ```
2241    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2242        if let Some(cached) = self.get_cached_supported_versions().await {
2243            Ok(Some(cached))
2244        } else if let Some(stored) =
2245            self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2246        {
2247            if let Some(supported) =
2248                stored.into_supported_versions().and_then(|value| value.into_data())
2249            {
2250                Ok(Some(supported.supported_versions()))
2251            } else {
2252                Ok(None)
2253            }
2254        } else {
2255            Ok(None)
2256        }
2257    }
2258
2259    /// Get the Matrix versions supported by the homeserver by fetching them
2260    /// from the server or the cache.
2261    ///
2262    /// # Examples
2263    ///
2264    /// ```no_run
2265    /// use ruma::api::MatrixVersion;
2266    /// # use matrix_sdk::{Client, config::SyncSettings};
2267    /// # use url::Url;
2268    /// # async {
2269    /// # let homeserver = Url::parse("http://localhost:8080")?;
2270    /// # let mut client = Client::new(homeserver).await?;
2271    ///
2272    /// let server_versions = client.server_versions().await?;
2273    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2274    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2275    /// # anyhow::Ok(()) };
2276    /// ```
2277    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2278        Ok(self.supported_versions().await?.versions)
2279    }
2280
2281    /// Get the unstable features supported by the homeserver by fetching them
2282    /// from the server or the cache.
2283    ///
2284    /// # Examples
2285    ///
2286    /// ```no_run
2287    /// use matrix_sdk::ruma::api::FeatureFlag;
2288    /// # use matrix_sdk::{Client, config::SyncSettings};
2289    /// # use url::Url;
2290    /// # async {
2291    /// # let homeserver = Url::parse("http://localhost:8080")?;
2292    /// # let mut client = Client::new(homeserver).await?;
2293    ///
2294    /// let msc_x_feature = FeatureFlag::from("msc_x");
2295    /// let unstable_features = client.unstable_features().await?;
2296    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2297    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2298    /// # anyhow::Ok(()) };
2299    /// ```
2300    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2301        Ok(self.supported_versions().await?.features)
2302    }
2303
2304    /// Empty the supported versions and unstable features cache.
2305    ///
2306    /// Since the SDK caches the supported versions, it's possible to have a
2307    /// stale entry in the cache. This functions makes it possible to force
2308    /// reset it.
2309    pub async fn reset_supported_versions(&self) -> Result<()> {
2310        // Empty the in-memory caches.
2311        self.inner.caches.supported_versions.write().await.take();
2312
2313        // Empty the store cache.
2314        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2315    }
2316
2317    /// Load well-known from storage, or fetch it from network and cache it.
2318    async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2319        match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2320            Ok(Some(stored)) => {
2321                if let Some(well_known) =
2322                    stored.into_well_known().and_then(|value| value.into_data())
2323                {
2324                    return Ok(well_known);
2325                }
2326            }
2327            Ok(None) => {
2328                // fallthrough: cache is empty
2329            }
2330            Err(err) => {
2331                warn!("error when loading cached well-known: {err}");
2332                // fallthrough to network.
2333            }
2334        }
2335
2336        let well_known = self.fetch_client_well_known().await.map(Into::into);
2337
2338        // Attempt to cache the result in storage.
2339        if let Err(err) = self
2340            .state_store()
2341            .set_kv_data(
2342                StateStoreDataKey::WellKnown,
2343                StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2344            )
2345            .await
2346        {
2347            warn!("error when caching well-known: {err}");
2348        }
2349
2350        Ok(well_known)
2351    }
2352
2353    async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2354        let well_known = &self.inner.caches.well_known;
2355        if let CachedValue::Cached(val) = &*well_known.read().await {
2356            return Ok(val.clone());
2357        }
2358
2359        let mut guarded_well_known = well_known.write().await;
2360        if let CachedValue::Cached(val) = &*guarded_well_known {
2361            return Ok(val.clone());
2362        }
2363
2364        let well_known = self.load_or_fetch_well_known().await?;
2365
2366        *guarded_well_known = CachedValue::Cached(well_known.clone());
2367
2368        Ok(well_known)
2369    }
2370
2371    /// Get information about the homeserver's advertised RTC foci by fetching
2372    /// the well-known file from the server or the cache.
2373    ///
2374    /// # Examples
2375    /// ```no_run
2376    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2377    /// # use url::Url;
2378    /// # async {
2379    /// # let homeserver = Url::parse("http://localhost:8080")?;
2380    /// # let mut client = Client::new(homeserver).await?;
2381    /// let rtc_foci = client.rtc_foci().await?;
2382    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2383    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2384    ///     _ => None,
2385    /// });
2386    /// if let Some(info) = default_livekit_focus_info {
2387    ///     println!("Default LiveKit service URL: {}", info.service_url);
2388    /// }
2389    /// # anyhow::Ok(()) };
2390    /// ```
2391    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2392        let well_known = self.get_or_load_and_cache_well_known().await?;
2393
2394        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2395    }
2396
2397    /// Empty the well-known cache.
2398    ///
2399    /// Since the SDK caches the well-known, it's possible to have a stale entry
    /// in the cache. This function makes it possible to force reset it.
2401    pub async fn reset_well_known(&self) -> Result<()> {
2402        // Empty the in-memory caches.
2403        self.inner.caches.well_known.write().await.take();
2404
2405        // Empty the store cache.
2406        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2407    }
2408
2409    /// Check whether MSC 4028 is enabled on the homeserver.
2410    ///
2411    /// # Examples
2412    ///
2413    /// ```no_run
2414    /// # use matrix_sdk::{Client, config::SyncSettings};
2415    /// # use url::Url;
2416    /// # async {
2417    /// # let homeserver = Url::parse("http://localhost:8080")?;
2418    /// # let mut client = Client::new(homeserver).await?;
2419    /// let msc4028_enabled =
2420    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2421    /// # anyhow::Ok(()) };
2422    /// ```
2423    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2424        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2425    }
2426
2427    /// Get information of all our own devices.
2428    ///
2429    /// # Examples
2430    ///
2431    /// ```no_run
2432    /// # use matrix_sdk::{Client, config::SyncSettings};
2433    /// # use url::Url;
2434    /// # async {
2435    /// # let homeserver = Url::parse("http://localhost:8080")?;
2436    /// # let mut client = Client::new(homeserver).await?;
2437    /// let response = client.devices().await?;
2438    ///
2439    /// for device in response.devices {
2440    ///     println!(
2441    ///         "Device: {} {}",
2442    ///         device.device_id,
2443    ///         device.display_name.as_deref().unwrap_or("")
2444    ///     );
2445    /// }
2446    /// # anyhow::Ok(()) };
2447    /// ```
2448    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2449        let request = get_devices::v3::Request::new();
2450
2451        self.send(request).await
2452    }
2453
2454    /// Delete the given devices from the server.
2455    ///
2456    /// # Arguments
2457    ///
2458    /// * `devices` - The list of devices that should be deleted from the
2459    ///   server.
2460    ///
2461    /// * `auth_data` - This request requires user interactive auth, the first
2462    ///   request needs to set this to `None` and will always fail with an
2463    ///   `UiaaResponse`. The response will contain information for the
2464    ///   interactive auth and the same request needs to be made but this time
2465    ///   with some `auth_data` provided.
2466    ///
2467    /// ```no_run
2468    /// # use matrix_sdk::{
2469    /// #    ruma::{api::client::uiaa, device_id},
2470    /// #    Client, Error, config::SyncSettings,
2471    /// # };
2472    /// # use serde_json::json;
2473    /// # use url::Url;
2474    /// # use std::collections::BTreeMap;
2475    /// # async {
2476    /// # let homeserver = Url::parse("http://localhost:8080")?;
2477    /// # let mut client = Client::new(homeserver).await?;
2478    /// let devices = &[device_id!("DEVICEID").to_owned()];
2479    ///
2480    /// if let Err(e) = client.delete_devices(devices, None).await {
2481    ///     if let Some(info) = e.as_uiaa_response() {
2482    ///         let mut password = uiaa::Password::new(
2483    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2484    ///             "wordpass".to_owned(),
2485    ///         );
2486    ///         password.session = info.session.clone();
2487    ///
2488    ///         client
2489    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2490    ///             .await?;
2491    ///     }
2492    /// }
    /// # anyhow::Ok(()) };
    /// ```
2494    pub async fn delete_devices(
2495        &self,
2496        devices: &[OwnedDeviceId],
2497        auth_data: Option<uiaa::AuthData>,
2498    ) -> HttpResult<delete_devices::v3::Response> {
2499        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2500        request.auth = auth_data;
2501
2502        self.send(request).await
2503    }
2504
2505    /// Change the display name of a device owned by the current user.
2506    ///
2507    /// Returns a `update_device::Response` which specifies the result
2508    /// of the operation.
2509    ///
2510    /// # Arguments
2511    ///
2512    /// * `device_id` - The ID of the device to change the display name of.
2513    /// * `display_name` - The new display name to set.
2514    pub async fn rename_device(
2515        &self,
2516        device_id: &DeviceId,
2517        display_name: &str,
2518    ) -> HttpResult<update_device::v3::Response> {
2519        let mut request = update_device::v3::Request::new(device_id.to_owned());
2520        request.display_name = Some(display_name.to_owned());
2521
2522        self.send(request).await
2523    }
2524
2525    /// Check whether a device with a specific ID exists on the server.
2526    ///
2527    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2528    /// with 404 and the underlying error otherwise.
2529    ///
2530    /// # Arguments
2531    ///
2532    /// * `device_id` - The ID of the device to query.
2533    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2534        let request = device::get_device::v3::Request::new(device_id);
2535        match self.send(request).await {
2536            Ok(_) => Ok(true),
2537            Err(err) => {
2538                if let Some(error) = err.as_client_api_error()
2539                    && error.status_code == 404
2540                {
2541                    Ok(false)
2542                } else {
2543                    Err(err.into())
2544                }
2545            }
2546        }
2547    }
2548
2549    /// Synchronize the client's state with the latest state on the server.
2550    ///
2551    /// ## Syncing Events
2552    ///
2553    /// Messages or any other type of event need to be periodically fetched from
2554    /// the server, this is achieved by sending a `/sync` request to the server.
2555    ///
2556    /// The first sync is sent out without a [`token`]. The response of the
2557    /// first sync will contain a [`next_batch`] field which should then be
2558    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2559    /// don't receive the same events multiple times.
2560    ///
2561    /// ## Long Polling
2562    ///
2563    /// A sync should in the usual case always be in flight. The
    /// [`SyncSettings`] have a [`timeout`] option, which controls how
2565    /// long the server will wait for new events before it will respond.
2566    /// The server will respond immediately if some new events arrive before the
2567    /// timeout has expired. If no changes arrive and the timeout expires an
2568    /// empty sync response will be sent to the client.
2569    ///
2570    /// This method of sending a request that may not receive a response
2571    /// immediately is called long polling.
2572    ///
2573    /// ## Filtering Events
2574    ///
2575    /// The number or type of messages and events that the client should receive
2576    /// from the server can be altered using a [`Filter`].
2577    ///
2578    /// Filters can be non-trivial and, since they will be sent with every sync
2579    /// request, they may take up a bunch of unnecessary bandwidth.
2580    ///
2581    /// Luckily filters can be uploaded to the server and reused using an unique
2582    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2583    /// method.
2584    ///
2585    /// # Arguments
2586    ///
2587    /// * `sync_settings` - Settings for the sync call, this allows us to set
    ///   various options to configure the sync:
2589    ///     * [`filter`] - To configure which events we receive and which get
2590    ///       [filtered] by the server
2591    ///     * [`timeout`] - To configure our [long polling] setup.
2592    ///     * [`token`] - To tell the server which events we already received
2593    ///       and where we wish to continue syncing.
2594    ///     * [`full_state`] - To tell the server that we wish to receive all
2595    ///       state events, regardless of our configured [`token`].
2596    ///     * [`set_presence`] - To tell the server to set the presence and to
2597    ///       which state.
2598    ///
2599    /// # Examples
2600    ///
2601    /// ```no_run
2602    /// # use url::Url;
2603    /// # async {
2604    /// # let homeserver = Url::parse("http://localhost:8080")?;
2605    /// # let username = "";
2606    /// # let password = "";
2607    /// use matrix_sdk::{
2608    ///     Client, config::SyncSettings,
2609    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2610    /// };
2611    ///
2612    /// let client = Client::new(homeserver).await?;
2613    /// client.matrix_auth().login_username(username, password).send().await?;
2614    ///
2615    /// // Sync once so we receive the client state and old messages.
2616    /// client.sync_once(SyncSettings::default()).await?;
2617    ///
2618    /// // Register our handler so we start responding once we receive a new
2619    /// // event.
2620    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2621    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2622    /// });
2623    ///
2624    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2625    /// // from our `sync_once()` call automatically.
2626    /// client.sync(SyncSettings::default()).await;
2627    /// # anyhow::Ok(()) };
2628    /// ```
2629    ///
2630    /// [`sync`]: #method.sync
2631    /// [`SyncSettings`]: crate::config::SyncSettings
2632    /// [`token`]: crate::config::SyncSettings#method.token
2633    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2634    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2635    /// [`set_presence`]: ruma::presence::PresenceState
2636    /// [`filter`]: crate::config::SyncSettings#method.filter
2637    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2638    /// [`next_batch`]: SyncResponse#structfield.next_batch
2639    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2640    /// [long polling]: #long-polling
2641    /// [filtered]: #filtering-events
2642    #[instrument(skip(self))]
2643    pub async fn sync_once(
2644        &self,
2645        sync_settings: crate::config::SyncSettings,
2646    ) -> Result<SyncResponse> {
2647        // The sync might not return for quite a while due to the timeout.
2648        // We'll see if there's anything crypto related to send out before we
2649        // sync, i.e. if we closed our client after a sync but before the
2650        // crypto requests were sent out.
2651        //
2652        // This will mostly be a no-op.
2653        #[cfg(feature = "e2e-encryption")]
2654        if let Err(e) = self.send_outgoing_requests().await {
2655            error!(error = ?e, "Error while sending outgoing E2EE requests");
2656        }
2657
2658        let token = match sync_settings.token {
2659            SyncToken::Specific(token) => Some(token),
2660            SyncToken::NoToken => None,
2661            SyncToken::ReusePrevious => self.sync_token().await,
2662        };
2663
2664        let request = assign!(sync_events::v3::Request::new(), {
2665            filter: sync_settings.filter.map(|f| *f),
2666            since: token,
2667            full_state: sync_settings.full_state,
2668            set_presence: sync_settings.set_presence,
2669            timeout: sync_settings.timeout,
2670            use_state_after: true,
2671        });
2672        let mut request_config = self.request_config();
2673        if let Some(timeout) = sync_settings.timeout {
2674            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2675            request_config.timeout = Some(base_timeout + timeout);
2676        }
2677
2678        let response = self.send(request).with_request_config(request_config).await?;
2679        let next_batch = response.next_batch.clone();
2680        let response = self.process_sync(response).await?;
2681
2682        #[cfg(feature = "e2e-encryption")]
2683        if let Err(e) = self.send_outgoing_requests().await {
2684            error!(error = ?e, "Error while sending outgoing E2EE requests");
2685        }
2686
2687        self.inner.sync_beat.notify(usize::MAX);
2688
2689        Ok(SyncResponse::new(next_batch, response))
2690    }
2691
2692    /// Repeatedly synchronize the client state with the server.
2693    ///
2694    /// This method will only return on error, if cancellation is needed
2695    /// the method should be wrapped in a cancelable task or the
2696    /// [`Client::sync_with_callback`] method can be used or
2697    /// [`Client::sync_with_result_callback`] if you want to handle error
2698    /// cases in the loop, too.
2699    ///
2700    /// This method will internally call [`Client::sync_once`] in a loop.
2701    ///
2702    /// This method can be used with the [`Client::add_event_handler`]
2703    /// method to react to individual events. If you instead wish to handle
2704    /// events in a bulk manner the [`Client::sync_with_callback`],
2705    /// [`Client::sync_with_result_callback`] and
2706    /// [`Client::sync_stream`] methods can be used instead. Those methods
2707    /// repeatedly return the whole sync response.
2708    ///
2709    /// # Arguments
2710    ///
2711    /// * `sync_settings` - Settings for the sync call. *Note* that those
2712    ///   settings will be only used for the first sync call. See the argument
2713    ///   docs for [`Client::sync_once`] for more info.
2714    ///
2715    /// # Return
2716    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2717    /// up to the user of the API to check the error and decide whether the sync
2718    /// should continue or not.
2719    ///
2720    /// # Examples
2721    ///
2722    /// ```no_run
2723    /// # use url::Url;
2724    /// # async {
2725    /// # let homeserver = Url::parse("http://localhost:8080")?;
2726    /// # let username = "";
2727    /// # let password = "";
2728    /// use matrix_sdk::{
2729    ///     Client, config::SyncSettings,
2730    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2731    /// };
2732    ///
2733    /// let client = Client::new(homeserver).await?;
2734    /// client.matrix_auth().login_username(&username, &password).send().await?;
2735    ///
2736    /// // Register our handler so we start responding once we receive a new
2737    /// // event.
2738    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2739    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2740    /// });
2741    ///
2742    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2743    /// // automatically.
2744    /// client.sync(SyncSettings::default()).await?;
2745    /// # anyhow::Ok(()) };
2746    /// ```
2747    ///
2748    /// [argument docs]: #method.sync_once
2749    /// [`sync_with_callback`]: #method.sync_with_callback
2750    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2751        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2752    }
2753
2754    /// Repeatedly call sync to synchronize the client state with the server.
2755    ///
2756    /// # Arguments
2757    ///
2758    /// * `sync_settings` - Settings for the sync call. *Note* that those
2759    ///   settings will be only used for the first sync call. See the argument
2760    ///   docs for [`Client::sync_once`] for more info.
2761    ///
    /// * `callback` - A callback that will be called every time a successful
    ///   response has been fetched from the server. The callback must return a
    ///   [`LoopCtrl`] which signals whether the method should stop syncing. If
    ///   the callback returns `LoopCtrl::Continue` the sync will continue, if
    ///   the callback returns `LoopCtrl::Break` the sync will be stopped.
2767    ///
2768    /// # Return
2769    /// The sync runs until an error occurs or the
2770    /// callback indicates that the Loop should stop. If the callback asked for
2771    /// a regular stop, the result will be `Ok(())` otherwise the
2772    /// `Err(Error)` is returned.
2773    ///
2774    /// # Examples
2775    ///
2776    /// The following example demonstrates how to sync forever while sending all
2777    /// the interesting events through a mpsc channel to another thread e.g. a
2778    /// UI thread.
2779    ///
2780    /// ```no_run
2781    /// # use std::time::Duration;
2782    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2783    /// # use url::Url;
2784    /// # async {
2785    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2786    /// # let mut client = Client::new(homeserver).await.unwrap();
2787    ///
2788    /// use tokio::sync::mpsc::channel;
2789    ///
2790    /// let (tx, rx) = channel(100);
2791    ///
2792    /// let sync_channel = &tx;
2793    /// let sync_settings = SyncSettings::new()
2794    ///     .timeout(Duration::from_secs(30));
2795    ///
2796    /// client
2797    ///     .sync_with_callback(sync_settings, |response| async move {
2798    ///         let channel = sync_channel;
2799    ///         for (room_id, room) in response.rooms.joined {
2800    ///             for event in room.timeline.events {
2801    ///                 channel.send(event).await.unwrap();
2802    ///             }
2803    ///         }
2804    ///
2805    ///         LoopCtrl::Continue
2806    ///     })
2807    ///     .await;
2808    /// };
2809    /// ```
2810    #[instrument(skip_all)]
2811    pub async fn sync_with_callback<C>(
2812        &self,
2813        sync_settings: crate::config::SyncSettings,
2814        callback: impl Fn(SyncResponse) -> C,
2815    ) -> Result<(), Error>
2816    where
2817        C: Future<Output = LoopCtrl>,
2818    {
2819        self.sync_with_result_callback(sync_settings, |result| async {
2820            Ok(callback(result?).await)
2821        })
2822        .await
2823    }
2824
2825    /// Repeatedly call sync to synchronize the client state with the server.
2826    ///
2827    /// # Arguments
2828    ///
2829    /// * `sync_settings` - Settings for the sync call. *Note* that those
2830    ///   settings will be only used for the first sync call. See the argument
2831    ///   docs for [`Client::sync_once`] for more info.
2832    ///
2833    /// * `callback` - A callback that will be called every time after a
2834    ///   response has been received, failure or not. The callback returns a
2835    ///   `Result<LoopCtrl, Error>`, too. When returning
2836    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2837    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2838    ///   function returns `Ok(())`. In case the callback can't handle the
2839    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2840    ///   which results in the sync ending and the `Err(Error)` being returned.
2841    ///
2842    /// # Return
2843    /// The sync runs until an error occurs that the callback can't handle or
2844    /// the callback indicates that the Loop should stop. If the callback
2845    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2846    /// `Err(Error)` is returned.
2847    ///
    /// _Note_: Lower-level configuration (e.g. for retries) is not changed by
    /// this, and is handled first without sending the result to the
    /// callback. Only after the retries have been exhausted is the `Result`
    /// handed to the callback.
2852    ///
2853    /// # Examples
2854    ///
2855    /// The following example demonstrates how to sync forever while sending all
2856    /// the interesting events through a mpsc channel to another thread e.g. a
2857    /// UI thread.
2858    ///
2859    /// ```no_run
2860    /// # use std::time::Duration;
2861    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2862    /// # use url::Url;
2863    /// # async {
2864    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2865    /// # let mut client = Client::new(homeserver).await.unwrap();
2866    /// #
2867    /// use tokio::sync::mpsc::channel;
2868    ///
2869    /// let (tx, rx) = channel(100);
2870    ///
2871    /// let sync_channel = &tx;
2872    /// let sync_settings = SyncSettings::new()
2873    ///     .timeout(Duration::from_secs(30));
2874    ///
2875    /// client
2876    ///     .sync_with_result_callback(sync_settings, |response| async move {
2877    ///         let channel = sync_channel;
2878    ///         let sync_response = response?;
2879    ///         for (room_id, room) in sync_response.rooms.joined {
2880    ///              for event in room.timeline.events {
2881    ///                  channel.send(event).await.unwrap();
2882    ///               }
2883    ///         }
2884    ///
2885    ///         Ok(LoopCtrl::Continue)
2886    ///     })
2887    ///     .await;
2888    /// };
2889    /// ```
2890    #[instrument(skip(self, callback))]
2891    pub async fn sync_with_result_callback<C>(
2892        &self,
2893        sync_settings: crate::config::SyncSettings,
2894        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2895    ) -> Result<(), Error>
2896    where
2897        C: Future<Output = Result<LoopCtrl, Error>>,
2898    {
2899        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2900
2901        while let Some(result) = sync_stream.next().await {
2902            trace!("Running callback");
2903            if callback(result).await? == LoopCtrl::Break {
2904                trace!("Callback told us to stop");
2905                break;
2906            }
2907            trace!("Done running callback");
2908        }
2909
2910        Ok(())
2911    }
2912
    /// Repeatedly synchronize the client state with the server.
2914    ///
2915    /// This method will internally call [`Client::sync_once`] in a loop and is
2916    /// equivalent to the [`Client::sync`] method but the responses are provided
2917    /// as an async stream.
2918    ///
2919    /// # Arguments
2920    ///
2921    /// * `sync_settings` - Settings for the sync call. *Note* that those
2922    ///   settings will be only used for the first sync call. See the argument
2923    ///   docs for [`Client::sync_once`] for more info.
2924    ///
2925    /// # Examples
2926    ///
2927    /// ```no_run
2928    /// # use url::Url;
2929    /// # async {
2930    /// # let homeserver = Url::parse("http://localhost:8080")?;
2931    /// # let username = "";
2932    /// # let password = "";
2933    /// use futures_util::StreamExt;
2934    /// use matrix_sdk::{Client, config::SyncSettings};
2935    ///
2936    /// let client = Client::new(homeserver).await?;
2937    /// client.matrix_auth().login_username(&username, &password).send().await?;
2938    ///
2939    /// let mut sync_stream =
2940    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2941    ///
2942    /// while let Some(Ok(response)) = sync_stream.next().await {
2943    ///     for room in response.rooms.joined.values() {
2944    ///         for e in &room.timeline.events {
2945    ///             if let Ok(event) = e.raw().deserialize() {
2946    ///                 println!("Received event {:?}", event);
2947    ///             }
2948    ///         }
2949    ///     }
2950    /// }
2951    ///
2952    /// # anyhow::Ok(()) };
2953    /// ```
2954    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2955    #[instrument(skip(self))]
2956    pub async fn sync_stream(
2957        &self,
2958        mut sync_settings: crate::config::SyncSettings,
2959    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2960        let mut is_first_sync = true;
2961        let mut timeout = None;
2962        let mut last_sync_time: Option<Instant> = None;
2963
2964        let parent_span = Span::current();
2965
2966        async_stream::stream!({
2967            loop {
2968                trace!("Syncing");
2969
2970                if sync_settings.ignore_timeout_on_first_sync {
2971                    if is_first_sync {
2972                        timeout = sync_settings.timeout.take();
2973                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
2974                        sync_settings.timeout = timeout.take();
2975                    }
2976
2977                    is_first_sync = false;
2978                }
2979
2980                yield self
2981                    .sync_loop_helper(&mut sync_settings)
2982                    .instrument(parent_span.clone())
2983                    .await;
2984
2985                Client::delay_sync(&mut last_sync_time).await
2986            }
2987        })
2988    }
2989
2990    /// Get the current, if any, sync token of the client.
2991    /// This will be None if the client didn't sync at least once.
2992    pub(crate) async fn sync_token(&self) -> Option<String> {
2993        self.inner.base_client.sync_token().await
2994    }
2995
2996    /// Gets information about the owner of a given access token.
2997    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2998        let request = whoami::v3::Request::new();
2999        self.send(request).await
3000    }
3001
3002    /// Subscribes a new receiver to client SessionChange broadcasts.
3003    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3004        let broadcast = &self.auth_ctx().session_change_sender;
3005        broadcast.subscribe()
3006    }
3007
3008    /// Sets the save/restore session callbacks.
3009    ///
3010    /// This is another mechanism to get synchronous updates to session tokens,
3011    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3012    pub fn set_session_callbacks(
3013        &self,
3014        reload_session_callback: Box<ReloadSessionCallback>,
3015        save_session_callback: Box<SaveSessionCallback>,
3016    ) -> Result<()> {
3017        self.inner
3018            .auth_ctx
3019            .reload_session_callback
3020            .set(reload_session_callback)
3021            .map_err(|_| Error::MultipleSessionCallbacks)?;
3022
3023        self.inner
3024            .auth_ctx
3025            .save_session_callback
3026            .set(save_session_callback)
3027            .map_err(|_| Error::MultipleSessionCallbacks)?;
3028
3029        Ok(())
3030    }
3031
3032    /// Get the notification settings of the current owner of the client.
3033    pub async fn notification_settings(&self) -> NotificationSettings {
3034        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3035        NotificationSettings::new(self.clone(), ruleset)
3036    }
3037
3038    /// Create a new specialized `Client` that can process notifications.
3039    ///
3040    /// See [`CrossProcessLock::new`] to learn more about
3041    /// `cross_process_store_locks_holder_name`.
3042    ///
3043    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3044    pub async fn notification_client(
3045        &self,
3046        cross_process_store_locks_holder_name: String,
3047    ) -> Result<Client> {
3048        let client = Client {
3049            inner: ClientInner::new(
3050                self.inner.auth_ctx.clone(),
3051                self.server().cloned(),
3052                self.homeserver(),
3053                self.sliding_sync_version(),
3054                self.inner.http_client.clone(),
3055                self.inner
3056                    .base_client
3057                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
3058                    .await?,
3059                self.inner.caches.supported_versions.read().await.clone(),
3060                self.inner.caches.well_known.read().await.clone(),
3061                self.inner.respect_login_well_known,
3062                self.inner.event_cache.clone(),
3063                self.inner.send_queue_data.clone(),
3064                self.inner.latest_events.clone(),
3065                #[cfg(feature = "e2e-encryption")]
3066                self.inner.e2ee.encryption_settings,
3067                #[cfg(feature = "e2e-encryption")]
3068                self.inner.enable_share_history_on_invite,
3069                cross_process_store_locks_holder_name,
3070                #[cfg(feature = "experimental-search")]
3071                self.inner.search_index.clone(),
3072                self.inner.thread_subscription_catchup.clone(),
3073            )
3074            .await,
3075        };
3076
3077        Ok(client)
3078    }
3079
3080    /// The [`EventCache`] instance for this [`Client`].
3081    pub fn event_cache(&self) -> &EventCache {
3082        // SAFETY: always initialized in the `Client` ctor.
3083        self.inner.event_cache.get().unwrap()
3084    }
3085
3086    /// The [`LatestEvents`] instance for this [`Client`].
3087    pub async fn latest_events(&self) -> &LatestEvents {
3088        self.inner
3089            .latest_events
3090            .get_or_init(|| async {
3091                LatestEvents::new(
3092                    WeakClient::from_client(self),
3093                    self.event_cache().clone(),
3094                    SendQueue::new(self.clone()),
3095                )
3096            })
3097            .await
3098    }
3099
3100    /// Waits until an at least partially synced room is received, and returns
3101    /// it.
3102    ///
3103    /// **Note: this function will loop endlessly until either it finds the room
3104    /// or an externally set timeout happens.**
3105    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3106        loop {
3107            if let Some(room) = self.get_room(room_id) {
3108                if room.is_state_partially_or_fully_synced() {
3109                    debug!("Found just created room!");
3110                    return room;
3111                }
3112                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3113            } else {
3114                debug!("Room wasn't found, waiting for sync beat to try again");
3115            }
3116            self.inner.sync_beat.listen().await;
3117        }
3118    }
3119
3120    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3121    /// join it.
3122    pub async fn knock(
3123        &self,
3124        room_id_or_alias: OwnedRoomOrAliasId,
3125        reason: Option<String>,
3126        server_names: Vec<OwnedServerName>,
3127    ) -> Result<Room> {
3128        let request =
3129            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3130        let response = self.send(request).await?;
3131        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3132        Ok(Room::new(self.clone(), base_room))
3133    }
3134
3135    /// Checks whether the provided `user_id` belongs to an ignored user.
3136    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3137        self.base_client().is_user_ignored(user_id).await
3138    }
3139
3140    /// Gets the `max_upload_size` value from the homeserver, getting either a
3141    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3142    /// missing.
3143    ///
3144    /// Check the spec for more info:
3145    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3146    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3147        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3148        if let Some(data) = max_upload_size_lock.get() {
3149            return Ok(data.to_owned());
3150        }
3151
3152        // Use the authenticated endpoint when the server supports it.
3153        let supported_versions = self.supported_versions().await?;
3154        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3155            .is_supported(&supported_versions);
3156
3157        let upload_size = if use_auth {
3158            self.send(authenticated_media::get_media_config::v1::Request::default())
3159                .await?
3160                .upload_size
3161        } else {
3162            #[allow(deprecated)]
3163            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3164        };
3165
3166        match max_upload_size_lock.set(upload_size) {
3167            Ok(_) => Ok(upload_size),
3168            Err(error) => {
3169                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3170            }
3171        }
3172    }
3173
3174    /// The settings to use for decrypting events.
3175    #[cfg(feature = "e2e-encryption")]
3176    pub fn decryption_settings(&self) -> &DecryptionSettings {
3177        &self.base_client().decryption_settings
3178    }
3179
3180    /// Returns the [`SearchIndex`] for this [`Client`].
3181    #[cfg(feature = "experimental-search")]
3182    pub fn search_index(&self) -> &SearchIndex {
3183        &self.inner.search_index
3184    }
3185
3186    /// Whether the client is configured to take thread subscriptions (MSC4306
3187    /// and MSC4308) into account.
3188    ///
3189    /// This may cause filtering out of thread subscriptions, and loading the
3190    /// thread subscriptions via the sliding sync extension, when the room
3191    /// list service is being used.
3192    pub fn enabled_thread_subscriptions(&self) -> bool {
3193        match self.base_client().threading_support {
3194            ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
3195            ThreadingSupport::Disabled => false,
3196        }
3197    }
3198
3199    /// Fetch thread subscriptions changes between `from` and up to `to`.
3200    ///
3201    /// The `limit` optional parameter can be used to limit the number of
3202    /// entries in a response. It can also be overridden by the server, if
3203    /// it's deemed too large.
3204    pub async fn fetch_thread_subscriptions(
3205        &self,
3206        from: Option<String>,
3207        to: Option<String>,
3208        limit: Option<UInt>,
3209    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3210        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3211            from,
3212            to,
3213            limit,
3214        });
3215        Ok(self.send(request).await?)
3216    }
3217
3218    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3219        self.inner.thread_subscription_catchup.get().unwrap()
3220    }
3221
3222    /// Perform database optimizations if any are available, i.e. vacuuming in
3223    /// SQLite.
3224    ///
3225    /// **Warning:** this was added to check if SQLite fragmentation was the
3226    /// source of performance issues, **DO NOT use in production**.
3227    #[doc(hidden)]
3228    pub async fn optimize_stores(&self) -> Result<()> {
3229        trace!("Optimizing state store...");
3230        self.state_store().optimize().await?;
3231
3232        trace!("Optimizing event cache store...");
3233        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3234            clean_lock.optimize().await?;
3235        }
3236
3237        trace!("Optimizing media store...");
3238        self.media_store().lock().await?.optimize().await?;
3239
3240        Ok(())
3241    }
3242
3243    /// Returns the sizes of the existing stores, if known.
3244    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3245        #[cfg(feature = "e2e-encryption")]
3246        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3247            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3248        {
3249            Some(store_size)
3250        } else {
3251            None
3252        };
3253        #[cfg(not(feature = "e2e-encryption"))]
3254        let crypto_store_size = None;
3255
3256        let state_store_size = self.state_store().get_size().await.ok().flatten();
3257
3258        let event_cache_store_size = if let Some(clean_lock) =
3259            self.event_cache_store().lock().await?.as_clean()
3260            && let Ok(Some(store_size)) = clean_lock.get_size().await
3261        {
3262            Some(store_size)
3263        } else {
3264            None
3265        };
3266
3267        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3268
3269        Ok(StoreSizes {
3270            crypto_store: crypto_store_size,
3271            state_store: state_store_size,
3272            event_cache_store: event_cache_store_size,
3273            media_store: media_store_size,
3274        })
3275    }
3276}
3277
/// Contains the disk size of the different stores, if known. It won't be
/// available for in-memory stores.
///
/// Each field is `None` when the size of the corresponding store is unknown.
#[derive(Debug, Clone)]
pub struct StoreSizes {
    /// The size of the CryptoStore, if known.
    pub crypto_store: Option<usize>,
    /// The size of the StateStore, if known.
    pub state_store: Option<usize>,
    /// The size of the EventCacheStore, if known.
    pub event_cache_store: Option<usize>,
    /// The size of the MediaStore, if known.
    pub media_store: Option<usize>,
}
3291
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper marking the given users as tracked by the crypto layer.
    ///
    /// # Panics
    ///
    /// Panics if the client has no Olm machine, or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        // Keep the guard alive for as long as the machine is borrowed.
        let machine_guard = self.olm_machine().await;
        let olm_machine = machine_guard.as_ref().unwrap();

        olm_machine.update_tracked_users(user_ids).await.unwrap();
    }
}
3305
/// A weak reference to the inner client, useful when trying to get a handle
/// on the owning client.
///
/// Use [`WeakClient::get`] to try to obtain a strong [`Client`] handle again.
#[derive(Clone, Debug)]
pub(crate) struct WeakClient {
    /// Non-owning pointer to the [`ClientInner`] backing a [`Client`].
    client: Weak<ClientInner>,
}
3312
3313impl WeakClient {
3314    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3315    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3316        Self { client: Arc::downgrade(client) }
3317    }
3318
3319    /// Construct a [`WeakClient`] from a [`Client`].
3320    pub fn from_client(client: &Client) -> Self {
3321        Self::from_inner(&client.inner)
3322    }
3323
3324    /// Attempts to get a [`Client`] from this [`WeakClient`].
3325    pub fn get(&self) -> Option<Client> {
3326        self.client.upgrade().map(|inner| Client { inner })
3327    }
3328
3329    /// Gets the number of strong (`Arc`) pointers still pointing to this
3330    /// client.
3331    #[allow(dead_code)]
3332    pub fn strong_count(&self) -> usize {
3333        self.client.strong_count()
3334    }
3335}
3336
/// Information about the state of a room before we joined it.
///
/// The default value carries no information (no inviter).
#[derive(Debug, Clone, Default)]
struct PreJoinRoomInfo {
    /// The user who invited us to the room, if any.
    pub inviter: Option<RoomMember>,
}
3343
3344// The http mocking library is not supported for wasm32
3345#[cfg(all(test, not(target_family = "wasm")))]
3346pub(crate) mod tests {
3347    use std::{sync::Arc, time::Duration};
3348
3349    use assert_matches::assert_matches;
3350    use assert_matches2::assert_let;
3351    use eyeball::SharedObservable;
3352    use futures_util::{FutureExt, StreamExt, pin_mut};
3353    use js_int::{UInt, uint};
3354    use matrix_sdk_base::{
3355        RoomState,
3356        store::{MemoryStore, StoreConfig},
3357    };
3358    use matrix_sdk_test::{
3359        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3360        event_factory::EventFactory,
3361    };
3362    #[cfg(target_family = "wasm")]
3363    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3364
3365    use ruma::{
3366        RoomId, ServerName, UserId,
3367        api::{
3368            FeatureFlag, MatrixVersion,
3369            client::{
3370                discovery::discover_homeserver::RtcFocusInfo,
3371                room::create_room::v3::Request as CreateRoomRequest,
3372            },
3373        },
3374        assign,
3375        events::{
3376            ignored_user_list::IgnoredUserListEventContent,
3377            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3378        },
3379        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3380    };
3381    use serde_json::json;
3382    use stream_assert::{assert_next_matches, assert_pending};
3383    use tokio::{
3384        spawn,
3385        time::{sleep, timeout},
3386    };
3387    use url::Url;
3388
3389    use super::Client;
3390    use crate::{
3391        Error, TransmissionProgress,
3392        client::{WeakClient, futures::SendMediaUploadRequest},
3393        config::{RequestConfig, SyncSettings},
3394        futures::SendRequest,
3395        media::MediaError,
3396        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3397    };
3398
3399    #[async_test]
3400    async fn test_account_data() {
3401        let server = MatrixMockServer::new().await;
3402        let client = server.client_builder().build().await;
3403
3404        let f = EventFactory::new();
3405        server
3406            .mock_sync()
3407            .ok_and_run(&client, |builder| {
3408                builder.add_global_account_data(
3409                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3410                );
3411            })
3412            .await;
3413
3414        let content = client
3415            .account()
3416            .account_data::<IgnoredUserListEventContent>()
3417            .await
3418            .unwrap()
3419            .unwrap()
3420            .deserialize()
3421            .unwrap();
3422
3423        assert_eq!(content.ignored_users.len(), 1);
3424    }
3425
3426    #[async_test]
3427    async fn test_successful_discovery() {
3428        // Imagine this is `matrix.org`.
3429        let server = MatrixMockServer::new().await;
3430        let server_url = server.uri();
3431
3432        // Imagine this is `matrix-client.matrix.org`.
3433        let homeserver = MatrixMockServer::new().await;
3434        let homeserver_url = homeserver.uri();
3435
3436        // Imagine Alice has the user ID `@alice:matrix.org`.
3437        let domain = server_url.strip_prefix("http://").unwrap();
3438        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3439
3440        // The `.well-known` is on the server (e.g. `matrix.org`).
3441        server
3442            .mock_well_known()
3443            .ok_with_homeserver_url(&homeserver_url)
3444            .mock_once()
3445            .named("well-known")
3446            .mount()
3447            .await;
3448
3449        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3450        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3451
3452        let client = Client::builder()
3453            .insecure_server_name_no_tls(alice.server_name())
3454            .build()
3455            .await
3456            .unwrap();
3457
3458        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3459        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3460        client.server_versions().await.unwrap();
3461    }
3462
3463    #[async_test]
3464    async fn test_discovery_broken_server() {
3465        let server = MatrixMockServer::new().await;
3466        let server_url = server.uri();
3467        let domain = server_url.strip_prefix("http://").unwrap();
3468        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3469
3470        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3471
3472        assert!(
3473            Client::builder()
3474                .insecure_server_name_no_tls(alice.server_name())
3475                .build()
3476                .await
3477                .is_err(),
3478            "Creating a client from a user ID should fail when the .well-known request fails."
3479        );
3480    }
3481
3482    #[async_test]
3483    async fn test_room_creation() {
3484        let server = MatrixMockServer::new().await;
3485        let client = server.client_builder().build().await;
3486
3487        server
3488            .mock_sync()
3489            .ok_and_run(&client, |builder| {
3490                builder.add_joined_room(
3491                    JoinedRoomBuilder::default()
3492                        .add_state_event(StateTestEvent::Member)
3493                        .add_state_event(StateTestEvent::PowerLevels),
3494                );
3495            })
3496            .await;
3497
3498        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3499        assert_eq!(room.state(), RoomState::Joined);
3500    }
3501
3502    #[async_test]
3503    async fn test_retry_limit_http_requests() {
3504        let server = MatrixMockServer::new().await;
3505        let client = server
3506            .client_builder()
3507            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3508            .build()
3509            .await;
3510
3511        assert!(client.request_config().retry_limit.unwrap() == 3);
3512
3513        server.mock_login().error500().expect(3).mount().await;
3514
3515        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3516    }
3517
3518    #[async_test]
3519    async fn test_retry_timeout_http_requests() {
3520        // Keep this timeout small so that the test doesn't take long
3521        let retry_timeout = Duration::from_secs(5);
3522        let server = MatrixMockServer::new().await;
3523        let client = server
3524            .client_builder()
3525            .on_builder(|builder| {
3526                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3527            })
3528            .build()
3529            .await;
3530
3531        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3532
3533        server.mock_login().error500().expect(2..).mount().await;
3534
3535        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3536    }
3537
3538    #[async_test]
3539    async fn test_short_retry_initial_http_requests() {
3540        let server = MatrixMockServer::new().await;
3541        let client = server
3542            .client_builder()
3543            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3544            .build()
3545            .await;
3546
3547        server.mock_login().error500().expect(3..).mount().await;
3548
3549        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3550    }
3551
3552    #[async_test]
3553    async fn test_no_retry_http_requests() {
3554        let server = MatrixMockServer::new().await;
3555        let client = server.client_builder().build().await;
3556
3557        server.mock_devices().error500().mock_once().mount().await;
3558
3559        client.devices().await.unwrap_err();
3560    }
3561
3562    #[async_test]
3563    async fn test_set_homeserver() {
3564        let client = MockClientBuilder::new(None).build().await;
3565        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3566
3567        let homeserver = Url::parse("http://example.com/").unwrap();
3568        client.set_homeserver(homeserver.clone());
3569        assert_eq!(client.homeserver(), homeserver);
3570    }
3571
3572    #[async_test]
3573    async fn test_search_user_request() {
3574        let server = MatrixMockServer::new().await;
3575        let client = server.client_builder().build().await;
3576
3577        server.mock_user_directory().ok().mock_once().mount().await;
3578
3579        let response = client.search_users("test", 50).await.unwrap();
3580        assert_eq!(response.results.len(), 1);
3581        let result = &response.results[0];
3582        assert_eq!(result.user_id.to_string(), "@test:example.me");
3583        assert_eq!(result.display_name.clone().unwrap(), "Test");
3584        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3585        assert!(!response.limited);
3586    }
3587
3588    #[async_test]
3589    async fn test_request_unstable_features() {
3590        let server = MatrixMockServer::new().await;
3591        let client = server.client_builder().no_server_versions().build().await;
3592
3593        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3594
3595        let unstable_features = client.unstable_features().await.unwrap();
3596        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3597        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3598    }
3599
3600    #[async_test]
3601    async fn test_can_homeserver_push_encrypted_event_to_device() {
3602        let server = MatrixMockServer::new().await;
3603        let client = server.client_builder().no_server_versions().build().await;
3604
3605        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3606
3607        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3608        assert!(msc4028_enabled);
3609    }
3610
3611    #[async_test]
3612    async fn test_recently_visited_rooms() {
3613        // Tracking recently visited rooms requires authentication
3614        let client = MockClientBuilder::new(None).unlogged().build().await;
3615        assert_matches!(
3616            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3617            Err(Error::AuthenticationRequired)
3618        );
3619
3620        let client = MockClientBuilder::new(None).build().await;
3621        let account = client.account();
3622
3623        // We should start off with an empty list
3624        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3625
3626        // Tracking a valid room id should add it to the list
3627        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3628        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3629        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3630
3631        // And the existing list shouldn't be changed
3632        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3633        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3634
3635        // Tracking the same room again shouldn't change the list
3636        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3637        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3638        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3639
3640        // Tracking a second room should add it to the front of the list
3641        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3642        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3643        assert_eq!(
3644            account.get_recently_visited_rooms().await.unwrap(),
3645            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3646        );
3647
3648        // Tracking the first room yet again should move it to the front of the list
3649        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3650        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3651        assert_eq!(
3652            account.get_recently_visited_rooms().await.unwrap(),
3653            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3654        );
3655
3656        // Tracking should be capped at 20
3657        for n in 0..20 {
3658            account
3659                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3660                .await
3661                .unwrap();
3662        }
3663
3664        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3665
3666        // And the initial rooms should've been pushed out
3667        let rooms = account.get_recently_visited_rooms().await.unwrap();
3668        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3669        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3670
3671        // And the last tracked room should be the first
3672        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3673    }
3674
3675    #[async_test]
3676    async fn test_client_no_cycle_with_event_cache() {
3677        let client = MockClientBuilder::new(None).build().await;
3678
3679        // Wait for the init tasks to die.
3680        sleep(Duration::from_secs(1)).await;
3681
3682        let weak_client = WeakClient::from_client(&client);
3683        assert_eq!(weak_client.strong_count(), 1);
3684
3685        {
3686            let room_id = room_id!("!room:example.org");
3687
3688            // Have the client know the room.
3689            let response = SyncResponseBuilder::default()
3690                .add_joined_room(JoinedRoomBuilder::new(room_id))
3691                .build_sync_response();
3692            client.inner.base_client.receive_sync_response(response).await.unwrap();
3693
3694            client.event_cache().subscribe().unwrap();
3695
3696            let (_room_event_cache, _drop_handles) =
3697                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3698        }
3699
3700        drop(client);
3701
3702        // Give a bit of time for background tasks to die.
3703        sleep(Duration::from_secs(1)).await;
3704
3705        // The weak client must be the last reference to the client now.
3706        assert_eq!(weak_client.strong_count(), 0);
3707        let client = weak_client.get();
3708        assert!(
3709            client.is_none(),
3710            "too many strong references to the client: {}",
3711            Arc::strong_count(&client.unwrap().inner)
3712        );
3713    }
3714
3715    #[async_test]
3716    async fn test_supported_versions_caching() {
3717        let server = MatrixMockServer::new().await;
3718
3719        let versions_mock = server
3720            .mock_versions()
3721            .expect_default_access_token()
3722            .ok_with_unstable_features()
3723            .named("first versions mock")
3724            .expect(1)
3725            .mount_as_scoped()
3726            .await;
3727
3728        let memory_store = Arc::new(MemoryStore::new());
3729        let client = server
3730            .client_builder()
3731            .no_server_versions()
3732            .on_builder(|builder| {
3733                builder.store_config(
3734                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3735                        .state_store(memory_store.clone()),
3736                )
3737            })
3738            .build()
3739            .await;
3740
3741        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3742
3743        // The result was cached.
3744        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3745        // This subsequent call hits the in-memory cache.
3746        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3747
3748        drop(client);
3749
3750        let client = server
3751            .client_builder()
3752            .no_server_versions()
3753            .on_builder(|builder| {
3754                builder.store_config(
3755                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3756                        .state_store(memory_store.clone()),
3757                )
3758            })
3759            .build()
3760            .await;
3761
3762        // These calls to the new client hit the on-disk cache.
3763        assert!(
3764            client
3765                .unstable_features()
3766                .await
3767                .unwrap()
3768                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3769        );
3770        let supported = client.supported_versions().await.unwrap();
3771        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3772        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3773
3774        // Then this call hits the in-memory cache.
3775        let supported = client.supported_versions().await.unwrap();
3776        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3777        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3778
3779        drop(versions_mock);
3780
3781        // Now, reset the cache, and observe the endpoints being called again once.
3782        client.reset_supported_versions().await.unwrap();
3783
3784        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3785
3786        // Hits network again.
3787        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3788        // Hits in-memory cache again.
3789        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3790    }
3791
3792    #[async_test]
3793    async fn test_well_known_caching() {
3794        let server = MatrixMockServer::new().await;
3795        let server_url = server.uri();
3796        let domain = server_url.strip_prefix("http://").unwrap();
3797        let server_name = <&ServerName>::try_from(domain).unwrap();
3798        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3799
3800        let well_known_mock = server
3801            .mock_well_known()
3802            .ok()
3803            .named("well known mock")
3804            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3805            .mount_as_scoped()
3806            .await;
3807
3808        let memory_store = Arc::new(MemoryStore::new());
3809        let client = Client::builder()
3810            .insecure_server_name_no_tls(server_name)
3811            .store_config(
3812                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3813                    .state_store(memory_store.clone()),
3814            )
3815            .build()
3816            .await
3817            .unwrap();
3818
3819        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3820
3821        // This subsequent call hits the in-memory cache.
3822        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3823
3824        drop(client);
3825
3826        let client = server
3827            .client_builder()
3828            .no_server_versions()
3829            .on_builder(|builder| {
3830                builder.store_config(
3831                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3832                        .state_store(memory_store.clone()),
3833                )
3834            })
3835            .build()
3836            .await;
3837
3838        // This call to the new client hits the on-disk cache.
3839        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3840
3841        // Then this call hits the in-memory cache.
3842        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3843
3844        drop(well_known_mock);
3845
3846        // Now, reset the cache, and observe the endpoints being called again once.
3847        client.reset_well_known().await.unwrap();
3848
3849        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3850
3851        // Hits network again.
3852        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3853        // Hits in-memory cache again.
3854        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3855    }
3856
3857    #[async_test]
3858    async fn test_missing_well_known_caching() {
3859        let server = MatrixMockServer::new().await;
3860        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3861
3862        let well_known_mock = server
3863            .mock_well_known()
3864            .error_unrecognized()
3865            .named("first well-known mock")
3866            .expect(1)
3867            .mount_as_scoped()
3868            .await;
3869
3870        let memory_store = Arc::new(MemoryStore::new());
3871        let client = server
3872            .client_builder()
3873            .on_builder(|builder| {
3874                builder.store_config(
3875                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3876                        .state_store(memory_store.clone()),
3877                )
3878            })
3879            .build()
3880            .await;
3881
3882        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3883
3884        // This subsequent call hits the in-memory cache.
3885        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3886
3887        drop(client);
3888
3889        let client = server
3890            .client_builder()
3891            .on_builder(|builder| {
3892                builder.store_config(
3893                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3894                        .state_store(memory_store.clone()),
3895                )
3896            })
3897            .build()
3898            .await;
3899
3900        // This call to the new client hits the on-disk cache.
3901        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3902
3903        // Then this call hits the in-memory cache.
3904        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3905
3906        drop(well_known_mock);
3907
3908        // Now, reset the cache, and observe the endpoints being called again once.
3909        client.reset_well_known().await.unwrap();
3910
3911        server
3912            .mock_well_known()
3913            .error_unrecognized()
3914            .expect(1)
3915            .named("second well-known mock")
3916            .mount()
3917            .await;
3918
3919        // Hits network again.
3920        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3921        // Hits in-memory cache again.
3922        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3923    }
3924
3925    #[async_test]
3926    async fn test_no_network_doesnt_cause_infinite_retries() {
3927        // We want infinite retries for transient errors.
3928        let client = MockClientBuilder::new(None)
3929            .on_builder(|builder| builder.request_config(RequestConfig::new()))
3930            .build()
3931            .await;
3932
3933        // We don't define a mock server on purpose here, so that the error is really a
3934        // network error.
3935        client.whoami().await.unwrap_err();
3936    }
3937
3938    #[async_test]
3939    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3940        let server = MatrixMockServer::new().await;
3941        let client = server.client_builder().build().await;
3942
3943        let room_id = room_id!("!room:example.org");
3944
3945        server
3946            .mock_sync()
3947            .ok_and_run(&client, |builder| {
3948                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3949            })
3950            .await;
3951
3952        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3953        assert_eq!(room.room_id(), room_id);
3954    }
3955
3956    #[async_test]
3957    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3958        let server = MatrixMockServer::new().await;
3959        let client = server.client_builder().build().await;
3960
3961        let room_id = room_id!("!room:example.org");
3962
3963        let client = Arc::new(client);
3964
3965        // Perform the /sync request with a delay so it starts after the
3966        // `await_room_remote_echo` call has happened
3967        spawn({
3968            let client = client.clone();
3969            async move {
3970                sleep(Duration::from_millis(100)).await;
3971
3972                server
3973                    .mock_sync()
3974                    .ok_and_run(&client, |builder| {
3975                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3976                    })
3977                    .await;
3978            }
3979        });
3980
3981        let room =
3982            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3983        assert_eq!(room.room_id(), room_id);
3984    }
3985
3986    #[async_test]
3987    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3988        let client = MockClientBuilder::new(None).build().await;
3989
3990        let room_id = room_id!("!room:example.org");
3991        // Room is not present so the client won't be able to find it. The call will
3992        // timeout.
3993        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3994    }
3995
3996    #[async_test]
3997    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3998        let server = MatrixMockServer::new().await;
3999        let client = server.client_builder().build().await;
4000
4001        server.mock_create_room().ok().mount().await;
4002
4003        // Create a room in the internal store
4004        let room = client
4005            .create_room(assign!(CreateRoomRequest::new(), {
4006                invite: vec![],
4007                is_direct: false,
4008            }))
4009            .await
4010            .unwrap();
4011
4012        // Room is locally present, but not synced, the call will timeout
4013        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4014            .await
4015            .unwrap_err();
4016    }
4017
4018    #[async_test]
4019    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4020        let server = MatrixMockServer::new().await;
4021        let client = server.client_builder().build().await;
4022
4023        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4024
4025        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4026        assert_matches!(ret, Ok(true));
4027    }
4028
4029    #[async_test]
4030    async fn test_is_room_alias_available_if_alias_is_resolved() {
4031        let server = MatrixMockServer::new().await;
4032        let client = server.client_builder().build().await;
4033
4034        server
4035            .mock_room_directory_resolve_alias()
4036            .ok("!some_room_id:matrix.org", Vec::new())
4037            .expect(1)
4038            .mount()
4039            .await;
4040
4041        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4042        assert_matches!(ret, Ok(false));
4043    }
4044
4045    #[async_test]
4046    async fn test_is_room_alias_available_if_error_found() {
4047        let server = MatrixMockServer::new().await;
4048        let client = server.client_builder().build().await;
4049
4050        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4051
4052        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4053        assert_matches!(ret, Err(_));
4054    }
4055
4056    #[async_test]
4057    async fn test_create_room_alias() {
4058        let server = MatrixMockServer::new().await;
4059        let client = server.client_builder().build().await;
4060
4061        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4062
4063        let ret = client
4064            .create_room_alias(
4065                room_alias_id!("#some_alias:matrix.org"),
4066                room_id!("!some_room:matrix.org"),
4067            )
4068            .await;
4069        assert_matches!(ret, Ok(()));
4070    }
4071
4072    #[async_test]
4073    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4074        let server = MatrixMockServer::new().await;
4075        let client = server.client_builder().build().await;
4076
4077        let room_id = room_id!("!a-room:matrix.org");
4078
4079        // Make sure the summary endpoint is called once
4080        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4081
4082        // We create a locally cached invited room
4083        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4084
4085        // And we get a preview, the server endpoint was reached
4086        let preview = client
4087            .get_room_preview(room_id.into(), Vec::new())
4088            .await
4089            .expect("Room preview should be retrieved");
4090
4091        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
4092    }
4093
4094    #[async_test]
4095    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4096        let server = MatrixMockServer::new().await;
4097        let client = server.client_builder().build().await;
4098
4099        let room_id = room_id!("!a-room:matrix.org");
4100
4101        // Make sure the summary endpoint is called once
4102        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4103
4104        // We create a locally cached left room
4105        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4106
4107        // And we get a preview, the server endpoint was reached
4108        let preview = client
4109            .get_room_preview(room_id.into(), Vec::new())
4110            .await
4111            .expect("Room preview should be retrieved");
4112
4113        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
4114    }
4115
4116    #[async_test]
4117    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4118        let server = MatrixMockServer::new().await;
4119        let client = server.client_builder().build().await;
4120
4121        let room_id = room_id!("!a-room:matrix.org");
4122
4123        // Make sure the summary endpoint is called once
4124        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4125
4126        // We create a locally cached knocked room
4127        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4128
4129        // And we get a preview, the server endpoint was reached
4130        let preview = client
4131            .get_room_preview(room_id.into(), Vec::new())
4132            .await
4133            .expect("Room preview should be retrieved");
4134
4135        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
4136    }
4137
4138    #[async_test]
4139    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4140        let server = MatrixMockServer::new().await;
4141        let client = server.client_builder().build().await;
4142
4143        let room_id = room_id!("!a-room:matrix.org");
4144
4145        // Make sure the summary endpoint is not called
4146        server.mock_room_summary().ok(room_id).never().mount().await;
4147
4148        // We create a locally cached joined room
4149        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4150
4151        // And we get a preview, no server endpoint was reached
4152        let preview = client
4153            .get_room_preview(room_id.into(), Vec::new())
4154            .await
4155            .expect("Room preview should be retrieved");
4156
4157        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
4158    }
4159
4160    #[async_test]
4161    async fn test_media_preview_config() {
4162        let server = MatrixMockServer::new().await;
4163        let client = server.client_builder().build().await;
4164
4165        server
4166            .mock_sync()
4167            .ok_and_run(&client, |builder| {
4168                builder.add_custom_global_account_data(json!({
4169                    "content": {
4170                        "media_previews": "private",
4171                        "invite_avatars": "off"
4172                    },
4173                    "type": "m.media_preview_config"
4174                }));
4175            })
4176            .await;
4177
4178        let (initial_value, stream) =
4179            client.account().observe_media_preview_config().await.unwrap();
4180
4181        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4182        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4183        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4184        pin_mut!(stream);
4185        assert_pending!(stream);
4186
4187        server
4188            .mock_sync()
4189            .ok_and_run(&client, |builder| {
4190                builder.add_custom_global_account_data(json!({
4191                    "content": {
4192                        "media_previews": "off",
4193                        "invite_avatars": "on"
4194                    },
4195                    "type": "m.media_preview_config"
4196                }));
4197            })
4198            .await;
4199
4200        assert_next_matches!(
4201            stream,
4202            MediaPreviewConfigEventContent {
4203                media_previews: Some(MediaPreviews::Off),
4204                invite_avatars: Some(InviteAvatars::On),
4205                ..
4206            }
4207        );
4208        assert_pending!(stream);
4209    }
4210
4211    #[async_test]
4212    async fn test_unstable_media_preview_config() {
4213        let server = MatrixMockServer::new().await;
4214        let client = server.client_builder().build().await;
4215
4216        server
4217            .mock_sync()
4218            .ok_and_run(&client, |builder| {
4219                builder.add_custom_global_account_data(json!({
4220                    "content": {
4221                        "media_previews": "private",
4222                        "invite_avatars": "off"
4223                    },
4224                    "type": "io.element.msc4278.media_preview_config"
4225                }));
4226            })
4227            .await;
4228
4229        let (initial_value, stream) =
4230            client.account().observe_media_preview_config().await.unwrap();
4231
4232        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4233        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4234        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4235        pin_mut!(stream);
4236        assert_pending!(stream);
4237
4238        server
4239            .mock_sync()
4240            .ok_and_run(&client, |builder| {
4241                builder.add_custom_global_account_data(json!({
4242                    "content": {
4243                        "media_previews": "off",
4244                        "invite_avatars": "on"
4245                    },
4246                    "type": "io.element.msc4278.media_preview_config"
4247                }));
4248            })
4249            .await;
4250
4251        assert_next_matches!(
4252            stream,
4253            MediaPreviewConfigEventContent {
4254                media_previews: Some(MediaPreviews::Off),
4255                invite_avatars: Some(InviteAvatars::On),
4256                ..
4257            }
4258        );
4259        assert_pending!(stream);
4260    }
4261
4262    #[async_test]
4263    async fn test_media_preview_config_not_found() {
4264        let server = MatrixMockServer::new().await;
4265        let client = server.client_builder().build().await;
4266
4267        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4268
4269        assert!(initial_value.is_none());
4270    }
4271
4272    #[async_test]
4273    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4274        // The default Matrix version we use is 1.11 or higher, so authenticated media
4275        // is supported.
4276        let server = MatrixMockServer::new().await;
4277        let client = server.client_builder().build().await;
4278
4279        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4280
4281        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4282        client.load_or_fetch_max_upload_size().await.unwrap();
4283
4284        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4285    }
4286
4287    #[async_test]
4288    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4289        // The server must advertise support for the stable feature for authenticated
4290        // media support, so we mock the `GET /versions` response.
4291        let server = MatrixMockServer::new().await;
4292        let client = server.client_builder().no_server_versions().build().await;
4293
4294        server
4295            .mock_versions()
4296            .ok_custom(
4297                &["v1.7", "v1.8", "v1.9", "v1.10"],
4298                &[("org.matrix.msc3916.stable", true)].into(),
4299            )
4300            .named("versions")
4301            .expect(1)
4302            .mount()
4303            .await;
4304
4305        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4306
4307        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4308        client.load_or_fetch_max_upload_size().await.unwrap();
4309
4310        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4311    }
4312
4313    #[async_test]
4314    async fn test_load_or_fetch_max_upload_size_no_auth() {
4315        // The server must not support Matrix 1.11 or higher for unauthenticated
4316        // media requests, so we mock the `GET /versions` response.
4317        let server = MatrixMockServer::new().await;
4318        let client = server.client_builder().no_server_versions().build().await;
4319
4320        server
4321            .mock_versions()
4322            .ok_custom(&["v1.1"], &Default::default())
4323            .named("versions")
4324            .expect(1)
4325            .mount()
4326            .await;
4327
4328        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4329
4330        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4331        client.load_or_fetch_max_upload_size().await.unwrap();
4332
4333        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4334    }
4335
4336    #[async_test]
4337    async fn test_uploading_a_too_large_media_file() {
4338        let server = MatrixMockServer::new().await;
4339        let client = server.client_builder().build().await;
4340
4341        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4342        client.load_or_fetch_max_upload_size().await.unwrap();
4343        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4344
4345        let data = vec![1, 2];
4346        let upload_request =
4347            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4348        let request = SendRequest {
4349            client: client.clone(),
4350            request: upload_request,
4351            config: None,
4352            send_progress: SharedObservable::new(TransmissionProgress::default()),
4353        };
4354        let media_request = SendMediaUploadRequest::new(request);
4355
4356        let error = media_request.await.err();
4357        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4358        assert_eq!(max, uint!(1));
4359        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4360    }
4361
4362    #[async_test]
4363    async fn test_dont_ignore_timeout_on_first_sync() {
4364        let server = MatrixMockServer::new().await;
4365        let client = server.client_builder().build().await;
4366
4367        server
4368            .mock_sync()
4369            .timeout(Some(Duration::from_secs(30)))
4370            .ok(|_| {})
4371            .mock_once()
4372            .named("sync_with_timeout")
4373            .mount()
4374            .await;
4375
4376        // Call the endpoint once to check the timeout.
4377        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4378
4379        timeout(Duration::from_secs(1), async {
4380            stream.next().await.unwrap().unwrap();
4381        })
4382        .await
4383        .unwrap();
4384    }
4385
4386    #[async_test]
4387    async fn test_ignore_timeout_on_first_sync() {
4388        let server = MatrixMockServer::new().await;
4389        let client = server.client_builder().build().await;
4390
4391        server
4392            .mock_sync()
4393            .timeout(None)
4394            .ok(|_| {})
4395            .mock_once()
4396            .named("sync_no_timeout")
4397            .mount()
4398            .await;
4399        server
4400            .mock_sync()
4401            .timeout(Some(Duration::from_secs(30)))
4402            .ok(|_| {})
4403            .mock_once()
4404            .named("sync_with_timeout")
4405            .mount()
4406            .await;
4407
4408        // Call each version of the endpoint once to check the timeouts.
4409        let mut stream = Box::pin(
4410            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4411        );
4412
4413        timeout(Duration::from_secs(1), async {
4414            stream.next().await.unwrap().unwrap();
4415            stream.next().await.unwrap().unwrap();
4416        })
4417        .await
4418        .unwrap();
4419    }
4420
4421    #[async_test]
4422    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4423        let server = MatrixMockServer::new().await;
4424        let client = server.client_builder().build().await;
4425        // This is the user ID that is inside MemberAdditional.
4426        // Note the confusing username, so we can share
4427        // GlobalAccountDataTestEvent::Direct with the invited test.
4428        let user_id = user_id!("@invited:localhost");
4429
4430        // When we receive a sync response saying "invited" is invited to a DM
4431        let f = EventFactory::new();
4432        let response = SyncResponseBuilder::default()
4433            .add_joined_room(
4434                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
4435            )
4436            .add_global_account_data(
4437                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4438            )
4439            .build_sync_response();
4440        client.base_client().receive_sync_response(response).await.unwrap();
4441
4442        // Then get_dm_room finds this room
4443        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4444        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4445    }
4446
4447    #[async_test]
4448    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4449        let server = MatrixMockServer::new().await;
4450        let client = server.client_builder().build().await;
4451        // This is the user ID that is inside MemberInvite
4452        let user_id = user_id!("@invited:localhost");
4453
4454        // When we receive a sync response saying "invited" is invited to a DM
4455        let f = EventFactory::new();
4456        let response = SyncResponseBuilder::default()
4457            .add_joined_room(
4458                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
4459            )
4460            .add_global_account_data(
4461                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4462            )
4463            .build_sync_response();
4464        client.base_client().receive_sync_response(response).await.unwrap();
4465
4466        // Then get_dm_room finds this room
4467        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4468        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4469    }
4470
4471    #[async_test]
4472    async fn test_get_dm_room_still_finds_left_room() {
4473        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4474        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4475
4476        let server = MatrixMockServer::new().await;
4477        let client = server.client_builder().build().await;
4478        // This is the user ID that is inside MemberAdditional.
4479        // Note the confusing username, so we can share
4480        // GlobalAccountDataTestEvent::Direct with the invited test.
4481        let user_id = user_id!("@invited:localhost");
4482
4483        // When we receive a sync response saying "invited" is invited to a DM
4484        let f = EventFactory::new();
4485        let response = SyncResponseBuilder::default()
4486            .add_joined_room(
4487                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
4488            )
4489            .add_global_account_data(
4490                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4491            )
4492            .build_sync_response();
4493        client.base_client().receive_sync_response(response).await.unwrap();
4494
4495        // Then get_dm_room finds this room
4496        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4497        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4498    }
4499
4500    #[async_test]
4501    async fn test_device_exists() {
4502        let server = MatrixMockServer::new().await;
4503        let client = server.client_builder().build().await;
4504
4505        server.mock_get_device().ok().expect(1).mount().await;
4506
4507        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4508    }
4509
4510    #[async_test]
4511    async fn test_device_exists_404() {
4512        let server = MatrixMockServer::new().await;
4513        let client = server.client_builder().build().await;
4514
4515        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4516    }
4517
4518    #[async_test]
4519    async fn test_device_exists_500() {
4520        let server = MatrixMockServer::new().await;
4521        let client = server.client_builder().build().await;
4522
4523        server.mock_get_device().error500().expect(1).mount().await;
4524
4525        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4526    }
4527}