matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{btree_map, BTreeMap, BTreeSet},
19    fmt::{self, Debug},
20    future::{ready, Future},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{store::LockableCryptoStore, DecryptionSettings};
33use matrix_sdk_base::{
34    event_cache::store::EventCacheStoreLock,
35    store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
36    sync::{Notification, RoomUpdates},
37    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
38    StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport,
39};
40use matrix_sdk_common::ttl_cache::TtlCache;
41#[cfg(feature = "e2e-encryption")]
42use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
43use ruma::{
44    api::{
45        client::{
46            account::whoami,
47            alias::{create_alias, delete_alias, get_alias},
48            authenticated_media,
49            device::{delete_devices, get_devices, update_device},
50            directory::{get_public_rooms, get_public_rooms_filtered},
51            discovery::{
52                discover_homeserver::{self, RtcFocusInfo},
53                get_capabilities::{self, v3::Capabilities},
54                get_supported_versions,
55            },
56            error::ErrorKind,
57            filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
58            knock::knock_room,
59            media,
60            membership::{join_room_by_id, join_room_by_id_or_alias},
61            room::create_room,
62            session::login::v3::DiscoveryInfo,
63            sync::sync_events,
64            threads::get_thread_subscriptions_changes,
65            uiaa,
66            user_directory::search_users,
67        },
68        error::FromHttpResponseError,
69        federation::discovery::get_server_version,
70        FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
71    },
72    assign,
73    push::Ruleset,
74    time::Instant,
75    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
76    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
77};
78use serde::de::DeserializeOwned;
79use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
80use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
81use url::Url;
82
83use self::futures::SendRequest;
84use crate::{
85    authentication::{
86        matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
87        SaveSessionCallback,
88    },
89    client::thread_subscriptions::ThreadSubscriptionCatchup,
90    config::{RequestConfig, SyncToken},
91    deduplicating_handler::DeduplicatingHandler,
92    error::HttpResult,
93    event_cache::EventCache,
94    event_handler::{
95        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
96        EventHandlerStore, ObservableEventHandler, SyncEvent,
97    },
98    http_client::HttpClient,
99    latest_events::LatestEvents,
100    media::MediaError,
101    notification_settings::NotificationSettings,
102    room::RoomMember,
103    room_preview::RoomPreview,
104    send_queue::{SendQueue, SendQueueData},
105    sliding_sync::Version as SlidingSyncVersion,
106    sync::{RoomUpdate, SyncResponse},
107    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
108    Room, SessionTokens, TransmissionProgress,
109};
110#[cfg(feature = "e2e-encryption")]
111use crate::{
112    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
113    store_locks::CrossProcessStoreLock,
114};
115
116mod builder;
117pub(crate) mod caches;
118pub(crate) mod futures;
119#[cfg(feature = "experimental-search")]
120pub(crate) mod search;
121pub(crate) mod thread_subscriptions;
122
123pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
124#[cfg(feature = "experimental-search")]
125use crate::client::search::SearchIndex;
126
/// Boxed, pinned future produced by a notification handler (see
/// `NotificationHandlerFn`).
///
/// Outside of wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future produced by a notification handler (see
/// `NotificationHandlerFn`).
///
/// On wasm targets no `Send` bound is required.
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
131
/// Boxed notification handler callback.
///
/// The callback receives a [`Notification`], a [`Room`] and a [`Client`], and
/// returns a `NotificationHandlerFut`. Outside of wasm targets the callback
/// must be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler callback.
///
/// The callback receives a [`Notification`], a [`Room`] and a [`Client`], and
/// returns a `NotificationHandlerFut`. On wasm targets no `Send`/`Sync` bound
/// is required.
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
137
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the return value
/// of the provided callback controls whether the sync loop should be exited.
///
/// [`sync_with_callback`]: Client::sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
151
/// Represents changes that can occur to a `Client`'s `Session`.
///
/// Equality is total for this type (it only carries a `bool`), so it
/// implements [`Eq`] in addition to [`PartialEq`], like the other plain data
/// types of this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
163
/// Information about the server vendor obtained from the federation API.
///
/// Returned by [`Client::server_vendor_info`], which substitutes `"unknown"`
/// for any value the server did not report.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The name of the server software, as reported by the server.
    pub server_name: String,
    /// The version of the server software, as reported by the server.
    pub version: String,
}
173
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
#[derive(Clone)]
pub struct Client {
    /// The shared state of the client; every clone of a `Client` points to the
    /// same [`ClientInner`].
    pub(crate) inner: Arc<ClientInner>,
}
181
/// Collection of locks and request-deduplicating handlers that individual
/// client methods use, either to ensure that only a single call to a method
/// happens at once or to deduplicate multiple calls to a method.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    // NOTE(review): `Self::send_single_receipt` cannot resolve on `ClientLocks`
    // (it has no such method); presumably the method lives on `Room` — confirm
    // and fix the link target.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// The cross-process lock of the crypto store, stored in a cell that is
    /// set at most once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock:
        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
261
/// The state shared by all the clones of a [`Client`].
///
/// A [`Client`] is only a handle holding an `Arc<ClientInner>`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates, keyed by
    /// room.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, i.e. the maximum size of
    /// a request that can be sent to it.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    #[cfg(feature = "experimental-search")]
    /// Handler for the [`RoomIndex`] of each room.
    search_index: SearchIndex,
}
382
383impl ClientInner {
384    /// Create a new `ClientInner`.
385    ///
386    /// All the fields passed as parameters here are those that must be cloned
387    /// upon instantiation of a sub-client, e.g. a client specialized for
388    /// notifications.
389    #[allow(clippy::too_many_arguments)]
390    async fn new(
391        auth_ctx: Arc<AuthCtx>,
392        server: Option<Url>,
393        homeserver: Url,
394        sliding_sync_version: SlidingSyncVersion,
395        http_client: HttpClient,
396        base_client: BaseClient,
397        server_info: ClientServerInfo,
398        respect_login_well_known: bool,
399        event_cache: OnceCell<EventCache>,
400        send_queue: Arc<SendQueueData>,
401        latest_events: OnceCell<LatestEvents>,
402        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
403        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
404        cross_process_store_locks_holder_name: String,
405        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
406        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
407    ) -> Arc<Self> {
408        let caches = ClientCaches {
409            server_info: server_info.into(),
410            server_metadata: Mutex::new(TtlCache::new()),
411        };
412
413        let client = Self {
414            server,
415            homeserver: StdRwLock::new(homeserver),
416            auth_ctx,
417            sliding_sync_version: StdRwLock::new(sliding_sync_version),
418            http_client,
419            base_client,
420            caches,
421            locks: Default::default(),
422            cross_process_store_locks_holder_name,
423            typing_notice_times: Default::default(),
424            event_handlers: Default::default(),
425            notification_handlers: Default::default(),
426            room_update_channels: Default::default(),
427            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
428            // ballast for all observers to catch up.
429            room_updates_sender: broadcast::Sender::new(32),
430            respect_login_well_known,
431            sync_beat: event_listener::Event::new(),
432            event_cache,
433            send_queue_data: send_queue,
434            latest_events,
435            #[cfg(feature = "e2e-encryption")]
436            e2ee: EncryptionData::new(encryption_settings),
437            #[cfg(feature = "e2e-encryption")]
438            verification_state: SharedObservable::new(VerificationState::Unknown),
439            #[cfg(feature = "e2e-encryption")]
440            enable_share_history_on_invite,
441            server_max_upload_size: Mutex::new(OnceCell::new()),
442            #[cfg(feature = "experimental-search")]
443            search_index: search_index_handler,
444            thread_subscription_catchup,
445        };
446
447        #[allow(clippy::let_and_return)]
448        let client = Arc::new(client);
449
450        #[cfg(feature = "e2e-encryption")]
451        client.e2ee.initialize_tasks(&client);
452
453        let _ = client
454            .event_cache
455            .get_or_init(|| async {
456                EventCache::new(
457                    WeakClient::from_inner(&client),
458                    client.base_client.event_cache_store().clone(),
459                )
460            })
461            .await;
462
463        let _ = client
464            .thread_subscription_catchup
465            .get_or_init(|| async {
466                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
467            })
468            .await;
469
470        client
471    }
472}
473
474#[cfg(not(tarpaulin_include))]
475impl Debug for Client {
476    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
477        write!(fmt, "Client")
478    }
479}
480
481impl Client {
482    /// Create a new [`Client`] that will use the given homeserver.
483    ///
484    /// # Arguments
485    ///
486    /// * `homeserver_url` - The homeserver that the client should connect to.
487    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
488        Self::builder().homeserver_url(homeserver_url).build().await
489    }
490
491    /// Returns a subscriber that publishes an event every time the ignore user
492    /// list changes.
493    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
494        self.inner.base_client.subscribe_to_ignore_user_list_changes()
495    }
496
    /// Create a new [`ClientBuilder`], the entry point to configure and build
    /// a [`Client`].
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
501
    /// Get a reference to the [`BaseClient`] held by this client.
    pub(crate) fn base_client(&self) -> &BaseClient {
        &self.inner.base_client
    }
505
    /// The underlying HTTP client, i.e. the [`reqwest::Client`] wrapped by the
    /// SDK's own HTTP client.
    pub fn http_client(&self) -> &reqwest::Client {
        &self.inner.http_client.inner
    }
510
    /// Get a reference to the [`ClientLocks`] of this client.
    pub(crate) fn locks(&self) -> &ClientLocks {
        &self.inner.locks
    }
514
    /// Get a reference to the [`AuthCtx`], which holds all the data related to
    /// authentication and authorization.
    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
        &self.inner.auth_ctx
    }
518
519    /// The cross-process store locks holder name.
520    ///
521    /// The SDK provides cross-process store locks (see
522    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
523    /// `holder_name` is the value used for all cross-process store locks
524    /// used by this `Client`.
525    pub fn cross_process_store_locks_holder_name(&self) -> &str {
526        &self.inner.cross_process_store_locks_holder_name
527    }
528
529    /// Change the homeserver URL used by this client.
530    ///
531    /// # Arguments
532    ///
533    /// * `homeserver_url` - The new URL to use.
534    fn set_homeserver(&self, homeserver_url: Url) {
535        *self.inner.homeserver.write().unwrap() = homeserver_url;
536    }
537
538    /// Get the capabilities of the homeserver.
539    ///
540    /// This method should be used to check what features are supported by the
541    /// homeserver.
542    ///
543    /// # Examples
544    ///
545    /// ```no_run
546    /// # use matrix_sdk::Client;
547    /// # use url::Url;
548    /// # async {
549    /// # let homeserver = Url::parse("http://example.com")?;
550    /// let client = Client::new(homeserver).await?;
551    ///
552    /// let capabilities = client.get_capabilities().await?;
553    ///
554    /// if capabilities.change_password.enabled {
555    ///     // Change password
556    /// }
557    /// # anyhow::Ok(()) };
558    /// ```
559    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
560        let res = self.send(get_capabilities::v3::Request::new()).await?;
561        Ok(res.capabilities)
562    }
563
564    /// Get the server vendor information from the federation API.
565    ///
566    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
567    /// both the server's software name and version.
568    ///
569    /// # Examples
570    ///
571    /// ```no_run
572    /// # use matrix_sdk::Client;
573    /// # use url::Url;
574    /// # async {
575    /// # let homeserver = Url::parse("http://example.com")?;
576    /// let client = Client::new(homeserver).await?;
577    ///
578    /// let server_info = client.server_vendor_info(None).await?;
579    /// println!(
580    ///     "Server: {}, Version: {}",
581    ///     server_info.server_name, server_info.version
582    /// );
583    /// # anyhow::Ok(()) };
584    /// ```
585    pub async fn server_vendor_info(
586        &self,
587        request_config: Option<RequestConfig>,
588    ) -> HttpResult<ServerVendorInfo> {
589        let res = self
590            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
591            .await?;
592
593        // Extract server info, using defaults if fields are missing.
594        let server = res.server.unwrap_or_default();
595        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
596        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
597
598        Ok(ServerVendorInfo { server_name: server_name_str, version })
599    }
600
    /// Get a copy of the default request config.
    ///
    /// The default request config is what's used when sending requests if no
    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
    /// function with such a parameter.
    ///
    /// If the default request config was not customized through
    /// [`ClientBuilder`] when creating this `Client`, the returned value will
    /// be equivalent to [`RequestConfig::default()`].
    ///
    /// The value is read from the HTTP client held by this `Client`.
    pub fn request_config(&self) -> RequestConfig {
        self.inner.http_client.request_config
    }
613
614    /// Check whether the client has been activated.
615    ///
616    /// A client is considered active when:
617    ///
618    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
619    ///    is logged in,
620    /// 2. Has loaded cached data from storage,
621    /// 3. If encryption is enabled, it also initialized or restored its
622    ///    `OlmMachine`.
623    pub fn is_active(&self) -> bool {
624        self.inner.base_client.is_active()
625    }
626
    /// The server used by the client.
    ///
    /// Returns `None` when the server is not known. See the documentation of
    /// the `ClientInner::server` field to learn more.
    pub fn server(&self) -> Option<&Url> {
        self.inner.server.as_ref()
    }
633
634    /// The homeserver of the client.
635    pub fn homeserver(&self) -> Url {
636        self.inner.homeserver.read().unwrap().clone()
637    }
638
639    /// Get the sliding sync version.
640    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
641        self.inner.sliding_sync_version.read().unwrap().clone()
642    }
643
644    /// Override the sliding sync version.
645    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
646        let mut lock = self.inner.sliding_sync_version.write().unwrap();
647        *lock = version;
648    }
649
650    /// Get the Matrix user session meta information.
651    ///
652    /// If the client is currently logged in, this will return a
653    /// [`SessionMeta`] object which contains the user ID and device ID.
654    /// Otherwise it returns `None`.
655    pub fn session_meta(&self) -> Option<&SessionMeta> {
656        self.base_client().session_meta()
657    }
658
659    /// Returns a receiver that gets events for each room info update. To watch
660    /// for new events, use `receiver.resubscribe()`.
661    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
662        self.base_client().room_info_notable_update_receiver()
663    }
664
665    /// Performs a search for users.
666    /// The search is performed case-insensitively on user IDs and display names
667    ///
668    /// # Arguments
669    ///
670    /// * `search_term` - The search term for the search
671    /// * `limit` - The maximum number of results to return. Defaults to 10.
672    ///
673    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
674    pub async fn search_users(
675        &self,
676        search_term: &str,
677        limit: u64,
678    ) -> HttpResult<search_users::v3::Response> {
679        let mut request = search_users::v3::Request::new(search_term.to_owned());
680
681        if let Some(limit) = UInt::new(limit) {
682            request.limit = limit;
683        }
684
685        self.send(request).await
686    }
687
688    /// Get the user id of the current owner of the client.
689    pub fn user_id(&self) -> Option<&UserId> {
690        self.session_meta().map(|s| s.user_id.as_ref())
691    }
692
693    /// Get the device ID that identifies the current session.
694    pub fn device_id(&self) -> Option<&DeviceId> {
695        self.session_meta().map(|s| s.device_id.as_ref())
696    }
697
698    /// Get the current access token for this session.
699    ///
700    /// Will be `None` if the client has not been logged in.
701    pub fn access_token(&self) -> Option<String> {
702        self.auth_ctx().access_token()
703    }
704
705    /// Get the current tokens for this session.
706    ///
707    /// To be notified of changes in the session tokens, use
708    /// [`Client::subscribe_to_session_changes()`] or
709    /// [`Client::set_session_callbacks()`].
710    ///
711    /// Returns `None` if the client has not been logged in.
712    pub fn session_tokens(&self) -> Option<SessionTokens> {
713        self.auth_ctx().session_tokens()
714    }
715
716    /// Access the authentication API used to log in this client.
717    ///
718    /// Will be `None` if the client has not been logged in.
719    pub fn auth_api(&self) -> Option<AuthApi> {
720        match self.auth_ctx().auth_data.get()? {
721            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
722            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
723        }
724    }
725
726    /// Get the whole session info of this client.
727    ///
728    /// Will be `None` if the client has not been logged in.
729    ///
730    /// Can be used with [`Client::restore_session`] to restore a previously
731    /// logged-in session.
732    pub fn session(&self) -> Option<AuthSession> {
733        match self.auth_api()? {
734            AuthApi::Matrix(api) => api.session().map(Into::into),
735            AuthApi::OAuth(api) => api.full_session().map(Into::into),
736        }
737    }
738
739    /// Get a reference to the state store.
740    pub fn state_store(&self) -> &DynStateStore {
741        self.base_client().state_store()
742    }
743
744    /// Get a reference to the event cache store.
745    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
746        self.base_client().event_cache_store()
747    }
748
749    /// Access the native Matrix authentication API with this client.
750    pub fn matrix_auth(&self) -> MatrixAuth {
751        MatrixAuth::new(self.clone())
752    }
753
754    /// Get the account of the current owner of the client.
755    pub fn account(&self) -> Account {
756        Account::new(self.clone())
757    }
758
759    /// Get the encryption manager of the client.
760    #[cfg(feature = "e2e-encryption")]
761    pub fn encryption(&self) -> Encryption {
762        Encryption::new(self.clone())
763    }
764
765    /// Get the media manager of the client.
766    pub fn media(&self) -> Media {
767        Media::new(self.clone())
768    }
769
770    /// Get the pusher manager of the client.
771    pub fn pusher(&self) -> Pusher {
772        Pusher::new(self.clone())
773    }
774
775    /// Access the OAuth 2.0 API of the client.
776    pub fn oauth(&self) -> OAuth {
777        OAuth::new(self.clone())
778    }
779
780    /// Register a handler for a specific event type.
781    ///
782    /// The handler is a function or closure with one or more arguments. The
783    /// first argument is the event itself. All additional arguments are
784    /// "context" arguments: They have to implement [`EventHandlerContext`].
785    /// This trait is named that way because most of the types implementing it
786    /// give additional context about an event: The room it was in, its raw form
787    /// and other similar things. As two exceptions to this,
788    /// [`Client`] and [`EventHandlerHandle`] also implement the
789    /// `EventHandlerContext` trait so you don't have to clone your client
790    /// into the event handler manually and a handler can decide to remove
791    /// itself.
792    ///
793    /// Some context arguments are not universally applicable. A context
794    /// argument that isn't available for the given event type will result in
795    /// the event handler being skipped and an error being logged. The following
796    /// context argument types are only available for a subset of event types:
797    ///
798    /// * [`Room`] is only available for room-specific events, i.e. not for
799    ///   events like global account data events or presence events.
800    ///
801    /// You can provide custom context via
802    /// [`add_event_handler_context`](Client::add_event_handler_context) and
803    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
804    /// into the event handler.
805    ///
806    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
807    ///
808    /// # Examples
809    ///
810    /// ```no_run
811    /// use matrix_sdk::{
812    ///     deserialized_responses::EncryptionInfo,
813    ///     event_handler::Ctx,
814    ///     ruma::{
815    ///         events::{
816    ///             macros::EventContent,
817    ///             push_rules::PushRulesEvent,
818    ///             room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
819    ///         },
820    ///         push::Action,
821    ///         Int, MilliSecondsSinceUnixEpoch,
822    ///     },
823    ///     Client, Room,
824    /// };
825    /// use serde::{Deserialize, Serialize};
826    ///
827    /// # async fn example(client: Client) {
828    /// client.add_event_handler(
829    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
830    ///         // Common usage: Room event plus room and client.
831    ///     },
832    /// );
833    /// client.add_event_handler(
834    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
835    ///         async move {
836    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
837    ///             // unencrypted events and events that were decrypted by the SDK.
838    ///         }
839    ///     },
840    /// );
841    /// client.add_event_handler(
842    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
843    ///         async move {
844    ///             // A `Vec<Action>` parameter allows you to know which push actions
845    ///             // are applicable for an event. For example, an event with
846    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
847    ///             // in the timeline.
848    ///         }
849    ///     },
850    /// );
851    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
852    ///     // You can omit any or all arguments after the first.
853    /// });
854    ///
855    /// // Registering a temporary event handler:
856    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
857    ///     /* Event handler */
858    /// });
859    /// client.remove_event_handler(handle);
860    ///
861    /// // Registering custom event handler context:
862    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
863    /// struct MyContext {
864    ///     number: usize,
865    /// }
866    /// client.add_event_handler_context(MyContext { number: 5 });
867    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
868    ///     // Use the context
869    /// });
870    ///
871    /// // Custom events work exactly the same way, you just need to declare
872    /// // the content struct and use the EventContent derive macro on it.
873    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
874    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
875    /// struct TokenEventContent {
876    ///     token: String,
877    ///     #[serde(rename = "exp")]
878    ///     expires_at: MilliSecondsSinceUnixEpoch,
879    /// }
880    ///
881    /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
882    ///     todo!("Display the token");
883    /// });
884    ///
885    /// // Event handler closures can also capture local variables.
886    /// // Make sure they are cheap to clone though, because they will be cloned
887    /// // every time the closure is called.
888    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
889    ///
890    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
891    ///     println!("Calling the handler with identifier {data}");
892    /// });
893    /// # }
894    /// ```
895    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
896    where
897        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
898        H: EventHandler<Ev, Ctx>,
899    {
900        self.add_event_handler_impl(handler, None)
901    }
902
903    /// Register a handler for a specific room, and event type.
904    ///
905    /// This method works the same way as
906    /// [`add_event_handler`][Self::add_event_handler], except that the handler
907    /// will only be called for events in the room with the specified ID. See
908    /// that method for more details on event handler functions.
909    ///
910    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
911    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
912    /// your use case.
913    pub fn add_room_event_handler<Ev, Ctx, H>(
914        &self,
915        room_id: &RoomId,
916        handler: H,
917    ) -> EventHandlerHandle
918    where
919        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
920        H: EventHandler<Ev, Ctx>,
921    {
922        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
923    }
924
925    /// Observe a specific event type.
926    ///
927    /// `Ev` represents the kind of event that will be observed. `Ctx`
928    /// represents the context that will come with the event. It relies on the
929    /// same mechanism as [`Client::add_event_handler`]. The main difference is
930    /// that it returns an [`ObservableEventHandler`] and doesn't require a
931    /// user-defined closure. It is possible to subscribe to the
932    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
933    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
934    /// Ctx)`.
935    ///
936    /// Be careful that only the most recent value can be observed. Subscribers
937    /// are notified when a new value is sent, but there is no guarantee
938    /// that they will see all values.
939    ///
940    /// # Example
941    ///
942    /// Let's see a classical usage:
943    ///
944    /// ```
945    /// use futures_util::StreamExt as _;
946    /// use matrix_sdk::{
947    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
948    ///     Client, Room,
949    /// };
950    ///
951    /// # async fn example(client: Client) -> Option<()> {
952    /// let observer =
953    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
954    ///
955    /// let mut subscriber = observer.subscribe();
956    ///
957    /// let (event, (room, push_actions)) = subscriber.next().await?;
958    /// # Some(())
959    /// # }
960    /// ```
961    ///
962    /// Now let's see how to get several contexts that can be useful for you:
963    ///
964    /// ```
965    /// use matrix_sdk::{
966    ///     deserialized_responses::EncryptionInfo,
967    ///     ruma::{
968    ///         events::room::{
969    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
970    ///         },
971    ///         push::Action,
972    ///     },
973    ///     Client, Room,
974    /// };
975    ///
976    /// # async fn example(client: Client) {
977    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
978    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
979    ///
980    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
981    /// // to distinguish between unencrypted events and events that were decrypted
982    /// // by the SDK.
983    /// let _ = client
984    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
985    ///     );
986    ///
987    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
988    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
989    /// // should be highlighted in the timeline.
990    /// let _ =
991    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
992    ///
993    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
994    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
995    /// # }
996    /// ```
997    ///
998    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
999    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1000    where
1001        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1002        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1003    {
1004        self.observe_room_events_impl(None)
1005    }
1006
1007    /// Observe a specific room, and event type.
1008    ///
1009    /// This method works the same way as [`Client::observe_events`], except
1010    /// that the observability will only be applied for events in the room with
1011    /// the specified ID. See that method for more details.
1012    ///
1013    /// Be careful that only the most recent value can be observed. Subscribers
1014    /// are notified when a new value is sent, but there is no guarantee
1015    /// that they will see all values.
1016    pub fn observe_room_events<Ev, Ctx>(
1017        &self,
1018        room_id: &RoomId,
1019    ) -> ObservableEventHandler<(Ev, Ctx)>
1020    where
1021        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1022        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1023    {
1024        self.observe_room_events_impl(Some(room_id.to_owned()))
1025    }
1026
1027    /// Shared implementation for `Client::observe_events` and
1028    /// `Client::observe_room_events`.
1029    fn observe_room_events_impl<Ev, Ctx>(
1030        &self,
1031        room_id: Option<OwnedRoomId>,
1032    ) -> ObservableEventHandler<(Ev, Ctx)>
1033    where
1034        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1035        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1036    {
1037        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1038        // new value.
1039        let shared_observable = SharedObservable::new(None);
1040
1041        ObservableEventHandler::new(
1042            shared_observable.clone(),
1043            self.event_handler_drop_guard(self.add_event_handler_impl(
1044                move |event: Ev, context: Ctx| {
1045                    shared_observable.set(Some((event, context)));
1046
1047                    ready(())
1048                },
1049                room_id,
1050            )),
1051        )
1052    }
1053
1054    /// Remove the event handler associated with the handle.
1055    ///
1056    /// Note that you **must not** call `remove_event_handler` from the
1057    /// non-async part of an event handler, that is:
1058    ///
1059    /// ```ignore
1060    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1061    ///     // âš  this will cause a deadlock âš 
1062    ///     client.remove_event_handler(handle);
1063    ///
1064    ///     async move {
1065    ///         // removing the event handler here is fine
1066    ///         client.remove_event_handler(handle);
1067    ///     }
1068    /// })
1069    /// ```
1070    ///
1071    /// Note also that handlers that remove themselves will still execute with
1072    /// events received in the same sync cycle.
1073    ///
1074    /// # Arguments
1075    ///
1076    /// `handle` - The [`EventHandlerHandle`] that is returned when
1077    /// registering the event handler with [`Client::add_event_handler`].
1078    ///
1079    /// # Examples
1080    ///
1081    /// ```no_run
1082    /// # use url::Url;
1083    /// # use tokio::sync::mpsc;
1084    /// #
1085    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1086    /// #
1087    /// use matrix_sdk::{
1088    ///     event_handler::EventHandlerHandle,
1089    ///     ruma::events::room::member::SyncRoomMemberEvent, Client,
1090    /// };
1091    /// #
1092    /// # futures_executor::block_on(async {
1093    /// # let client = matrix_sdk::Client::builder()
1094    /// #     .homeserver_url(homeserver)
1095    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1096    /// #     .build()
1097    /// #     .await
1098    /// #     .unwrap();
1099    ///
1100    /// client.add_event_handler(
1101    ///     |ev: SyncRoomMemberEvent,
1102    ///      client: Client,
1103    ///      handle: EventHandlerHandle| async move {
1104    ///         // Common usage: Check arriving Event is the expected one
1105    ///         println!("Expected RoomMemberEvent received!");
1106    ///         client.remove_event_handler(handle);
1107    ///     },
1108    /// );
1109    /// # });
1110    /// ```
1111    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1112        self.inner.event_handlers.remove(handle);
1113    }
1114
1115    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1116    /// the given handle.
1117    ///
1118    /// When the returned value is dropped, the event handler will be removed.
1119    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1120        EventHandlerDropGuard::new(handle, self.clone())
1121    }
1122
1123    /// Add an arbitrary value for use as event handler context.
1124    ///
1125    /// The value can be obtained in an event handler by adding an argument of
1126    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1127    ///
1128    /// If a value of the same type has been added before, it will be
1129    /// overwritten.
1130    ///
1131    /// # Examples
1132    ///
1133    /// ```no_run
1134    /// use matrix_sdk::{
1135    ///     event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1136    ///     Room,
1137    /// };
1138    /// # #[derive(Clone)]
1139    /// # struct SomeType;
1140    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1141    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1142    /// # futures_executor::block_on(async {
1143    /// # let client = matrix_sdk::Client::builder()
1144    /// #     .homeserver_url(homeserver)
1145    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1146    /// #     .build()
1147    /// #     .await
1148    /// #     .unwrap();
1149    ///
1150    /// // Handle used to send messages to the UI part of the app
1151    /// let my_gui_handle: SomeType = obtain_gui_handle();
1152    ///
1153    /// client.add_event_handler_context(my_gui_handle.clone());
1154    /// client.add_event_handler(
1155    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1156    ///         async move {
1157    ///             // gui_handle.send(DisplayMessage { message: ev });
1158    ///         }
1159    ///     },
1160    /// );
1161    /// # });
1162    /// ```
1163    pub fn add_event_handler_context<T>(&self, ctx: T)
1164    where
1165        T: Clone + Send + Sync + 'static,
1166    {
1167        self.inner.event_handlers.add_context(ctx);
1168    }
1169
1170    /// Register a handler for a notification.
1171    ///
1172    /// Similar to [`Client::add_event_handler`], but only allows functions
1173    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1174    /// [`Client`] for now.
1175    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1176    where
1177        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1178        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1179    {
1180        self.inner.notification_handlers.write().await.push(Box::new(
1181            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1182        ));
1183
1184        self
1185    }
1186
1187    /// Subscribe to all updates for the room with the given ID.
1188    ///
1189    /// The returned receiver will receive a new message for each sync response
1190    /// that contains updates for that room.
1191    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1192        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1193            btree_map::Entry::Vacant(entry) => {
1194                let (tx, rx) = broadcast::channel(8);
1195                entry.insert(tx);
1196                rx
1197            }
1198            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1199        }
1200    }
1201
1202    /// Subscribe to all updates to all rooms, whenever any has been received in
1203    /// a sync response.
1204    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1205        self.inner.room_updates_sender.subscribe()
1206    }
1207
1208    pub(crate) async fn notification_handlers(
1209        &self,
1210    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1211        self.inner.notification_handlers.read().await
1212    }
1213
1214    /// Get all the rooms the client knows about.
1215    ///
1216    /// This will return the list of joined, invited, and left rooms.
1217    pub fn rooms(&self) -> Vec<Room> {
1218        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1219    }
1220
1221    /// Get all the rooms the client knows about, filtered by room state.
1222    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1223        self.base_client()
1224            .rooms_filtered(filter)
1225            .into_iter()
1226            .map(|room| Room::new(self.clone(), room))
1227            .collect()
1228    }
1229
1230    /// Get a stream of all the rooms, in addition to the existing rooms.
1231    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1232        let (rooms, stream) = self.base_client().rooms_stream();
1233
1234        let map_room = |room| Room::new(self.clone(), room);
1235
1236        (
1237            rooms.into_iter().map(map_room).collect(),
1238            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1239        )
1240    }
1241
1242    /// Returns the joined rooms this client knows about.
1243    pub fn joined_rooms(&self) -> Vec<Room> {
1244        self.rooms_filtered(RoomStateFilter::JOINED)
1245    }
1246
1247    /// Returns the invited rooms this client knows about.
1248    pub fn invited_rooms(&self) -> Vec<Room> {
1249        self.rooms_filtered(RoomStateFilter::INVITED)
1250    }
1251
1252    /// Returns the left rooms this client knows about.
1253    pub fn left_rooms(&self) -> Vec<Room> {
1254        self.rooms_filtered(RoomStateFilter::LEFT)
1255    }
1256
1257    /// Returns the joined space rooms this client knows about.
1258    pub fn joined_space_rooms(&self) -> Vec<Room> {
1259        self.base_client()
1260            .rooms_filtered(RoomStateFilter::JOINED)
1261            .into_iter()
1262            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1263            .collect()
1264    }
1265
1266    /// Get a room with the given room id.
1267    ///
1268    /// # Arguments
1269    ///
1270    /// `room_id` - The unique id of the room that should be fetched.
1271    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1272        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1273    }
1274
1275    /// Gets the preview of a room, whether the current user has joined it or
1276    /// not.
1277    pub async fn get_room_preview(
1278        &self,
1279        room_or_alias_id: &RoomOrAliasId,
1280        via: Vec<OwnedServerName>,
1281    ) -> Result<RoomPreview> {
1282        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1283            Ok(room_id) => room_id.to_owned(),
1284            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1285        };
1286
1287        if let Some(room) = self.get_room(&room_id) {
1288            // The cached data can only be trusted if the room state is joined or
1289            // banned: for invite and knock rooms, no updates will be received
1290            // for the rooms after the invite/knock action took place so we may
1291            // have very out to date data for important fields such as
1292            // `join_rule`. For left rooms, the homeserver should return the latest info.
1293            match room.state() {
1294                RoomState::Joined | RoomState::Banned => {
1295                    return Ok(RoomPreview::from_known_room(&room).await);
1296                }
1297                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1298            }
1299        }
1300
1301        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1302    }
1303
1304    /// Resolve a room alias to a room id and a list of servers which know
1305    /// about it.
1306    ///
1307    /// # Arguments
1308    ///
1309    /// `room_alias` - The room alias to be resolved.
1310    pub async fn resolve_room_alias(
1311        &self,
1312        room_alias: &RoomAliasId,
1313    ) -> HttpResult<get_alias::v3::Response> {
1314        let request = get_alias::v3::Request::new(room_alias.to_owned());
1315        self.send(request).await
1316    }
1317
1318    /// Checks if a room alias is not in use yet.
1319    ///
1320    /// Returns:
1321    /// - `Ok(true)` if the room alias is available.
1322    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1323    ///   status code).
1324    /// - An `Err` otherwise.
1325    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1326        match self.resolve_room_alias(alias).await {
1327            // The room alias was resolved, so it's already in use.
1328            Ok(_) => Ok(false),
1329            Err(error) => {
1330                match error.client_api_error_kind() {
1331                    // The room alias wasn't found, so it's available.
1332                    Some(ErrorKind::NotFound) => Ok(true),
1333                    _ => Err(error),
1334                }
1335            }
1336        }
1337    }
1338
1339    /// Adds a new room alias associated with a room to the room directory.
1340    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1341        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1342        self.send(request).await?;
1343        Ok(())
1344    }
1345
1346    /// Removes a room alias from the room directory.
1347    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1348        let request = delete_alias::v3::Request::new(alias.to_owned());
1349        self.send(request).await?;
1350        Ok(())
1351    }
1352
1353    /// Update the homeserver from the login response well-known if needed.
1354    ///
1355    /// # Arguments
1356    ///
1357    /// * `login_well_known` - The `well_known` field from a successful login
1358    ///   response.
1359    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1360        if self.inner.respect_login_well_known {
1361            if let Some(well_known) = login_well_known {
1362                if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1363                    self.set_homeserver(homeserver);
1364                }
1365            }
1366        }
1367    }
1368
1369    /// Similar to [`Client::restore_session_with`], with
1370    /// [`RoomLoadSettings::default()`].
1371    ///
1372    /// # Panics
1373    ///
1374    /// Panics if a session was already restored or logged in.
1375    #[instrument(skip_all)]
1376    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1377        self.restore_session_with(session, RoomLoadSettings::default()).await
1378    }
1379
1380    /// Restore a session previously logged-in using one of the available
1381    /// authentication APIs. The number of rooms to restore is controlled by
1382    /// [`RoomLoadSettings`].
1383    ///
1384    /// See the documentation of the corresponding authentication API's
1385    /// `restore_session` method for more information.
1386    ///
1387    /// # Panics
1388    ///
1389    /// Panics if a session was already restored or logged in.
1390    #[instrument(skip_all)]
1391    pub async fn restore_session_with(
1392        &self,
1393        session: impl Into<AuthSession>,
1394        room_load_settings: RoomLoadSettings,
1395    ) -> Result<()> {
1396        let session = session.into();
1397        match session {
1398            AuthSession::Matrix(session) => {
1399                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1400            }
1401            AuthSession::OAuth(session) => {
1402                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1403            }
1404        }
1405    }
1406
1407    /// Refresh the access token using the authentication API used to log into
1408    /// this session.
1409    ///
1410    /// See the documentation of the authentication API's `refresh_access_token`
1411    /// method for more information.
1412    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1413        let Some(auth_api) = self.auth_api() else {
1414            return Err(RefreshTokenError::RefreshTokenRequired);
1415        };
1416
1417        match auth_api {
1418            AuthApi::Matrix(api) => {
1419                trace!("Token refresh: Using the homeserver.");
1420                Box::pin(api.refresh_access_token()).await?;
1421            }
1422            AuthApi::OAuth(api) => {
1423                trace!("Token refresh: Using OAuth 2.0.");
1424                Box::pin(api.refresh_access_token()).await?;
1425            }
1426        }
1427
1428        Ok(())
1429    }
1430
1431    /// Log out the current session using the proper authentication API.
1432    ///
1433    /// # Errors
1434    ///
1435    /// Returns an error if the session is not authenticated or if an error
1436    /// occurred while making the request to the server.
1437    pub async fn logout(&self) -> Result<(), Error> {
1438        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1439        match auth_api {
1440            AuthApi::Matrix(matrix_auth) => {
1441                matrix_auth.logout().await?;
1442                Ok(())
1443            }
1444            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1445        }
1446    }
1447
1448    /// Get or upload a sync filter.
1449    ///
1450    /// This method will either get a filter ID from the store or upload the
1451    /// filter definition to the homeserver and return the new filter ID.
1452    ///
1453    /// # Arguments
1454    ///
1455    /// * `filter_name` - The unique name of the filter, this name will be used
1456    /// locally to store and identify the filter ID returned by the server.
1457    ///
1458    /// * `definition` - The filter definition that should be uploaded to the
1459    /// server if no filter ID can be found in the store.
1460    ///
1461    /// # Examples
1462    ///
1463    /// ```no_run
1464    /// # use matrix_sdk::{
1465    /// #    Client, config::SyncSettings,
1466    /// #    ruma::api::client::{
1467    /// #        filter::{
1468    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1469    /// #        },
1470    /// #        sync::sync_events::v3::Filter,
1471    /// #    }
1472    /// # };
1473    /// # use url::Url;
1474    /// # async {
1475    /// # let homeserver = Url::parse("http://example.com").unwrap();
1476    /// # let client = Client::new(homeserver).await.unwrap();
1477    /// let mut filter = FilterDefinition::default();
1478    ///
1479    /// // Let's enable member lazy loading.
1480    /// filter.room.state.lazy_load_options =
1481    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1482    ///
1483    /// let filter_id = client
1484    ///     .get_or_upload_filter("sync", filter)
1485    ///     .await
1486    ///     .unwrap();
1487    ///
1488    /// let sync_settings = SyncSettings::new()
1489    ///     .filter(Filter::FilterId(filter_id));
1490    ///
1491    /// let response = client.sync_once(sync_settings).await.unwrap();
1492    /// # };
1493    #[instrument(skip(self, definition))]
1494    pub async fn get_or_upload_filter(
1495        &self,
1496        filter_name: &str,
1497        definition: FilterDefinition,
1498    ) -> Result<String> {
1499        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1500            debug!("Found filter locally");
1501            Ok(filter)
1502        } else {
1503            debug!("Didn't find filter locally");
1504            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1505            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1506            let response = self.send(request).await?;
1507
1508            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1509
1510            Ok(response.filter_id)
1511        }
1512    }
1513
1514    /// Prepare to join a room by ID, by getting the current details about it
1515    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1516        let room = self.get_room(room_id)?;
1517
1518        let inviter = match room.invite_details().await {
1519            Ok(details) => details.inviter,
1520            Err(Error::WrongRoomState(_)) => None,
1521            Err(e) => {
1522                warn!("Error fetching invite details for room: {e:?}");
1523                None
1524            }
1525        };
1526
1527        Some(PreJoinRoomInfo { inviter })
1528    }
1529
1530    /// Finish joining a room.
1531    ///
1532    /// If the room was an invite that should be marked as a DM, will include it
1533    /// in the DM event after creating the joined room.
1534    ///
1535    /// If encrypted history sharing is enabled, will check to see if we have a
1536    /// key bundle, and import it if so.
1537    ///
1538    /// # Arguments
1539    ///
1540    /// * `room_id` - The `RoomId` of the room that was joined.
1541    /// * `pre_join_room_info` - Information about the room before we joined.
1542    async fn finish_join_room(
1543        &self,
1544        room_id: &RoomId,
1545        pre_join_room_info: Option<PreJoinRoomInfo>,
1546    ) -> Result<Room> {
1547        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1548            room.state() == RoomState::Invited
1549                && room.is_direct().await.unwrap_or_else(|e| {
1550                    warn!(%room_id, "is_direct() failed: {e}");
1551                    false
1552                })
1553        } else {
1554            false
1555        };
1556
1557        let base_room = self
1558            .base_client()
1559            .room_joined(
1560                room_id,
1561                pre_join_room_info
1562                    .as_ref()
1563                    .and_then(|info| info.inviter.as_ref())
1564                    .map(|i| i.user_id().to_owned()),
1565            )
1566            .await?;
1567        let room = Room::new(self.clone(), base_room);
1568
1569        if mark_as_dm {
1570            room.set_is_direct(true).await?;
1571        }
1572
1573        #[cfg(feature = "e2e-encryption")]
1574        if self.inner.enable_share_history_on_invite {
1575            if let Some(inviter) =
1576                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1577            {
1578                crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1579                    .await?;
1580            }
1581        }
1582
1583        // Suppress "unused variable" and "unused field" lints
1584        #[cfg(not(feature = "e2e-encryption"))]
1585        let _ = pre_join_room_info.map(|i| i.inviter);
1586
1587        Ok(room)
1588    }
1589
1590    /// Join a room by `RoomId`.
1591    ///
1592    /// Returns the `Room` in the joined state.
1593    ///
1594    /// # Arguments
1595    ///
1596    /// * `room_id` - The `RoomId` of the room to be joined.
1597    #[instrument(skip(self))]
1598    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1599        // See who invited us to this room, if anyone. Note we have to do this before
1600        // making the `/join` request, otherwise we could race against the sync.
1601        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1602
1603        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1604        let response = self.send(request).await?;
1605        self.finish_join_room(&response.room_id, pre_join_info).await
1606    }
1607
1608    /// Join a room by `RoomOrAliasId`.
1609    ///
1610    /// Returns the `Room` in the joined state.
1611    ///
1612    /// # Arguments
1613    ///
1614    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1615    ///   alias looks like `#name:example.com`.
1616    /// * `server_names` - The server names to be used for resolving the alias,
1617    ///   if needs be.
1618    pub async fn join_room_by_id_or_alias(
1619        &self,
1620        alias: &RoomOrAliasId,
1621        server_names: &[OwnedServerName],
1622    ) -> Result<Room> {
1623        let pre_join_info = {
1624            match alias.try_into() {
1625                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1626                Err(_) => {
1627                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1628                    // responding to an invitation to the room, and therefore don't need to handle
1629                    // things that happen as a result of invites.
1630                    None
1631                }
1632            }
1633        };
1634        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1635            via: server_names.to_owned(),
1636        });
1637        let response = self.send(request).await?;
1638        self.finish_join_room(&response.room_id, pre_join_info).await
1639    }
1640
1641    /// Search the homeserver's directory of public rooms.
1642    ///
1643    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1644    /// a `get_public_rooms::Response`.
1645    ///
1646    /// # Arguments
1647    ///
1648    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1649    ///
1650    /// * `since` - Pagination token from a previous request.
1651    ///
1652    /// * `server` - The name of the server, if `None` the requested server is
1653    ///   used.
1654    ///
1655    /// # Examples
1656    /// ```no_run
1657    /// use matrix_sdk::Client;
1658    /// # use url::Url;
1659    /// # let homeserver = Url::parse("http://example.com").unwrap();
1660    /// # let limit = Some(10);
1661    /// # let since = Some("since token");
1662    /// # let server = Some("servername.com".try_into().unwrap());
1663    /// # async {
1664    /// let mut client = Client::new(homeserver).await.unwrap();
1665    ///
1666    /// client.public_rooms(limit, since, server).await;
1667    /// # };
1668    /// ```
1669    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1670    pub async fn public_rooms(
1671        &self,
1672        limit: Option<u32>,
1673        since: Option<&str>,
1674        server: Option<&ServerName>,
1675    ) -> HttpResult<get_public_rooms::v3::Response> {
1676        let limit = limit.map(UInt::from);
1677
1678        let request = assign!(get_public_rooms::v3::Request::new(), {
1679            limit,
1680            since: since.map(ToOwned::to_owned),
1681            server: server.map(ToOwned::to_owned),
1682        });
1683        self.send(request).await
1684    }
1685
1686    /// Create a room with the given parameters.
1687    ///
1688    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1689    /// created room.
1690    ///
1691    /// If you want to create a direct message with one specific user, you can
1692    /// use [`create_dm`][Self::create_dm], which is more convenient than
1693    /// assembling the [`create_room::v3::Request`] yourself.
1694    ///
1695    /// If the `is_direct` field of the request is set to `true` and at least
1696    /// one user is invited, the room will be automatically added to the direct
1697    /// rooms in the account data.
1698    ///
1699    /// # Examples
1700    ///
1701    /// ```no_run
1702    /// use matrix_sdk::{
1703    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1704    ///     Client,
1705    /// };
1706    /// # use url::Url;
1707    /// #
1708    /// # async {
1709    /// # let homeserver = Url::parse("http://example.com").unwrap();
1710    /// let request = CreateRoomRequest::new();
1711    /// let client = Client::new(homeserver).await.unwrap();
1712    /// assert!(client.create_room(request).await.is_ok());
1713    /// # };
1714    /// ```
1715    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1716        let invite = request.invite.clone();
1717        let is_direct_room = request.is_direct;
1718        let response = self.send(request).await?;
1719
1720        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1721
1722        let joined_room = Room::new(self.clone(), base_room);
1723
1724        if is_direct_room && !invite.is_empty() {
1725            if let Err(error) =
1726                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1727            {
1728                // FIXME: Retry in the background
1729                error!("Failed to mark room as DM: {error}");
1730            }
1731        }
1732
1733        Ok(joined_room)
1734    }
1735
1736    /// Create a DM room.
1737    ///
1738    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1739    /// given user being invited, the room marked `is_direct` and both the
1740    /// creator and invitee getting the default maximum power level.
1741    ///
1742    /// If the `e2e-encryption` feature is enabled, the room will also be
1743    /// encrypted.
1744    ///
1745    /// # Arguments
1746    ///
1747    /// * `user_id` - The ID of the user to create a DM for.
1748    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1749        #[cfg(feature = "e2e-encryption")]
1750        let initial_state =
1751            vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1752                .to_raw_any()];
1753
1754        #[cfg(not(feature = "e2e-encryption"))]
1755        let initial_state = vec![];
1756
1757        let request = assign!(create_room::v3::Request::new(), {
1758            invite: vec![user_id.to_owned()],
1759            is_direct: true,
1760            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1761            initial_state,
1762        });
1763
1764        self.create_room(request).await
1765    }
1766
1767    /// Search the homeserver's directory for public rooms with a filter.
1768    ///
1769    /// # Arguments
1770    ///
1771    /// * `room_search` - The easiest way to create this request is using the
1772    ///   `get_public_rooms_filtered::Request` itself.
1773    ///
1774    /// # Examples
1775    ///
1776    /// ```no_run
1777    /// # use url::Url;
1778    /// # use matrix_sdk::Client;
1779    /// # async {
1780    /// # let homeserver = Url::parse("http://example.com")?;
1781    /// use matrix_sdk::ruma::{
1782    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1783    /// };
1784    /// # let mut client = Client::new(homeserver).await?;
1785    ///
1786    /// let mut filter = Filter::new();
1787    /// filter.generic_search_term = Some("rust".to_owned());
1788    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1789    /// request.filter = filter;
1790    ///
1791    /// let response = client.public_rooms_filtered(request).await?;
1792    ///
1793    /// for room in response.chunk {
1794    ///     println!("Found room {room:?}");
1795    /// }
1796    /// # anyhow::Ok(()) };
1797    /// ```
1798    pub async fn public_rooms_filtered(
1799        &self,
1800        request: get_public_rooms_filtered::v3::Request,
1801    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1802        self.send(request).await
1803    }
1804
1805    /// Send an arbitrary request to the server, without updating client state.
1806    ///
1807    /// **Warning:** Because this method *does not* update the client state, it
1808    /// is important to make sure that you account for this yourself, and
1809    /// use wrapper methods where available.  This method should *only* be
1810    /// used if a wrapper method for the endpoint you'd like to use is not
1811    /// available.
1812    ///
1813    /// # Arguments
1814    ///
1815    /// * `request` - A filled out and valid request for the endpoint to be hit
1816    ///
1817    /// * `timeout` - An optional request timeout setting, this overrides the
1818    ///   default request setting if one was set.
1819    ///
1820    /// # Examples
1821    ///
1822    /// ```no_run
1823    /// # use matrix_sdk::{Client, config::SyncSettings};
1824    /// # use url::Url;
1825    /// # async {
1826    /// # let homeserver = Url::parse("http://localhost:8080")?;
1827    /// # let mut client = Client::new(homeserver).await?;
1828    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1829    ///
1830    /// // First construct the request you want to make
1831    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1832    /// // for all available Endpoints
1833    /// let user_id = user_id!("@example:localhost").to_owned();
1834    /// let request = profile::get_profile::v3::Request::new(user_id);
1835    ///
1836    /// // Start the request using Client::send()
1837    /// let response = client.send(request).await?;
1838    ///
1839    /// // Check the corresponding Response struct to find out what types are
1840    /// // returned
1841    /// # anyhow::Ok(()) };
1842    /// ```
1843    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1844    where
1845        Request: OutgoingRequest + Clone + Debug,
1846        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1847    {
1848        SendRequest {
1849            client: self.clone(),
1850            request,
1851            config: None,
1852            send_progress: Default::default(),
1853        }
1854    }
1855
1856    pub(crate) async fn send_inner<Request>(
1857        &self,
1858        request: Request,
1859        config: Option<RequestConfig>,
1860        send_progress: SharedObservable<TransmissionProgress>,
1861    ) -> HttpResult<Request::IncomingResponse>
1862    where
1863        Request: OutgoingRequest + Debug,
1864        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1865    {
1866        let homeserver = self.homeserver().to_string();
1867        let access_token = self.access_token();
1868
1869        self.inner
1870            .http_client
1871            .send(
1872                request,
1873                config,
1874                homeserver,
1875                access_token.as_deref(),
1876                &self.supported_versions().await?,
1877                send_progress,
1878            )
1879            .await
1880    }
1881
1882    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1883        _ = self
1884            .inner
1885            .auth_ctx
1886            .session_change_sender
1887            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1888    }
1889
1890    /// Fetches server versions from network; no caching.
1891    pub async fn fetch_server_versions(
1892        &self,
1893        request_config: Option<RequestConfig>,
1894    ) -> HttpResult<get_supported_versions::Response> {
1895        let server_versions = self
1896            .inner
1897            .http_client
1898            .send(
1899                get_supported_versions::Request::new(),
1900                request_config,
1901                self.homeserver().to_string(),
1902                None,
1903                &SupportedVersions {
1904                    versions: [MatrixVersion::V1_0].into(),
1905                    features: Default::default(),
1906                },
1907                Default::default(),
1908            )
1909            .await?;
1910
1911        Ok(server_versions)
1912    }
1913
1914    /// Fetches client well_known from network; no caching.
1915    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1916        let server_url_string = self
1917            .server()
1918            .unwrap_or(
1919                // Sometimes people configure their well-known directly on the homeserver so use
1920                // this as a fallback when the server name is unknown.
1921                &self.homeserver(),
1922            )
1923            .to_string();
1924
1925        let well_known = self
1926            .inner
1927            .http_client
1928            .send(
1929                discover_homeserver::Request::new(),
1930                Some(RequestConfig::short_retry()),
1931                server_url_string,
1932                None,
1933                &SupportedVersions {
1934                    versions: [MatrixVersion::V1_0].into(),
1935                    features: Default::default(),
1936                },
1937                Default::default(),
1938            )
1939            .await;
1940
1941        match well_known {
1942            Ok(well_known) => Some(well_known),
1943            Err(http_error) => {
1944                // It is perfectly valid to not have a well-known file.
1945                // Maybe we should check for a specific error code to be sure?
1946                warn!("Failed to fetch client well-known: {http_error}");
1947                None
1948            }
1949        }
1950    }
1951
1952    /// Load server info from storage, or fetch them from network and cache
1953    /// them.
1954    async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1955        match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1956            Ok(Some(stored)) => {
1957                if let Some(server_info) =
1958                    stored.into_server_info().and_then(|info| info.maybe_decode())
1959                {
1960                    return Ok(server_info);
1961                }
1962            }
1963            Ok(None) => {
1964                // fallthrough: cache is empty
1965            }
1966            Err(err) => {
1967                warn!("error when loading cached server info: {err}");
1968                // fallthrough to network.
1969            }
1970        }
1971
1972        let server_versions = self.fetch_server_versions(None).await?;
1973        let well_known = self.fetch_client_well_known().await;
1974        let server_info = ServerInfo::new(
1975            server_versions.versions.clone(),
1976            server_versions.unstable_features.clone(),
1977            well_known.map(Into::into),
1978        );
1979
1980        // Attempt to cache the result in storage.
1981        {
1982            if let Err(err) = self
1983                .state_store()
1984                .set_kv_data(
1985                    StateStoreDataKey::ServerInfo,
1986                    StateStoreDataValue::ServerInfo(server_info.clone()),
1987                )
1988                .await
1989            {
1990                warn!("error when caching server info: {err}");
1991            }
1992        }
1993
1994        Ok(server_info)
1995    }
1996
1997    async fn get_or_load_and_cache_server_info<
1998        Value,
1999        MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
2000    >(
2001        &self,
2002        map: MapFunction,
2003    ) -> HttpResult<Value> {
2004        let server_info = &self.inner.caches.server_info;
2005        if let CachedValue::Cached(val) = map(&*server_info.read().await) {
2006            return Ok(val);
2007        }
2008
2009        let mut guarded_server_info = server_info.write().await;
2010        if let CachedValue::Cached(val) = map(&guarded_server_info) {
2011            return Ok(val);
2012        }
2013
2014        let server_info = self.load_or_fetch_server_info().await?;
2015
2016        // Fill both unstable features and server versions at once.
2017        let mut supported = server_info.supported_versions();
2018        if supported.versions.is_empty() {
2019            supported.versions = [MatrixVersion::V1_0].into();
2020        }
2021
2022        guarded_server_info.supported_versions = CachedValue::Cached(supported);
2023        guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2024
2025        // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2026        // fetch an optional property), the function will always return some.
2027        Ok(map(&guarded_server_info).unwrap_cached_value())
2028    }
2029
2030    /// Get the Matrix versions and features supported by the homeserver by
2031    /// fetching them from the server or the cache.
2032    ///
2033    /// This is equivalent to calling both [`Client::server_versions()`] and
2034    /// [`Client::unstable_features()`]. To always fetch the result from the
2035    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2036    /// and then `.as_supported_versions()` on the response.
2037    ///
2038    /// # Examples
2039    ///
2040    /// ```no_run
2041    /// use ruma::api::{FeatureFlag, MatrixVersion};
2042    /// # use matrix_sdk::{Client, config::SyncSettings};
2043    /// # use url::Url;
2044    /// # async {
2045    /// # let homeserver = Url::parse("http://localhost:8080")?;
2046    /// # let mut client = Client::new(homeserver).await?;
2047    ///
2048    /// let supported = client.supported_versions().await?;
2049    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2050    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2051    ///
2052    /// let msc_x_feature = FeatureFlag::from("msc_x");
2053    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2054    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2055    /// # anyhow::Ok(()) };
2056    /// ```
2057    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2058        self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2059            .await
2060    }
2061
2062    /// Get the Matrix versions supported by the homeserver by fetching them
2063    /// from the server or the cache.
2064    ///
2065    /// # Examples
2066    ///
2067    /// ```no_run
2068    /// use ruma::api::MatrixVersion;
2069    /// # use matrix_sdk::{Client, config::SyncSettings};
2070    /// # use url::Url;
2071    /// # async {
2072    /// # let homeserver = Url::parse("http://localhost:8080")?;
2073    /// # let mut client = Client::new(homeserver).await?;
2074    ///
2075    /// let server_versions = client.server_versions().await?;
2076    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2077    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2078    /// # anyhow::Ok(()) };
2079    /// ```
2080    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2081        self.get_or_load_and_cache_server_info(|server_info| {
2082            server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2083        })
2084        .await
2085    }
2086
2087    /// Get the unstable features supported by the homeserver by fetching them
2088    /// from the server or the cache.
2089    ///
2090    /// # Examples
2091    ///
2092    /// ```no_run
2093    /// use matrix_sdk::ruma::api::FeatureFlag;
2094    /// # use matrix_sdk::{Client, config::SyncSettings};
2095    /// # use url::Url;
2096    /// # async {
2097    /// # let homeserver = Url::parse("http://localhost:8080")?;
2098    /// # let mut client = Client::new(homeserver).await?;
2099    ///
2100    /// let msc_x_feature = FeatureFlag::from("msc_x");
2101    /// let unstable_features = client.unstable_features().await?;
2102    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2103    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2104    /// # anyhow::Ok(()) };
2105    /// ```
2106    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2107        self.get_or_load_and_cache_server_info(|server_info| {
2108            server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2109        })
2110        .await
2111    }
2112
2113    /// Get information about the homeserver's advertised RTC foci by fetching
2114    /// the well-known file from the server or the cache.
2115    ///
2116    /// # Examples
2117    /// ```no_run
2118    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2119    /// # use url::Url;
2120    /// # async {
2121    /// # let homeserver = Url::parse("http://localhost:8080")?;
2122    /// # let mut client = Client::new(homeserver).await?;
2123    /// let rtc_foci = client.rtc_foci().await?;
2124    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2125    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2126    ///     _ => None,
2127    /// });
2128    /// if let Some(info) = default_livekit_focus_info {
2129    ///     println!("Default LiveKit service URL: {}", info.service_url);
2130    /// }
2131    /// # anyhow::Ok(()) };
2132    /// ```
2133    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2134        let well_known = self
2135            .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2136            .await?;
2137
2138        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2139    }
2140
2141    /// Empty the server version and unstable features cache.
2142    ///
2143    /// Since the SDK caches server info (versions, unstable features,
2144    /// well-known etc), it's possible to have a stale entry in the cache. This
2145    /// functions makes it possible to force reset it.
2146    pub async fn reset_server_info(&self) -> Result<()> {
2147        // Empty the in-memory caches.
2148        let mut guard = self.inner.caches.server_info.write().await;
2149        guard.supported_versions = CachedValue::NotSet;
2150
2151        // Empty the store cache.
2152        Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2153    }
2154
2155    /// Check whether MSC 4028 is enabled on the homeserver.
2156    ///
2157    /// # Examples
2158    ///
2159    /// ```no_run
2160    /// # use matrix_sdk::{Client, config::SyncSettings};
2161    /// # use url::Url;
2162    /// # async {
2163    /// # let homeserver = Url::parse("http://localhost:8080")?;
2164    /// # let mut client = Client::new(homeserver).await?;
2165    /// let msc4028_enabled =
2166    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2167    /// # anyhow::Ok(()) };
2168    /// ```
2169    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2170        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2171    }
2172
2173    /// Get information of all our own devices.
2174    ///
2175    /// # Examples
2176    ///
2177    /// ```no_run
2178    /// # use matrix_sdk::{Client, config::SyncSettings};
2179    /// # use url::Url;
2180    /// # async {
2181    /// # let homeserver = Url::parse("http://localhost:8080")?;
2182    /// # let mut client = Client::new(homeserver).await?;
2183    /// let response = client.devices().await?;
2184    ///
2185    /// for device in response.devices {
2186    ///     println!(
2187    ///         "Device: {} {}",
2188    ///         device.device_id,
2189    ///         device.display_name.as_deref().unwrap_or("")
2190    ///     );
2191    /// }
2192    /// # anyhow::Ok(()) };
2193    /// ```
2194    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2195        let request = get_devices::v3::Request::new();
2196
2197        self.send(request).await
2198    }
2199
2200    /// Delete the given devices from the server.
2201    ///
2202    /// # Arguments
2203    ///
2204    /// * `devices` - The list of devices that should be deleted from the
2205    ///   server.
2206    ///
2207    /// * `auth_data` - This request requires user interactive auth, the first
2208    ///   request needs to set this to `None` and will always fail with an
2209    ///   `UiaaResponse`. The response will contain information for the
2210    ///   interactive auth and the same request needs to be made but this time
2211    ///   with some `auth_data` provided.
2212    ///
2213    /// ```no_run
2214    /// # use matrix_sdk::{
2215    /// #    ruma::{api::client::uiaa, device_id},
2216    /// #    Client, Error, config::SyncSettings,
2217    /// # };
2218    /// # use serde_json::json;
2219    /// # use url::Url;
2220    /// # use std::collections::BTreeMap;
2221    /// # async {
2222    /// # let homeserver = Url::parse("http://localhost:8080")?;
2223    /// # let mut client = Client::new(homeserver).await?;
2224    /// let devices = &[device_id!("DEVICEID").to_owned()];
2225    ///
2226    /// if let Err(e) = client.delete_devices(devices, None).await {
2227    ///     if let Some(info) = e.as_uiaa_response() {
2228    ///         let mut password = uiaa::Password::new(
2229    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2230    ///             "wordpass".to_owned(),
2231    ///         );
2232    ///         password.session = info.session.clone();
2233    ///
2234    ///         client
2235    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2236    ///             .await?;
2237    ///     }
2238    /// }
2239    /// # anyhow::Ok(()) };
2240    pub async fn delete_devices(
2241        &self,
2242        devices: &[OwnedDeviceId],
2243        auth_data: Option<uiaa::AuthData>,
2244    ) -> HttpResult<delete_devices::v3::Response> {
2245        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2246        request.auth = auth_data;
2247
2248        self.send(request).await
2249    }
2250
2251    /// Change the display name of a device owned by the current user.
2252    ///
2253    /// Returns a `update_device::Response` which specifies the result
2254    /// of the operation.
2255    ///
2256    /// # Arguments
2257    ///
2258    /// * `device_id` - The ID of the device to change the display name of.
2259    /// * `display_name` - The new display name to set.
2260    pub async fn rename_device(
2261        &self,
2262        device_id: &DeviceId,
2263        display_name: &str,
2264    ) -> HttpResult<update_device::v3::Response> {
2265        let mut request = update_device::v3::Request::new(device_id.to_owned());
2266        request.display_name = Some(display_name.to_owned());
2267
2268        self.send(request).await
2269    }
2270
2271    /// Synchronize the client's state with the latest state on the server.
2272    ///
2273    /// ## Syncing Events
2274    ///
2275    /// Messages or any other type of event need to be periodically fetched from
2276    /// the server, this is achieved by sending a `/sync` request to the server.
2277    ///
2278    /// The first sync is sent out without a [`token`]. The response of the
2279    /// first sync will contain a [`next_batch`] field which should then be
2280    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2281    /// don't receive the same events multiple times.
2282    ///
2283    /// ## Long Polling
2284    ///
2285    /// A sync should in the usual case always be in flight. The
2286    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2287    /// long the server will wait for new events before it will respond.
2288    /// The server will respond immediately if some new events arrive before the
2289    /// timeout has expired. If no changes arrive and the timeout expires an
2290    /// empty sync response will be sent to the client.
2291    ///
2292    /// This method of sending a request that may not receive a response
2293    /// immediately is called long polling.
2294    ///
2295    /// ## Filtering Events
2296    ///
2297    /// The number or type of messages and events that the client should receive
2298    /// from the server can be altered using a [`Filter`].
2299    ///
2300    /// Filters can be non-trivial and, since they will be sent with every sync
2301    /// request, they may take up a bunch of unnecessary bandwidth.
2302    ///
2303    /// Luckily filters can be uploaded to the server and reused using an unique
2304    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2305    /// method.
2306    ///
2307    /// # Arguments
2308    ///
2309    /// * `sync_settings` - Settings for the sync call, this allows us to set
2310    /// various options to configure the sync:
2311    ///     * [`filter`] - To configure which events we receive and which get
2312    ///       [filtered] by the server
2313    ///     * [`timeout`] - To configure our [long polling] setup.
2314    ///     * [`token`] - To tell the server which events we already received
2315    ///       and where we wish to continue syncing.
2316    ///     * [`full_state`] - To tell the server that we wish to receive all
2317    ///       state events, regardless of our configured [`token`].
2318    ///     * [`set_presence`] - To tell the server to set the presence and to
2319    ///       which state.
2320    ///
2321    /// # Examples
2322    ///
2323    /// ```no_run
2324    /// # use url::Url;
2325    /// # async {
2326    /// # let homeserver = Url::parse("http://localhost:8080")?;
2327    /// # let username = "";
2328    /// # let password = "";
2329    /// use matrix_sdk::{
2330    ///     config::SyncSettings,
2331    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2332    /// };
2333    ///
2334    /// let client = Client::new(homeserver).await?;
2335    /// client.matrix_auth().login_username(username, password).send().await?;
2336    ///
2337    /// // Sync once so we receive the client state and old messages.
2338    /// client.sync_once(SyncSettings::default()).await?;
2339    ///
2340    /// // Register our handler so we start responding once we receive a new
2341    /// // event.
2342    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2343    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2344    /// });
2345    ///
2346    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2347    /// // from our `sync_once()` call automatically.
2348    /// client.sync(SyncSettings::default()).await;
2349    /// # anyhow::Ok(()) };
2350    /// ```
2351    ///
2352    /// [`sync`]: #method.sync
2353    /// [`SyncSettings`]: crate::config::SyncSettings
2354    /// [`token`]: crate::config::SyncSettings#method.token
2355    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2356    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2357    /// [`set_presence`]: ruma::presence::PresenceState
2358    /// [`filter`]: crate::config::SyncSettings#method.filter
2359    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2360    /// [`next_batch`]: SyncResponse#structfield.next_batch
2361    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2362    /// [long polling]: #long-polling
2363    /// [filtered]: #filtering-events
2364    #[instrument(skip(self))]
2365    pub async fn sync_once(
2366        &self,
2367        sync_settings: crate::config::SyncSettings,
2368    ) -> Result<SyncResponse> {
2369        // The sync might not return for quite a while due to the timeout.
2370        // We'll see if there's anything crypto related to send out before we
2371        // sync, i.e. if we closed our client after a sync but before the
2372        // crypto requests were sent out.
2373        //
2374        // This will mostly be a no-op.
2375        #[cfg(feature = "e2e-encryption")]
2376        if let Err(e) = self.send_outgoing_requests().await {
2377            error!(error = ?e, "Error while sending outgoing E2EE requests");
2378        }
2379
2380        let token = match sync_settings.token {
2381            SyncToken::Specific(token) => Some(token),
2382            SyncToken::NoToken => None,
2383            SyncToken::ReusePrevious => self.sync_token().await,
2384        };
2385
2386        let request = assign!(sync_events::v3::Request::new(), {
2387            filter: sync_settings.filter.map(|f| *f),
2388            since: token,
2389            full_state: sync_settings.full_state,
2390            set_presence: sync_settings.set_presence,
2391            timeout: sync_settings.timeout,
2392            use_state_after: true,
2393        });
2394        let mut request_config = self.request_config();
2395        if let Some(timeout) = sync_settings.timeout {
2396            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2397            request_config.timeout = Some(base_timeout + timeout);
2398        }
2399
2400        let response = self.send(request).with_request_config(request_config).await?;
2401        let next_batch = response.next_batch.clone();
2402        let response = self.process_sync(response).await?;
2403
2404        #[cfg(feature = "e2e-encryption")]
2405        if let Err(e) = self.send_outgoing_requests().await {
2406            error!(error = ?e, "Error while sending outgoing E2EE requests");
2407        }
2408
2409        self.inner.sync_beat.notify(usize::MAX);
2410
2411        Ok(SyncResponse::new(next_batch, response))
2412    }
2413
2414    /// Repeatedly synchronize the client state with the server.
2415    ///
2416    /// This method will only return on error, if cancellation is needed
2417    /// the method should be wrapped in a cancelable task or the
2418    /// [`Client::sync_with_callback`] method can be used or
2419    /// [`Client::sync_with_result_callback`] if you want to handle error
2420    /// cases in the loop, too.
2421    ///
2422    /// This method will internally call [`Client::sync_once`] in a loop.
2423    ///
2424    /// This method can be used with the [`Client::add_event_handler`]
2425    /// method to react to individual events. If you instead wish to handle
2426    /// events in a bulk manner the [`Client::sync_with_callback`],
2427    /// [`Client::sync_with_result_callback`] and
2428    /// [`Client::sync_stream`] methods can be used instead. Those methods
2429    /// repeatedly return the whole sync response.
2430    ///
2431    /// # Arguments
2432    ///
2433    /// * `sync_settings` - Settings for the sync call. *Note* that those
2434    ///   settings will be only used for the first sync call. See the argument
2435    ///   docs for [`Client::sync_once`] for more info.
2436    ///
2437    /// # Return
2438    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2439    /// up to the user of the API to check the error and decide whether the sync
2440    /// should continue or not.
2441    ///
2442    /// # Examples
2443    ///
2444    /// ```no_run
2445    /// # use url::Url;
2446    /// # async {
2447    /// # let homeserver = Url::parse("http://localhost:8080")?;
2448    /// # let username = "";
2449    /// # let password = "";
2450    /// use matrix_sdk::{
2451    ///     config::SyncSettings,
2452    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2453    /// };
2454    ///
2455    /// let client = Client::new(homeserver).await?;
2456    /// client.matrix_auth().login_username(&username, &password).send().await?;
2457    ///
2458    /// // Register our handler so we start responding once we receive a new
2459    /// // event.
2460    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2461    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2462    /// });
2463    ///
2464    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2465    /// // automatically.
2466    /// client.sync(SyncSettings::default()).await?;
2467    /// # anyhow::Ok(()) };
2468    /// ```
2469    ///
2470    /// [argument docs]: #method.sync_once
2471    /// [`sync_with_callback`]: #method.sync_with_callback
2472    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2473        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2474    }
2475
2476    /// Repeatedly call sync to synchronize the client state with the server.
2477    ///
2478    /// # Arguments
2479    ///
2480    /// * `sync_settings` - Settings for the sync call. *Note* that those
2481    ///   settings will be only used for the first sync call. See the argument
2482    ///   docs for [`Client::sync_once`] for more info.
2483    ///
2484    /// * `callback` - A callback that will be called every time a successful
2485    ///   response has been fetched from the server. The callback must return a
2486    ///   boolean which signalizes if the method should stop syncing. If the
2487    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2488    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2489    ///
2490    /// # Return
2491    /// The sync runs until an error occurs or the
2492    /// callback indicates that the Loop should stop. If the callback asked for
2493    /// a regular stop, the result will be `Ok(())` otherwise the
2494    /// `Err(Error)` is returned.
2495    ///
2496    /// # Examples
2497    ///
2498    /// The following example demonstrates how to sync forever while sending all
2499    /// the interesting events through a mpsc channel to another thread e.g. a
2500    /// UI thread.
2501    ///
2502    /// ```no_run
2503    /// # use std::time::Duration;
2504    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2505    /// # use url::Url;
2506    /// # async {
2507    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2508    /// # let mut client = Client::new(homeserver).await.unwrap();
2509    ///
2510    /// use tokio::sync::mpsc::channel;
2511    ///
2512    /// let (tx, rx) = channel(100);
2513    ///
2514    /// let sync_channel = &tx;
2515    /// let sync_settings = SyncSettings::new()
2516    ///     .timeout(Duration::from_secs(30));
2517    ///
2518    /// client
2519    ///     .sync_with_callback(sync_settings, |response| async move {
2520    ///         let channel = sync_channel;
2521    ///         for (room_id, room) in response.rooms.joined {
2522    ///             for event in room.timeline.events {
2523    ///                 channel.send(event).await.unwrap();
2524    ///             }
2525    ///         }
2526    ///
2527    ///         LoopCtrl::Continue
2528    ///     })
2529    ///     .await;
2530    /// };
2531    /// ```
2532    #[instrument(skip_all)]
2533    pub async fn sync_with_callback<C>(
2534        &self,
2535        sync_settings: crate::config::SyncSettings,
2536        callback: impl Fn(SyncResponse) -> C,
2537    ) -> Result<(), Error>
2538    where
2539        C: Future<Output = LoopCtrl>,
2540    {
2541        self.sync_with_result_callback(sync_settings, |result| async {
2542            Ok(callback(result?).await)
2543        })
2544        .await
2545    }
2546
2547    /// Repeatedly call sync to synchronize the client state with the server.
2548    ///
2549    /// # Arguments
2550    ///
2551    /// * `sync_settings` - Settings for the sync call. *Note* that those
2552    ///   settings will be only used for the first sync call. See the argument
2553    ///   docs for [`Client::sync_once`] for more info.
2554    ///
2555    /// * `callback` - A callback that will be called every time after a
2556    ///   response has been received, failure or not. The callback returns a
2557    ///   `Result<LoopCtrl, Error>`, too. When returning
2558    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2559    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2560    ///   function returns `Ok(())`. In case the callback can't handle the
2561    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2562    ///   which results in the sync ending and the `Err(Error)` being returned.
2563    ///
2564    /// # Return
2565    /// The sync runs until an error occurs that the callback can't handle or
2566    /// the callback indicates that the Loop should stop. If the callback
2567    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2568    /// `Err(Error)` is returned.
2569    ///
2570    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2571    /// this, and are handled first without sending the result to the
2572    /// callback. Only after they have exceeded is the `Result` handed to
2573    /// the callback.
2574    ///
2575    /// # Examples
2576    ///
2577    /// The following example demonstrates how to sync forever while sending all
2578    /// the interesting events through a mpsc channel to another thread e.g. a
2579    /// UI thread.
2580    ///
2581    /// ```no_run
2582    /// # use std::time::Duration;
2583    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2584    /// # use url::Url;
2585    /// # async {
2586    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2587    /// # let mut client = Client::new(homeserver).await.unwrap();
2588    /// #
2589    /// use tokio::sync::mpsc::channel;
2590    ///
2591    /// let (tx, rx) = channel(100);
2592    ///
2593    /// let sync_channel = &tx;
2594    /// let sync_settings = SyncSettings::new()
2595    ///     .timeout(Duration::from_secs(30));
2596    ///
2597    /// client
2598    ///     .sync_with_result_callback(sync_settings, |response| async move {
2599    ///         let channel = sync_channel;
2600    ///         let sync_response = response?;
2601    ///         for (room_id, room) in sync_response.rooms.joined {
2602    ///              for event in room.timeline.events {
2603    ///                  channel.send(event).await.unwrap();
2604    ///               }
2605    ///         }
2606    ///
2607    ///         Ok(LoopCtrl::Continue)
2608    ///     })
2609    ///     .await;
2610    /// };
2611    /// ```
2612    #[instrument(skip(self, callback))]
2613    pub async fn sync_with_result_callback<C>(
2614        &self,
2615        sync_settings: crate::config::SyncSettings,
2616        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2617    ) -> Result<(), Error>
2618    where
2619        C: Future<Output = Result<LoopCtrl, Error>>,
2620    {
2621        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2622
2623        while let Some(result) = sync_stream.next().await {
2624            trace!("Running callback");
2625            if callback(result).await? == LoopCtrl::Break {
2626                trace!("Callback told us to stop");
2627                break;
2628            }
2629            trace!("Done running callback");
2630        }
2631
2632        Ok(())
2633    }
2634
    /// Repeatedly synchronize the client state with the server.
    ///
    /// This method will internally call [`Client::sync_once`] in a loop and is
    /// equivalent to the [`Client::sync`] method but the responses are provided
    /// as an async stream.
    ///
    /// The returned stream yields the result of every sync iteration,
    /// successful or not, and contains no exit condition of its own: stop
    /// polling (or drop) it to stop syncing.
    ///
    /// # Arguments
    ///
    /// * `sync_settings` - Settings for the sync call. *Note* that those
    ///   settings will be only used for the first sync call. See the argument
    ///   docs for [`Client::sync_once`] for more info.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use url::Url;
    /// # async {
    /// # let homeserver = Url::parse("http://localhost:8080")?;
    /// # let username = "";
    /// # let password = "";
    /// use futures_util::StreamExt;
    /// use matrix_sdk::{config::SyncSettings, Client};
    ///
    /// let client = Client::new(homeserver).await?;
    /// client.matrix_auth().login_username(&username, &password).send().await?;
    ///
    /// let mut sync_stream =
    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
    ///
    /// while let Some(Ok(response)) = sync_stream.next().await {
    ///     for room in response.rooms.joined.values() {
    ///         for e in &room.timeline.events {
    ///             if let Ok(event) = e.raw().deserialize() {
    ///                 println!("Received event {:?}", event);
    ///             }
    ///         }
    ///     }
    /// }
    ///
    /// # anyhow::Ok(()) };
    /// ```
    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
    #[instrument(skip(self))]
    pub async fn sync_stream(
        &self,
        mut sync_settings: crate::config::SyncSettings,
    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
        let mut is_first_sync = true;
        // Holds the caller's timeout while it is withheld from the first sync.
        let mut timeout = None;
        let mut last_sync_time: Option<Instant> = None;

        // Captured now so that every iteration of the stream is instrumented
        // with the span that was current when the stream was created.
        let parent_span = Span::current();

        async_stream::stream!({
            loop {
                trace!("Syncing");

                if sync_settings.ignore_timeout_on_first_sync {
                    if is_first_sync {
                        // First iteration: move the timeout out of the
                        // settings so that this sync runs without one.
                        timeout = sync_settings.timeout.take();
                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
                        // Later iteration: put the withheld timeout back.
                        sync_settings.timeout = timeout.take();
                    }

                    is_first_sync = false;
                }

                // Every result, including errors, is forwarded to the
                // consumer of the stream.
                yield self
                    .sync_loop_helper(&mut sync_settings)
                    .instrument(parent_span.clone())
                    .await;

                // NOTE(review): presumably throttles back-to-back syncs based
                // on `last_sync_time` — confirm in `Client::delay_sync`.
                Client::delay_sync(&mut last_sync_time).await
            }
        })
    }
2711
2712    /// Get the current, if any, sync token of the client.
2713    /// This will be None if the client didn't sync at least once.
2714    pub(crate) async fn sync_token(&self) -> Option<String> {
2715        self.inner.base_client.sync_token().await
2716    }
2717
2718    /// Gets information about the owner of a given access token.
2719    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2720        let request = whoami::v3::Request::new();
2721        self.send(request).await
2722    }
2723
2724    /// Subscribes a new receiver to client SessionChange broadcasts.
2725    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2726        let broadcast = &self.auth_ctx().session_change_sender;
2727        broadcast.subscribe()
2728    }
2729
2730    /// Sets the save/restore session callbacks.
2731    ///
2732    /// This is another mechanism to get synchronous updates to session tokens,
2733    /// while [`Self::subscribe_to_session_changes`] provides an async update.
2734    pub fn set_session_callbacks(
2735        &self,
2736        reload_session_callback: Box<ReloadSessionCallback>,
2737        save_session_callback: Box<SaveSessionCallback>,
2738    ) -> Result<()> {
2739        self.inner
2740            .auth_ctx
2741            .reload_session_callback
2742            .set(reload_session_callback)
2743            .map_err(|_| Error::MultipleSessionCallbacks)?;
2744
2745        self.inner
2746            .auth_ctx
2747            .save_session_callback
2748            .set(save_session_callback)
2749            .map_err(|_| Error::MultipleSessionCallbacks)?;
2750
2751        Ok(())
2752    }
2753
2754    /// Get the notification settings of the current owner of the client.
2755    pub async fn notification_settings(&self) -> NotificationSettings {
2756        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2757        NotificationSettings::new(self.clone(), ruleset)
2758    }
2759
    /// Create a new specialized `Client` that can process notifications.
    ///
    /// The new client gets its own `ClientInner`, built from clones of this
    /// client's authentication context, HTTP client, cached server info,
    /// event cache, send queue data and latest events, plus a base client
    /// obtained from `clone_with_in_memory_state_store`.
    ///
    /// See [`CrossProcessStoreLock::new`] to learn more about
    /// `cross_process_store_locks_holder_name`.
    ///
    /// # Errors
    ///
    /// Fails if cloning the base client fails.
    ///
    /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
    pub async fn notification_client(
        &self,
        cross_process_store_locks_holder_name: String,
    ) -> Result<Client> {
        let client = Client {
            inner: ClientInner::new(
                self.inner.auth_ctx.clone(),
                self.server().cloned(),
                self.homeserver(),
                self.sliding_sync_version(),
                self.inner.http_client.clone(),
                // NOTE(review): the meaning of the `false` flag is not visible
                // from here — check `clone_with_in_memory_state_store`.
                self.inner
                    .base_client
                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
                    .await?,
                // Snapshot of the server info cached so far.
                self.inner.caches.server_info.read().await.clone(),
                self.inner.respect_login_well_known,
                self.inner.event_cache.clone(),
                self.inner.send_queue_data.clone(),
                self.inner.latest_events.clone(),
                #[cfg(feature = "e2e-encryption")]
                self.inner.e2ee.encryption_settings,
                #[cfg(feature = "e2e-encryption")]
                self.inner.enable_share_history_on_invite,
                cross_process_store_locks_holder_name,
                #[cfg(feature = "experimental-search")]
                self.inner.search_index.clone(),
                self.inner.thread_subscription_catchup.clone(),
            )
            .await,
        };

        Ok(client)
    }
2800
2801    /// The [`EventCache`] instance for this [`Client`].
2802    pub fn event_cache(&self) -> &EventCache {
2803        // SAFETY: always initialized in the `Client` ctor.
2804        self.inner.event_cache.get().unwrap()
2805    }
2806
2807    /// The [`LatestEvents`] instance for this [`Client`].
2808    pub async fn latest_events(&self) -> &LatestEvents {
2809        self.inner
2810            .latest_events
2811            .get_or_init(|| async {
2812                LatestEvents::new(
2813                    WeakClient::from_client(self),
2814                    self.event_cache().clone(),
2815                    SendQueue::new(self.clone()),
2816                )
2817            })
2818            .await
2819    }
2820
2821    /// Waits until an at least partially synced room is received, and returns
2822    /// it.
2823    ///
2824    /// **Note: this function will loop endlessly until either it finds the room
2825    /// or an externally set timeout happens.**
2826    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2827        loop {
2828            if let Some(room) = self.get_room(room_id) {
2829                if room.is_state_partially_or_fully_synced() {
2830                    debug!("Found just created room!");
2831                    return room;
2832                }
2833                debug!("Room wasn't partially synced, waiting for sync beat to try again");
2834            } else {
2835                debug!("Room wasn't found, waiting for sync beat to try again");
2836            }
2837            self.inner.sync_beat.listen().await;
2838        }
2839    }
2840
2841    /// Knock on a room given its `room_id_or_alias` to ask for permission to
2842    /// join it.
2843    pub async fn knock(
2844        &self,
2845        room_id_or_alias: OwnedRoomOrAliasId,
2846        reason: Option<String>,
2847        server_names: Vec<OwnedServerName>,
2848    ) -> Result<Room> {
2849        let request =
2850            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2851        let response = self.send(request).await?;
2852        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2853        Ok(Room::new(self.clone(), base_room))
2854    }
2855
2856    /// Checks whether the provided `user_id` belongs to an ignored user.
2857    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2858        self.base_client().is_user_ignored(user_id).await
2859    }
2860
2861    /// Gets the `max_upload_size` value from the homeserver, getting either a
2862    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2863    /// missing.
2864    ///
2865    /// Check the spec for more info:
2866    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2867    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2868        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2869        if let Some(data) = max_upload_size_lock.get() {
2870            return Ok(data.to_owned());
2871        }
2872
2873        // Use the authenticated endpoint when the server supports it.
2874        let supported_versions = self.supported_versions().await?;
2875        let use_auth =
2876            authenticated_media::get_media_config::v1::Request::is_supported(&supported_versions);
2877
2878        let upload_size = if use_auth {
2879            self.send(authenticated_media::get_media_config::v1::Request::default())
2880                .await?
2881                .upload_size
2882        } else {
2883            #[allow(deprecated)]
2884            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
2885        };
2886
2887        match max_upload_size_lock.set(upload_size) {
2888            Ok(_) => Ok(upload_size),
2889            Err(error) => {
2890                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2891            }
2892        }
2893    }
2894
2895    /// The settings to use for decrypting events.
2896    #[cfg(feature = "e2e-encryption")]
2897    pub fn decryption_settings(&self) -> &DecryptionSettings {
2898        &self.base_client().decryption_settings
2899    }
2900
2901    #[cfg(feature = "experimental-search")]
2902    pub(crate) fn search_index(&self) -> &SearchIndex {
2903        &self.inner.search_index
2904    }
2905
2906    /// Whether the client is configured to take thread subscriptions (MSC4306
2907    /// and MSC4308) into account.
2908    ///
2909    /// This may cause filtering out of thread subscriptions, and loading the
2910    /// thread subscriptions via the sliding sync extension, when the room
2911    /// list service is being used.
2912    pub fn enabled_thread_subscriptions(&self) -> bool {
2913        match self.base_client().threading_support {
2914            ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
2915            ThreadingSupport::Disabled => false,
2916        }
2917    }
2918
2919    /// Fetch thread subscriptions changes between `from` and up to `to`.
2920    ///
2921    /// The `limit` optional parameter can be used to limit the number of
2922    /// entries in a response. It can also be overridden by the server, if
2923    /// it's deemed too large.
2924    pub async fn fetch_thread_subscriptions(
2925        &self,
2926        from: Option<String>,
2927        to: Option<String>,
2928        limit: Option<UInt>,
2929    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
2930        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
2931            from,
2932            to,
2933            limit,
2934        });
2935        Ok(self.send(request).await?)
2936    }
2937
2938    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
2939        self.inner.thread_subscription_catchup.get().unwrap()
2940    }
2941}
2942
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper that makes the crypto layer track the given users.
    ///
    /// # Panics
    ///
    /// Panics if no Olm machine is available or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let machine_guard = self.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();

        let outcome = machine.update_tracked_users(user_ids).await;
        outcome.unwrap();
    }
}
2956
/// A weak reference to the inner client, useful when trying to get a handle
/// on the owning client.
///
/// Use [`WeakClient::get`] to try to turn it back into a full [`Client`].
#[derive(Clone, Debug)]
pub(crate) struct WeakClient {
    /// Weak pointer to the client's inner state.
    client: Weak<ClientInner>,
}
2963
2964impl WeakClient {
2965    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2966    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2967        Self { client: Arc::downgrade(client) }
2968    }
2969
2970    /// Construct a [`WeakClient`] from a [`Client`].
2971    pub fn from_client(client: &Client) -> Self {
2972        Self::from_inner(&client.inner)
2973    }
2974
2975    /// Attempts to get a [`Client`] from this [`WeakClient`].
2976    pub fn get(&self) -> Option<Client> {
2977        self.client.upgrade().map(|inner| Client { inner })
2978    }
2979
2980    /// Gets the number of strong (`Arc`) pointers still pointing to this
2981    /// client.
2982    #[allow(dead_code)]
2983    pub fn strong_count(&self) -> usize {
2984        self.client.strong_count()
2985    }
2986}
2987
/// Information about the server that the client keeps cached.
///
/// Each field is a [`CachedValue`], so "not fetched yet" can be told apart
/// from whatever value was actually cached.
#[derive(Clone)]
struct ClientServerInfo {
    /// The Matrix versions and unstable features the server supports (known
    /// ones only).
    supported_versions: CachedValue<SupportedVersions>,

    /// The server's well-known file, if any.
    well_known: CachedValue<Option<WellKnownResponse>>,
}
2997
/// A slot for a value that may or may not have been cached yet.
///
/// Having a dedicated type avoids mixing up a value that is cached as `None`
/// (because it doesn't exist) with a value that has not been cached at all.
#[derive(Clone)]
enum CachedValue<Value> {
    /// A value has been cached.
    Cached(Value),
    /// Nothing has been cached yet.
    NotSet,
}

impl<Value> CachedValue<Value> {
    /// Takes the cached value out of the slot.
    ///
    /// # Panics
    ///
    /// Panics if nothing has been cached yet.
    fn unwrap_cached_value(self) -> Value {
        if let Self::Cached(inner) = self {
            inner
        } else {
            panic!("Tried to unwrap a cached value that wasn't set")
        }
    }

    /// Borrows the content: `&CachedValue<Value>` becomes
    /// `CachedValue<&Value>`.
    fn as_ref(&self) -> CachedValue<&Value> {
        if let Self::Cached(inner) = self {
            CachedValue::Cached(inner)
        } else {
            CachedValue::NotSet
        }
    }

    /// Transforms the cached value with `f`, turning a `CachedValue<Value>`
    /// into a `CachedValue<Other>`. A `NotSet` slot stays `NotSet` and `f` is
    /// not called.
    fn map<Other, F>(self, f: F) -> CachedValue<Other>
    where
        F: FnOnce(Value) -> Other,
    {
        match self {
            Self::NotSet => CachedValue::NotSet,
            Self::Cached(inner) => CachedValue::Cached(f(inner)),
        }
    }
}
3043
/// Information about the state of a room before we joined it.
///
/// The `Default` value carries no inviter.
#[derive(Debug, Clone, Default)]
struct PreJoinRoomInfo {
    /// The user who invited us to the room, if any.
    pub inviter: Option<RoomMember>,
}
3050
3051// The http mocking library is not supported for wasm32
3052#[cfg(all(test, not(target_family = "wasm")))]
3053pub(crate) mod tests {
3054    use std::{sync::Arc, time::Duration};
3055
3056    use assert_matches::assert_matches;
3057    use assert_matches2::assert_let;
3058    use eyeball::SharedObservable;
3059    use futures_util::{pin_mut, FutureExt, StreamExt};
3060    use js_int::{uint, UInt};
3061    use matrix_sdk_base::{
3062        store::{MemoryStore, StoreConfig},
3063        RoomState,
3064    };
3065    use matrix_sdk_test::{
3066        async_test, event_factory::EventFactory, JoinedRoomBuilder, StateTestEvent,
3067        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
3068    };
3069    #[cfg(target_family = "wasm")]
3070    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3071
3072    use ruma::{
3073        api::{
3074            client::{
3075                discovery::discover_homeserver::RtcFocusInfo,
3076                room::create_room::v3::Request as CreateRoomRequest,
3077            },
3078            FeatureFlag, MatrixVersion,
3079        },
3080        assign,
3081        events::{
3082            ignored_user_list::IgnoredUserListEventContent,
3083            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3084        },
3085        owned_room_id, owned_user_id, room_alias_id, room_id, RoomId, ServerName, UserId,
3086    };
3087    use serde_json::json;
3088    use stream_assert::{assert_next_matches, assert_pending};
3089    use tokio::{
3090        spawn,
3091        time::{sleep, timeout},
3092    };
3093    use url::Url;
3094
3095    use super::Client;
3096    use crate::{
3097        client::{futures::SendMediaUploadRequest, WeakClient},
3098        config::{RequestConfig, SyncSettings},
3099        futures::SendRequest,
3100        media::MediaError,
3101        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3102        Error, TransmissionProgress,
3103    };
3104
3105    #[async_test]
3106    async fn test_account_data() {
3107        let server = MatrixMockServer::new().await;
3108        let client = server.client_builder().build().await;
3109
3110        let f = EventFactory::new();
3111        server
3112            .mock_sync()
3113            .ok_and_run(&client, |builder| {
3114                builder.add_global_account_data(
3115                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3116                );
3117            })
3118            .await;
3119
3120        let content = client
3121            .account()
3122            .account_data::<IgnoredUserListEventContent>()
3123            .await
3124            .unwrap()
3125            .unwrap()
3126            .deserialize()
3127            .unwrap();
3128
3129        assert_eq!(content.ignored_users.len(), 1);
3130    }
3131
3132    #[async_test]
3133    async fn test_successful_discovery() {
3134        // Imagine this is `matrix.org`.
3135        let server = MatrixMockServer::new().await;
3136        let server_url = server.uri();
3137
3138        // Imagine this is `matrix-client.matrix.org`.
3139        let homeserver = MatrixMockServer::new().await;
3140        let homeserver_url = homeserver.uri();
3141
3142        // Imagine Alice has the user ID `@alice:matrix.org`.
3143        let domain = server_url.strip_prefix("http://").unwrap();
3144        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3145
3146        // The `.well-known` is on the server (e.g. `matrix.org`).
3147        server
3148            .mock_well_known()
3149            .ok_with_homeserver_url(&homeserver_url)
3150            .mock_once()
3151            .named("well-known")
3152            .mount()
3153            .await;
3154
3155        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3156        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3157
3158        let client = Client::builder()
3159            .insecure_server_name_no_tls(alice.server_name())
3160            .build()
3161            .await
3162            .unwrap();
3163
3164        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3165        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3166        client.server_versions().await.unwrap();
3167    }
3168
3169    #[async_test]
3170    async fn test_discovery_broken_server() {
3171        let server = MatrixMockServer::new().await;
3172        let server_url = server.uri();
3173        let domain = server_url.strip_prefix("http://").unwrap();
3174        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3175
3176        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3177
3178        assert!(
3179            Client::builder()
3180                .insecure_server_name_no_tls(alice.server_name())
3181                .build()
3182                .await
3183                .is_err(),
3184            "Creating a client from a user ID should fail when the .well-known request fails."
3185        );
3186    }
3187
3188    #[async_test]
3189    async fn test_room_creation() {
3190        let server = MatrixMockServer::new().await;
3191        let client = server.client_builder().build().await;
3192
3193        server
3194            .mock_sync()
3195            .ok_and_run(&client, |builder| {
3196                builder.add_joined_room(
3197                    JoinedRoomBuilder::default()
3198                        .add_state_event(StateTestEvent::Member)
3199                        .add_state_event(StateTestEvent::PowerLevels),
3200                );
3201            })
3202            .await;
3203
3204        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3205        assert_eq!(room.state(), RoomState::Joined);
3206    }
3207
3208    #[async_test]
3209    async fn test_retry_limit_http_requests() {
3210        let server = MatrixMockServer::new().await;
3211        let client = server
3212            .client_builder()
3213            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3214            .build()
3215            .await;
3216
3217        assert!(client.request_config().retry_limit.unwrap() == 3);
3218
3219        server.mock_login().error500().expect(3).mount().await;
3220
3221        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3222    }
3223
3224    #[async_test]
3225    async fn test_retry_timeout_http_requests() {
3226        // Keep this timeout small so that the test doesn't take long
3227        let retry_timeout = Duration::from_secs(5);
3228        let server = MatrixMockServer::new().await;
3229        let client = server
3230            .client_builder()
3231            .on_builder(|builder| {
3232                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3233            })
3234            .build()
3235            .await;
3236
3237        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3238
3239        server.mock_login().error500().expect(2..).mount().await;
3240
3241        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3242    }
3243
3244    #[async_test]
3245    async fn test_short_retry_initial_http_requests() {
3246        let server = MatrixMockServer::new().await;
3247        let client = server
3248            .client_builder()
3249            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3250            .build()
3251            .await;
3252
3253        server.mock_login().error500().expect(3..).mount().await;
3254
3255        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3256    }
3257
3258    #[async_test]
3259    async fn test_no_retry_http_requests() {
3260        let server = MatrixMockServer::new().await;
3261        let client = server.client_builder().build().await;
3262
3263        server.mock_devices().error500().mock_once().mount().await;
3264
3265        client.devices().await.unwrap_err();
3266    }
3267
3268    #[async_test]
3269    async fn test_set_homeserver() {
3270        let client = MockClientBuilder::new(None).build().await;
3271        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3272
3273        let homeserver = Url::parse("http://example.com/").unwrap();
3274        client.set_homeserver(homeserver.clone());
3275        assert_eq!(client.homeserver(), homeserver);
3276    }
3277
3278    #[async_test]
3279    async fn test_search_user_request() {
3280        let server = MatrixMockServer::new().await;
3281        let client = server.client_builder().build().await;
3282
3283        server.mock_user_directory().ok().mock_once().mount().await;
3284
3285        let response = client.search_users("test", 50).await.unwrap();
3286        assert_eq!(response.results.len(), 1);
3287        let result = &response.results[0];
3288        assert_eq!(result.user_id.to_string(), "@test:example.me");
3289        assert_eq!(result.display_name.clone().unwrap(), "Test");
3290        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3291        assert!(!response.limited);
3292    }
3293
3294    #[async_test]
3295    async fn test_request_unstable_features() {
3296        let server = MatrixMockServer::new().await;
3297        let client = server.client_builder().no_server_versions().build().await;
3298
3299        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3300
3301        let unstable_features = client.unstable_features().await.unwrap();
3302        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3303        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3304    }
3305
3306    #[async_test]
3307    async fn test_can_homeserver_push_encrypted_event_to_device() {
3308        let server = MatrixMockServer::new().await;
3309        let client = server.client_builder().no_server_versions().build().await;
3310
3311        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3312
3313        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3314        assert!(msc4028_enabled);
3315    }
3316
    /// Exercises the recently visited rooms list of the account API:
    /// authentication is required, tracking a room puts it at the front,
    /// tracking it again does not duplicate it, and the list holds at most 20
    /// rooms.
    #[async_test]
    async fn test_recently_visited_rooms() {
        // Tracking recently visited rooms requires authentication
        let client = MockClientBuilder::new(None).unlogged().build().await;
        assert_matches!(
            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
            Err(Error::AuthenticationRequired)
        );

        // From here on, use a client built without `unlogged()`.
        let client = MockClientBuilder::new(None).build().await;
        let account = client.account();

        // We should start off with an empty list
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);

        // Tracking a valid room id should add it to the list
        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

        // And the existing list shouldn't be changed
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

        // Tracking the same room again shouldn't change the list
        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);

        // Tracking a second room should add it to the front of the list
        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
        assert_eq!(
            account.get_recently_visited_rooms().await.unwrap(),
            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
        );

        // Tracking the first room yet again should move it to the front of the list
        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
        assert_eq!(
            account.get_recently_visited_rooms().await.unwrap(),
            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
        );

        // Tracking should be capped at 20
        // (20 fresh rooms, `!0:localhost` through `!19:localhost`, are tracked
        // on top of the two existing ones.)
        for n in 0..20 {
            account
                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
                .await
                .unwrap();
        }

        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);

        // And the initial rooms should've been pushed out
        let rooms = account.get_recently_visited_rooms().await.unwrap();
        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));

        // And the last tracked room should be the first
        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
    }
3380
    /// Checks that subscribing to the event cache and obtaining a room event
    /// cache does not keep the client alive: once the only `Client` handle is
    /// dropped, no strong reference to it must remain.
    #[async_test]
    async fn test_client_no_cycle_with_event_cache() {
        let client = MockClientBuilder::new(None).build().await;

        // Wait for the init tasks to die.
        sleep(Duration::from_secs(1)).await;

        // At this point `client` must be the only strong reference.
        let weak_client = WeakClient::from_client(&client);
        assert_eq!(weak_client.strong_count(), 1);

        {
            let room_id = room_id!("!room:example.org");

            // Have the client know the room.
            let response = SyncResponseBuilder::default()
                .add_joined_room(JoinedRoomBuilder::new(room_id))
                .build_sync_response();
            client.inner.base_client.receive_sync_response(response).await.unwrap();

            client.event_cache().subscribe().unwrap();

            // Both values are dropped at the end of this scope, before the
            // client itself is dropped.
            let (_room_event_cache, _drop_handles) =
                client.get_room(room_id).unwrap().event_cache().await.unwrap();
        }

        drop(client);

        // Give a bit of time for background tasks to die.
        sleep(Duration::from_secs(1)).await;

        // The weak client must be the last reference to the client now.
        assert_eq!(weak_client.strong_count(), 0);
        let client = weak_client.get();
        // The message argument of `assert!` is only evaluated when the
        // assertion fails, i.e. when `client` is `Some`, so the `unwrap()`
        // below cannot panic on `None`.
        assert!(
            client.is_none(),
            "too many strong references to the client: {}",
            Arc::strong_count(&client.unwrap().inner)
        );
    }
3420
    /// Checks the caching of server info (supported versions, unstable
    /// features and RTC foci) for a client built from a server name: the
    /// scoped mocks' expected call counts bound how often `/versions` and
    /// `.well-known` are fetched across two clients sharing one state store,
    /// and both endpoints are queried again after `reset_server_info`.
    #[async_test]
    async fn test_server_info_caching() {
        let server = MatrixMockServer::new().await;
        let server_url = server.uri();
        let domain = server_url.strip_prefix("http://").unwrap();
        let server_name = <&ServerName>::try_from(domain).unwrap();
        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];

        // Scoped mocks: they are explicitly dropped further down, before the
        // second set of mocks is mounted.
        let well_known_mock = server
            .mock_well_known()
            .ok()
            .named("well known mock")
            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
            .mount_as_scoped()
            .await;

        let versions_mock = server
            .mock_versions()
            .ok_with_unstable_features()
            .named("first versions mock")
            .expect(1)
            .mount_as_scoped()
            .await;

        // The same state store instance is given to both clients below.
        let memory_store = Arc::new(MemoryStore::new());
        let client = Client::builder()
            .insecure_server_name_no_tls(server_name)
            .store_config(
                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                    .state_store(memory_store.clone()),
            )
            .build()
            .await
            .unwrap();

        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

        // These subsequent calls hit the in-memory cache.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

        drop(client);

        // Second client, built on top of the same state store.
        let client = server
            .client_builder()
            .no_server_versions()
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                        .state_store(memory_store.clone()),
                )
            })
            .build()
            .await;

        // These calls to the new client hit the on-disk cache.
        // (Here the shared `MemoryStore` stands in for an on-disk store.)
        assert!(client
            .unstable_features()
            .await
            .unwrap()
            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
        let supported = client.supported_versions().await.unwrap();
        assert!(supported.versions.contains(&MatrixVersion::V1_0));
        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));

        // Then this call hits the in-memory cache.
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

        drop(versions_mock);
        drop(well_known_mock);

        // Now, reset the cache, and observe the endpoints being called again once.
        client.reset_server_info().await.unwrap();

        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;

        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;

        // Hits network again.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        // Hits in-memory cache again.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
    }
3505
    /// Checks the caching of server info when no `.well-known` mock is
    /// mounted: the RTC foci are expected to be empty, the first `/versions`
    /// mock expects a single call across two clients sharing one state store,
    /// and `/versions` is queried again after `reset_server_info`.
    #[async_test]
    async fn test_server_info_without_a_well_known() {
        let server = MatrixMockServer::new().await;
        // The expected list of RTC foci is empty in this test.
        let rtc_foci: Vec<RtcFocusInfo> = vec![];

        // Scoped mock: it is explicitly dropped further down, before the
        // second versions mock is mounted.
        let versions_mock = server
            .mock_versions()
            .ok_with_unstable_features()
            .named("first versions mock")
            .expect(1)
            .mount_as_scoped()
            .await;

        // The same state store instance is given to both clients below.
        let memory_store = Arc::new(MemoryStore::new());
        let client = server
            .client_builder()
            .no_server_versions()
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                        .state_store(memory_store.clone()),
                )
            })
            .build()
            .await;

        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

        // These subsequent calls hit the in-memory cache.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

        drop(client);

        // Second client, built on top of the same state store.
        let client = server
            .client_builder()
            .no_server_versions()
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
                        .state_store(memory_store.clone()),
                )
            })
            .build()
            .await;

        // This call to the new client hits the on-disk cache.
        // (Here the shared `MemoryStore` stands in for an on-disk store.)
        assert!(client
            .unstable_features()
            .await
            .unwrap()
            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));

        // Then this call hits the in-memory cache.
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

        drop(versions_mock);

        // Now, reset the cache, and observe the endpoints being called again once.
        client.reset_server_info().await.unwrap();

        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;

        // Hits network again.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        // Hits in-memory cache again.
        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
    }
3575
3576    #[async_test]
3577    async fn test_no_network_doesnt_cause_infinite_retries() {
3578        // We want infinite retries for transient errors.
3579        let client = MockClientBuilder::new(None)
3580            .on_builder(|builder| builder.request_config(RequestConfig::new()))
3581            .build()
3582            .await;
3583
3584        // We don't define a mock server on purpose here, so that the error is really a
3585        // network error.
3586        client.whoami().await.unwrap_err();
3587    }
3588
3589    #[async_test]
3590    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3591        let server = MatrixMockServer::new().await;
3592        let client = server.client_builder().build().await;
3593
3594        let room_id = room_id!("!room:example.org");
3595
3596        server
3597            .mock_sync()
3598            .ok_and_run(&client, |builder| {
3599                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3600            })
3601            .await;
3602
3603        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3604        assert_eq!(room.room_id(), room_id);
3605    }
3606
3607    #[async_test]
3608    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3609        let server = MatrixMockServer::new().await;
3610        let client = server.client_builder().build().await;
3611
3612        let room_id = room_id!("!room:example.org");
3613
3614        let client = Arc::new(client);
3615
3616        // Perform the /sync request with a delay so it starts after the
3617        // `await_room_remote_echo` call has happened
3618        spawn({
3619            let client = client.clone();
3620            async move {
3621                sleep(Duration::from_millis(100)).await;
3622
3623                server
3624                    .mock_sync()
3625                    .ok_and_run(&client, |builder| {
3626                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3627                    })
3628                    .await;
3629            }
3630        });
3631
3632        let room =
3633            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3634        assert_eq!(room.room_id(), room_id);
3635    }
3636
3637    #[async_test]
3638    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3639        let client = MockClientBuilder::new(None).build().await;
3640
3641        let room_id = room_id!("!room:example.org");
3642        // Room is not present so the client won't be able to find it. The call will
3643        // timeout.
3644        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3645    }
3646
3647    #[async_test]
3648    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3649        let server = MatrixMockServer::new().await;
3650        let client = server.client_builder().build().await;
3651
3652        server.mock_create_room().ok().mount().await;
3653
3654        // Create a room in the internal store
3655        let room = client
3656            .create_room(assign!(CreateRoomRequest::new(), {
3657                invite: vec![],
3658                is_direct: false,
3659            }))
3660            .await
3661            .unwrap();
3662
3663        // Room is locally present, but not synced, the call will timeout
3664        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3665            .await
3666            .unwrap_err();
3667    }
3668
3669    #[async_test]
3670    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3671        let server = MatrixMockServer::new().await;
3672        let client = server.client_builder().build().await;
3673
3674        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3675
3676        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3677        assert_matches!(ret, Ok(true));
3678    }
3679
3680    #[async_test]
3681    async fn test_is_room_alias_available_if_alias_is_resolved() {
3682        let server = MatrixMockServer::new().await;
3683        let client = server.client_builder().build().await;
3684
3685        server
3686            .mock_room_directory_resolve_alias()
3687            .ok("!some_room_id:matrix.org", Vec::new())
3688            .expect(1)
3689            .mount()
3690            .await;
3691
3692        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3693        assert_matches!(ret, Ok(false));
3694    }
3695
3696    #[async_test]
3697    async fn test_is_room_alias_available_if_error_found() {
3698        let server = MatrixMockServer::new().await;
3699        let client = server.client_builder().build().await;
3700
3701        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3702
3703        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3704        assert_matches!(ret, Err(_));
3705    }
3706
3707    #[async_test]
3708    async fn test_create_room_alias() {
3709        let server = MatrixMockServer::new().await;
3710        let client = server.client_builder().build().await;
3711
3712        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3713
3714        let ret = client
3715            .create_room_alias(
3716                room_alias_id!("#some_alias:matrix.org"),
3717                room_id!("!some_room:matrix.org"),
3718            )
3719            .await;
3720        assert_matches!(ret, Ok(()));
3721    }
3722
3723    #[async_test]
3724    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3725        let server = MatrixMockServer::new().await;
3726        let client = server.client_builder().build().await;
3727
3728        let room_id = room_id!("!a-room:matrix.org");
3729
3730        // Make sure the summary endpoint is called once
3731        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3732
3733        // We create a locally cached invited room
3734        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3735
3736        // And we get a preview, the server endpoint was reached
3737        let preview = client
3738            .get_room_preview(room_id.into(), Vec::new())
3739            .await
3740            .expect("Room preview should be retrieved");
3741
3742        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3743    }
3744
3745    #[async_test]
3746    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3747        let server = MatrixMockServer::new().await;
3748        let client = server.client_builder().build().await;
3749
3750        let room_id = room_id!("!a-room:matrix.org");
3751
3752        // Make sure the summary endpoint is called once
3753        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3754
3755        // We create a locally cached left room
3756        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3757
3758        // And we get a preview, the server endpoint was reached
3759        let preview = client
3760            .get_room_preview(room_id.into(), Vec::new())
3761            .await
3762            .expect("Room preview should be retrieved");
3763
3764        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3765    }
3766
3767    #[async_test]
3768    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3769        let server = MatrixMockServer::new().await;
3770        let client = server.client_builder().build().await;
3771
3772        let room_id = room_id!("!a-room:matrix.org");
3773
3774        // Make sure the summary endpoint is called once
3775        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3776
3777        // We create a locally cached knocked room
3778        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3779
3780        // And we get a preview, the server endpoint was reached
3781        let preview = client
3782            .get_room_preview(room_id.into(), Vec::new())
3783            .await
3784            .expect("Room preview should be retrieved");
3785
3786        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3787    }
3788
3789    #[async_test]
3790    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3791        let server = MatrixMockServer::new().await;
3792        let client = server.client_builder().build().await;
3793
3794        let room_id = room_id!("!a-room:matrix.org");
3795
3796        // Make sure the summary endpoint is not called
3797        server.mock_room_summary().ok(room_id).never().mount().await;
3798
3799        // We create a locally cached joined room
3800        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3801
3802        // And we get a preview, no server endpoint was reached
3803        let preview = client
3804            .get_room_preview(room_id.into(), Vec::new())
3805            .await
3806            .expect("Room preview should be retrieved");
3807
3808        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3809    }
3810
3811    #[async_test]
3812    async fn test_media_preview_config() {
3813        let server = MatrixMockServer::new().await;
3814        let client = server.client_builder().build().await;
3815
3816        server
3817            .mock_sync()
3818            .ok_and_run(&client, |builder| {
3819                builder.add_custom_global_account_data(json!({
3820                    "content": {
3821                        "media_previews": "private",
3822                        "invite_avatars": "off"
3823                    },
3824                    "type": "m.media_preview_config"
3825                }));
3826            })
3827            .await;
3828
3829        let (initial_value, stream) =
3830            client.account().observe_media_preview_config().await.unwrap();
3831
3832        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3833        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3834        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3835        pin_mut!(stream);
3836        assert_pending!(stream);
3837
3838        server
3839            .mock_sync()
3840            .ok_and_run(&client, |builder| {
3841                builder.add_custom_global_account_data(json!({
3842                    "content": {
3843                        "media_previews": "off",
3844                        "invite_avatars": "on"
3845                    },
3846                    "type": "m.media_preview_config"
3847                }));
3848            })
3849            .await;
3850
3851        assert_next_matches!(
3852            stream,
3853            MediaPreviewConfigEventContent {
3854                media_previews: Some(MediaPreviews::Off),
3855                invite_avatars: Some(InviteAvatars::On),
3856                ..
3857            }
3858        );
3859        assert_pending!(stream);
3860    }
3861
3862    #[async_test]
3863    async fn test_unstable_media_preview_config() {
3864        let server = MatrixMockServer::new().await;
3865        let client = server.client_builder().build().await;
3866
3867        server
3868            .mock_sync()
3869            .ok_and_run(&client, |builder| {
3870                builder.add_custom_global_account_data(json!({
3871                    "content": {
3872                        "media_previews": "private",
3873                        "invite_avatars": "off"
3874                    },
3875                    "type": "io.element.msc4278.media_preview_config"
3876                }));
3877            })
3878            .await;
3879
3880        let (initial_value, stream) =
3881            client.account().observe_media_preview_config().await.unwrap();
3882
3883        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3884        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3885        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3886        pin_mut!(stream);
3887        assert_pending!(stream);
3888
3889        server
3890            .mock_sync()
3891            .ok_and_run(&client, |builder| {
3892                builder.add_custom_global_account_data(json!({
3893                    "content": {
3894                        "media_previews": "off",
3895                        "invite_avatars": "on"
3896                    },
3897                    "type": "io.element.msc4278.media_preview_config"
3898                }));
3899            })
3900            .await;
3901
3902        assert_next_matches!(
3903            stream,
3904            MediaPreviewConfigEventContent {
3905                media_previews: Some(MediaPreviews::Off),
3906                invite_avatars: Some(InviteAvatars::On),
3907                ..
3908            }
3909        );
3910        assert_pending!(stream);
3911    }
3912
3913    #[async_test]
3914    async fn test_media_preview_config_not_found() {
3915        let server = MatrixMockServer::new().await;
3916        let client = server.client_builder().build().await;
3917
3918        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3919
3920        assert!(initial_value.is_none());
3921    }
3922
3923    #[async_test]
3924    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
3925        // The default Matrix version we use is 1.11 or higher, so authenticated media
3926        // is supported.
3927        let server = MatrixMockServer::new().await;
3928        let client = server.client_builder().build().await;
3929
3930        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3931
3932        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3933        client.load_or_fetch_max_upload_size().await.unwrap();
3934
3935        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3936    }
3937
3938    #[async_test]
3939    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
3940        // The server must advertise support for the stable feature for authenticated
3941        // media support, so we mock the `GET /versions` response.
3942        let server = MatrixMockServer::new().await;
3943        let client = server.client_builder().no_server_versions().build().await;
3944
3945        server
3946            .mock_versions()
3947            .ok_custom(
3948                &["v1.7", "v1.8", "v1.9", "v1.10"],
3949                &[("org.matrix.msc3916.stable", true)].into(),
3950            )
3951            .named("versions")
3952            .expect(1)
3953            .mount()
3954            .await;
3955
3956        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3957
3958        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3959        client.load_or_fetch_max_upload_size().await.unwrap();
3960
3961        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3962    }
3963
3964    #[async_test]
3965    async fn test_load_or_fetch_max_upload_size_no_auth() {
3966        // The server must not support Matrix 1.11 or higher for unauthenticated
3967        // media requests, so we mock the `GET /versions` response.
3968        let server = MatrixMockServer::new().await;
3969        let client = server.client_builder().no_server_versions().build().await;
3970
3971        server
3972            .mock_versions()
3973            .ok_custom(&["v1.1"], &Default::default())
3974            .named("versions")
3975            .expect(1)
3976            .mount()
3977            .await;
3978
3979        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3980
3981        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
3982        client.load_or_fetch_max_upload_size().await.unwrap();
3983
3984        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3985    }
3986
3987    #[async_test]
3988    async fn test_uploading_a_too_large_media_file() {
3989        let server = MatrixMockServer::new().await;
3990        let client = server.client_builder().build().await;
3991
3992        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
3993        client.load_or_fetch_max_upload_size().await.unwrap();
3994        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
3995
3996        let data = vec![1, 2];
3997        let upload_request =
3998            ruma::api::client::media::create_content::v3::Request::new(data.clone());
3999        let request = SendRequest {
4000            client: client.clone(),
4001            request: upload_request,
4002            config: None,
4003            send_progress: SharedObservable::new(TransmissionProgress::default()),
4004        };
4005        let media_request = SendMediaUploadRequest::new(request);
4006
4007        let error = media_request.await.err();
4008        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4009        assert_eq!(max, uint!(1));
4010        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4011    }
4012
4013    #[async_test]
4014    async fn test_dont_ignore_timeout_on_first_sync() {
4015        let server = MatrixMockServer::new().await;
4016        let client = server.client_builder().build().await;
4017
4018        server
4019            .mock_sync()
4020            .timeout(Some(Duration::from_secs(30)))
4021            .ok(|_| {})
4022            .mock_once()
4023            .named("sync_with_timeout")
4024            .mount()
4025            .await;
4026
4027        // Call the endpoint once to check the timeout.
4028        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4029
4030        timeout(Duration::from_secs(1), async {
4031            stream.next().await.unwrap().unwrap();
4032        })
4033        .await
4034        .unwrap();
4035    }
4036
4037    #[async_test]
4038    async fn test_ignore_timeout_on_first_sync() {
4039        let server = MatrixMockServer::new().await;
4040        let client = server.client_builder().build().await;
4041
4042        server
4043            .mock_sync()
4044            .timeout(None)
4045            .ok(|_| {})
4046            .mock_once()
4047            .named("sync_no_timeout")
4048            .mount()
4049            .await;
4050        server
4051            .mock_sync()
4052            .timeout(Some(Duration::from_secs(30)))
4053            .ok(|_| {})
4054            .mock_once()
4055            .named("sync_with_timeout")
4056            .mount()
4057            .await;
4058
4059        // Call each version of the endpoint once to check the timeouts.
4060        let mut stream = Box::pin(
4061            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4062        );
4063
4064        timeout(Duration::from_secs(1), async {
4065            stream.next().await.unwrap().unwrap();
4066            stream.next().await.unwrap().unwrap();
4067        })
4068        .await
4069        .unwrap();
4070    }
4071}