matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{btree_map, BTreeMap, BTreeSet},
19    fmt::{self, Debug},
20    future::{ready, Future},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23};
24
25use caches::ClientCaches;
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{store::LockableCryptoStore, DecryptionSettings};
32use matrix_sdk_base::{
33    event_cache::store::EventCacheStoreLock,
34    store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
35    sync::{Notification, RoomUpdates},
36    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
37    StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm,
38};
39use matrix_sdk_common::ttl_cache::TtlCache;
40#[cfg(feature = "e2e-encryption")]
41use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
42use ruma::{
43    api::{
44        client::{
45            account::whoami,
46            alias::{create_alias, delete_alias, get_alias},
47            device::{delete_devices, get_devices, update_device},
48            directory::{get_public_rooms, get_public_rooms_filtered},
49            discovery::{
50                discover_homeserver::{self, RtcFocusInfo},
51                get_capabilities::{self, Capabilities},
52                get_supported_versions,
53            },
54            error::ErrorKind,
55            filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
56            knock::knock_room,
57            membership::{join_room_by_id, join_room_by_id_or_alias},
58            room::create_room,
59            session::login::v3::DiscoveryInfo,
60            sync::sync_events,
61            uiaa,
62            user_directory::search_users,
63        },
64        error::FromHttpResponseError,
65        FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
66    },
67    assign,
68    push::Ruleset,
69    time::Instant,
70    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
71    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
72};
73use serde::de::DeserializeOwned;
74use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
75use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
76use url::Url;
77
78use self::futures::SendRequest;
79use crate::{
80    authentication::{
81        matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
82        SaveSessionCallback,
83    },
84    config::RequestConfig,
85    deduplicating_handler::DeduplicatingHandler,
86    error::HttpResult,
87    event_cache::EventCache,
88    event_handler::{
89        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
90        EventHandlerStore, ObservableEventHandler, SyncEvent,
91    },
92    http_client::HttpClient,
93    latest_events::LatestEvents,
94    media::MediaError,
95    notification_settings::NotificationSettings,
96    room::RoomMember,
97    room_preview::RoomPreview,
98    send_queue::{SendQueue, SendQueueData},
99    sliding_sync::Version as SlidingSyncVersion,
100    sync::{RoomUpdate, SyncResponse},
101    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
102    Room, SessionTokens, TransmissionProgress,
103};
104#[cfg(feature = "e2e-encryption")]
105use crate::{
106    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
107    store_locks::CrossProcessStoreLock,
108};
109
110mod builder;
111pub(crate) mod caches;
112pub(crate) mod futures;
113
114pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
115
/// Future returned by a notification handler: a pinned, boxed future that
/// resolves to `()`.
///
/// Outside of wasm targets the future must additionally be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Future returned by a notification handler: a pinned, boxed future that
/// resolves to `()`.
///
/// On wasm targets no `Send` bound is required.
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
120
/// Boxed notification handler: a callable receiving the [`Notification`], the
/// [`Room`] and the [`Client`], and returning a [`NotificationHandlerFut`].
///
/// Outside of wasm targets the callable must additionally be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler: a callable receiving the [`Notification`], the
/// [`Room`] and the [`Client`], and returning a [`NotificationHandlerFut`].
///
/// On wasm targets no `Send`/`Sync` bounds are required.
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
126
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
140
/// Represents changes that can occur to a `Client`s `Session`.
///
/// Equality is total (the only payload is a `bool`), so the type implements
/// [`Eq`] in addition to [`PartialEq`], like [`LoopCtrl`] does.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
152
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
#[derive(Clone)]
pub struct Client {
    /// The shared state; cloning a `Client` only clones this `Arc`.
    pub(crate) inner: Arc<ClientInner>,
}
160
/// Locks and request-deduplicating handlers shared by the methods of a
/// [`Client`].
///
/// Every field has a `Default` value, so the whole collection is created with
/// `ClientLocks::default()`.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    // NOTE(review): `Self` is `ClientLocks` here, and no `send_single_receipt`
    // method is visible on it; the link presumably targets a method defined on
    // another type. Confirm and fix the intra-doc link.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// The cross-process lock protecting the crypto store.
    ///
    /// It is kept in a `OnceCell`, so it starts out empty and can be set at
    /// most once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock:
        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
240
/// The state shared by every clone of a [`Client`]; a `Client` holds it
/// behind an `Arc`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    // NOTE(review): this file imports `LatestEvents` from
    // `crate::latest_events` (plural); confirm that the link target above
    // resolves.
    latest_events: OnceCell<LatestEvents>,
}
353
354impl ClientInner {
355    /// Create a new `ClientInner`.
356    ///
357    /// All the fields passed as parameters here are those that must be cloned
358    /// upon instantiation of a sub-client, e.g. a client specialized for
359    /// notifications.
360    #[allow(clippy::too_many_arguments)]
361    async fn new(
362        auth_ctx: Arc<AuthCtx>,
363        server: Option<Url>,
364        homeserver: Url,
365        sliding_sync_version: SlidingSyncVersion,
366        http_client: HttpClient,
367        base_client: BaseClient,
368        server_info: ClientServerInfo,
369        respect_login_well_known: bool,
370        event_cache: OnceCell<EventCache>,
371        send_queue: Arc<SendQueueData>,
372        latest_events: OnceCell<LatestEvents>,
373        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
374        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
375        cross_process_store_locks_holder_name: String,
376    ) -> Arc<Self> {
377        let caches = ClientCaches {
378            server_info: server_info.into(),
379            server_metadata: Mutex::new(TtlCache::new()),
380        };
381
382        let client = Self {
383            server,
384            homeserver: StdRwLock::new(homeserver),
385            auth_ctx,
386            sliding_sync_version: StdRwLock::new(sliding_sync_version),
387            http_client,
388            base_client,
389            caches,
390            locks: Default::default(),
391            cross_process_store_locks_holder_name,
392            typing_notice_times: Default::default(),
393            event_handlers: Default::default(),
394            notification_handlers: Default::default(),
395            room_update_channels: Default::default(),
396            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
397            // ballast for all observers to catch up.
398            room_updates_sender: broadcast::Sender::new(32),
399            respect_login_well_known,
400            sync_beat: event_listener::Event::new(),
401            event_cache,
402            send_queue_data: send_queue,
403            latest_events,
404            #[cfg(feature = "e2e-encryption")]
405            e2ee: EncryptionData::new(encryption_settings),
406            #[cfg(feature = "e2e-encryption")]
407            verification_state: SharedObservable::new(VerificationState::Unknown),
408            #[cfg(feature = "e2e-encryption")]
409            enable_share_history_on_invite,
410            server_max_upload_size: Mutex::new(OnceCell::new()),
411        };
412
413        #[allow(clippy::let_and_return)]
414        let client = Arc::new(client);
415
416        #[cfg(feature = "e2e-encryption")]
417        client.e2ee.initialize_room_key_tasks(&client);
418
419        let _ = client
420            .event_cache
421            .get_or_init(|| async {
422                EventCache::new(
423                    WeakClient::from_inner(&client),
424                    client.base_client.event_cache_store().clone(),
425                )
426            })
427            .await;
428
429        client
430    }
431}
432
433#[cfg(not(tarpaulin_include))]
434impl Debug for Client {
435    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
436        write!(fmt, "Client")
437    }
438}
439
440impl Client {
441    /// Create a new [`Client`] that will use the given homeserver.
442    ///
443    /// # Arguments
444    ///
445    /// * `homeserver_url` - The homeserver that the client should connect to.
446    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
447        Self::builder().homeserver_url(homeserver_url).build().await
448    }
449
450    /// Returns a subscriber that publishes an event every time the ignore user
451    /// list changes.
452    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
453        self.inner.base_client.subscribe_to_ignore_user_list_changes()
454    }
455
    /// Create a new [`ClientBuilder`].
    ///
    /// This is the starting point for configuring a [`Client`] before
    /// building it; [`Client::new`] is a shortcut for the common case where
    /// only the homeserver URL is set.
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
460
461    pub(crate) fn base_client(&self) -> &BaseClient {
462        &self.inner.base_client
463    }
464
465    /// The underlying HTTP client.
466    pub fn http_client(&self) -> &reqwest::Client {
467        &self.inner.http_client.inner
468    }
469
470    pub(crate) fn locks(&self) -> &ClientLocks {
471        &self.inner.locks
472    }
473
474    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
475        &self.inner.auth_ctx
476    }
477
478    /// The cross-process store locks holder name.
479    ///
480    /// The SDK provides cross-process store locks (see
481    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
482    /// `holder_name` is the value used for all cross-process store locks
483    /// used by this `Client`.
484    pub fn cross_process_store_locks_holder_name(&self) -> &str {
485        &self.inner.cross_process_store_locks_holder_name
486    }
487
488    /// Change the homeserver URL used by this client.
489    ///
490    /// # Arguments
491    ///
492    /// * `homeserver_url` - The new URL to use.
493    fn set_homeserver(&self, homeserver_url: Url) {
494        *self.inner.homeserver.write().unwrap() = homeserver_url;
495    }
496
497    /// Get the capabilities of the homeserver.
498    ///
499    /// This method should be used to check what features are supported by the
500    /// homeserver.
501    ///
502    /// # Examples
503    ///
504    /// ```no_run
505    /// # use matrix_sdk::Client;
506    /// # use url::Url;
507    /// # async {
508    /// # let homeserver = Url::parse("http://example.com")?;
509    /// let client = Client::new(homeserver).await?;
510    ///
511    /// let capabilities = client.get_capabilities().await?;
512    ///
513    /// if capabilities.change_password.enabled {
514    ///     // Change password
515    /// }
516    /// # anyhow::Ok(()) };
517    /// ```
518    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
519        let res = self.send(get_capabilities::v3::Request::new()).await?;
520        Ok(res.capabilities)
521    }
522
523    /// Get a copy of the default request config.
524    ///
525    /// The default request config is what's used when sending requests if no
526    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
527    /// function with such a parameter.
528    ///
529    /// If the default request config was not customized through
530    /// [`ClientBuilder`] when creating this `Client`, the returned value will
531    /// be equivalent to [`RequestConfig::default()`].
532    pub fn request_config(&self) -> RequestConfig {
533        self.inner.http_client.request_config
534    }
535
536    /// Check whether the client has been activated.
537    ///
538    /// A client is considered active when:
539    ///
540    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
541    ///    is logged in,
542    /// 2. Has loaded cached data from storage,
543    /// 3. If encryption is enabled, it also initialized or restored its
544    ///    `OlmMachine`.
545    pub fn is_active(&self) -> bool {
546        self.inner.base_client.is_active()
547    }
548
549    /// The server used by the client.
550    ///
551    /// See `Self::server` to learn more.
552    pub fn server(&self) -> Option<&Url> {
553        self.inner.server.as_ref()
554    }
555
556    /// The homeserver of the client.
557    pub fn homeserver(&self) -> Url {
558        self.inner.homeserver.read().unwrap().clone()
559    }
560
561    /// Get the sliding sync version.
562    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
563        self.inner.sliding_sync_version.read().unwrap().clone()
564    }
565
566    /// Override the sliding sync version.
567    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
568        let mut lock = self.inner.sliding_sync_version.write().unwrap();
569        *lock = version;
570    }
571
572    /// Get the Matrix user session meta information.
573    ///
574    /// If the client is currently logged in, this will return a
575    /// [`SessionMeta`] object which contains the user ID and device ID.
576    /// Otherwise it returns `None`.
577    pub fn session_meta(&self) -> Option<&SessionMeta> {
578        self.base_client().session_meta()
579    }
580
581    /// Returns a receiver that gets events for each room info update. To watch
582    /// for new events, use `receiver.resubscribe()`. Each event contains the
583    /// room and a boolean whether this event should trigger a room list update.
584    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
585        self.base_client().room_info_notable_update_receiver()
586    }
587
588    /// Performs a search for users.
589    /// The search is performed case-insensitively on user IDs and display names
590    ///
591    /// # Arguments
592    ///
593    /// * `search_term` - The search term for the search
594    /// * `limit` - The maximum number of results to return. Defaults to 10.
595    ///
596    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
597    pub async fn search_users(
598        &self,
599        search_term: &str,
600        limit: u64,
601    ) -> HttpResult<search_users::v3::Response> {
602        let mut request = search_users::v3::Request::new(search_term.to_owned());
603
604        if let Some(limit) = UInt::new(limit) {
605            request.limit = limit;
606        }
607
608        self.send(request).await
609    }
610
611    /// Get the user id of the current owner of the client.
612    pub fn user_id(&self) -> Option<&UserId> {
613        self.session_meta().map(|s| s.user_id.as_ref())
614    }
615
616    /// Get the device ID that identifies the current session.
617    pub fn device_id(&self) -> Option<&DeviceId> {
618        self.session_meta().map(|s| s.device_id.as_ref())
619    }
620
621    /// Get the current access token for this session.
622    ///
623    /// Will be `None` if the client has not been logged in.
624    pub fn access_token(&self) -> Option<String> {
625        self.auth_ctx().access_token()
626    }
627
628    /// Get the current tokens for this session.
629    ///
630    /// To be notified of changes in the session tokens, use
631    /// [`Client::subscribe_to_session_changes()`] or
632    /// [`Client::set_session_callbacks()`].
633    ///
634    /// Returns `None` if the client has not been logged in.
635    pub fn session_tokens(&self) -> Option<SessionTokens> {
636        self.auth_ctx().session_tokens()
637    }
638
639    /// Access the authentication API used to log in this client.
640    ///
641    /// Will be `None` if the client has not been logged in.
642    pub fn auth_api(&self) -> Option<AuthApi> {
643        match self.auth_ctx().auth_data.get()? {
644            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
645            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
646        }
647    }
648
649    /// Get the whole session info of this client.
650    ///
651    /// Will be `None` if the client has not been logged in.
652    ///
653    /// Can be used with [`Client::restore_session`] to restore a previously
654    /// logged-in session.
655    pub fn session(&self) -> Option<AuthSession> {
656        match self.auth_api()? {
657            AuthApi::Matrix(api) => api.session().map(Into::into),
658            AuthApi::OAuth(api) => api.full_session().map(Into::into),
659        }
660    }
661
662    /// Get a reference to the state store.
663    pub fn state_store(&self) -> &DynStateStore {
664        self.base_client().state_store()
665    }
666
667    /// Get a reference to the event cache store.
668    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
669        self.base_client().event_cache_store()
670    }
671
672    /// Access the native Matrix authentication API with this client.
673    pub fn matrix_auth(&self) -> MatrixAuth {
674        MatrixAuth::new(self.clone())
675    }
676
677    /// Get the account of the current owner of the client.
678    pub fn account(&self) -> Account {
679        Account::new(self.clone())
680    }
681
682    /// Get the encryption manager of the client.
683    #[cfg(feature = "e2e-encryption")]
684    pub fn encryption(&self) -> Encryption {
685        Encryption::new(self.clone())
686    }
687
688    /// Get the media manager of the client.
689    pub fn media(&self) -> Media {
690        Media::new(self.clone())
691    }
692
693    /// Get the pusher manager of the client.
694    pub fn pusher(&self) -> Pusher {
695        Pusher::new(self.clone())
696    }
697
698    /// Access the OAuth 2.0 API of the client.
699    pub fn oauth(&self) -> OAuth {
700        OAuth::new(self.clone())
701    }
702
703    /// Register a handler for a specific event type.
704    ///
705    /// The handler is a function or closure with one or more arguments. The
706    /// first argument is the event itself. All additional arguments are
707    /// "context" arguments: They have to implement [`EventHandlerContext`].
708    /// This trait is named that way because most of the types implementing it
709    /// give additional context about an event: The room it was in, its raw form
710    /// and other similar things. As two exceptions to this,
711    /// [`Client`] and [`EventHandlerHandle`] also implement the
712    /// `EventHandlerContext` trait so you don't have to clone your client
713    /// into the event handler manually and a handler can decide to remove
714    /// itself.
715    ///
716    /// Some context arguments are not universally applicable. A context
717    /// argument that isn't available for the given event type will result in
718    /// the event handler being skipped and an error being logged. The following
719    /// context argument types are only available for a subset of event types:
720    ///
721    /// * [`Room`] is only available for room-specific events, i.e. not for
722    ///   events like global account data events or presence events.
723    ///
724    /// You can provide custom context via
725    /// [`add_event_handler_context`](Client::add_event_handler_context) and
726    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
727    /// into the event handler.
728    ///
729    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
730    ///
731    /// # Examples
732    ///
733    /// ```no_run
734    /// use matrix_sdk::{
735    ///     deserialized_responses::EncryptionInfo,
736    ///     event_handler::Ctx,
737    ///     ruma::{
738    ///         events::{
739    ///             macros::EventContent,
740    ///             push_rules::PushRulesEvent,
741    ///             room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
742    ///         },
743    ///         push::Action,
744    ///         Int, MilliSecondsSinceUnixEpoch,
745    ///     },
746    ///     Client, Room,
747    /// };
748    /// use serde::{Deserialize, Serialize};
749    ///
750    /// # async fn example(client: Client) {
751    /// client.add_event_handler(
752    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
753    ///         // Common usage: Room event plus room and client.
754    ///     },
755    /// );
756    /// client.add_event_handler(
757    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
758    ///         async move {
759    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
760    ///             // unencrypted events and events that were decrypted by the SDK.
761    ///         }
762    ///     },
763    /// );
764    /// client.add_event_handler(
765    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
766    ///         async move {
767    ///             // A `Vec<Action>` parameter allows you to know which push actions
768    ///             // are applicable for an event. For example, an event with
769    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
770    ///             // in the timeline.
771    ///         }
772    ///     },
773    /// );
774    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
775    ///     // You can omit any or all arguments after the first.
776    /// });
777    ///
778    /// // Registering a temporary event handler:
779    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
780    ///     /* Event handler */
781    /// });
782    /// client.remove_event_handler(handle);
783    ///
784    /// // Registering custom event handler context:
785    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
786    /// struct MyContext {
787    ///     number: usize,
788    /// }
789    /// client.add_event_handler_context(MyContext { number: 5 });
790    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
791    ///     // Use the context
792    /// });
793    ///
794    /// // Custom events work exactly the same way, you just need to declare
795    /// // the content struct and use the EventContent derive macro on it.
796    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
797    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
798    /// struct TokenEventContent {
799    ///     token: String,
800    ///     #[serde(rename = "exp")]
801    ///     expires_at: MilliSecondsSinceUnixEpoch,
802    /// }
803    ///
804    /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
805    ///     todo!("Display the token");
806    /// });
807    ///
808    /// // Event handler closures can also capture local variables.
809    /// // Make sure they are cheap to clone though, because they will be cloned
810    /// // every time the closure is called.
811    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
812    ///
813    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
814    ///     println!("Calling the handler with identifier {data}");
815    /// });
816    /// # }
817    /// ```
818    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
819    where
820        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
821        H: EventHandler<Ev, Ctx>,
822    {
823        self.add_event_handler_impl(handler, None)
824    }
825
826    /// Register a handler for a specific room, and event type.
827    ///
828    /// This method works the same way as
829    /// [`add_event_handler`][Self::add_event_handler], except that the handler
830    /// will only be called for events in the room with the specified ID. See
831    /// that method for more details on event handler functions.
832    ///
833    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
834    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
835    /// your use case.
836    pub fn add_room_event_handler<Ev, Ctx, H>(
837        &self,
838        room_id: &RoomId,
839        handler: H,
840    ) -> EventHandlerHandle
841    where
842        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
843        H: EventHandler<Ev, Ctx>,
844    {
845        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
846    }
847
848    /// Observe a specific event type.
849    ///
850    /// `Ev` represents the kind of event that will be observed. `Ctx`
851    /// represents the context that will come with the event. It relies on the
852    /// same mechanism as [`Client::add_event_handler`]. The main difference is
853    /// that it returns an [`ObservableEventHandler`] and doesn't require a
854    /// user-defined closure. It is possible to subscribe to the
855    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
856    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
857    /// Ctx)`.
858    ///
859    /// Be careful that only the most recent value can be observed. Subscribers
860    /// are notified when a new value is sent, but there is no guarantee
861    /// that they will see all values.
862    ///
863    /// # Example
864    ///
865    /// Let's see a classical usage:
866    ///
867    /// ```
868    /// use futures_util::StreamExt as _;
869    /// use matrix_sdk::{
870    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
871    ///     Client, Room,
872    /// };
873    ///
874    /// # async fn example(client: Client) -> Option<()> {
875    /// let observer =
876    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
877    ///
878    /// let mut subscriber = observer.subscribe();
879    ///
880    /// let (event, (room, push_actions)) = subscriber.next().await?;
881    /// # Some(())
882    /// # }
883    /// ```
884    ///
885    /// Now let's see how to get several contexts that can be useful for you:
886    ///
887    /// ```
888    /// use matrix_sdk::{
889    ///     deserialized_responses::EncryptionInfo,
890    ///     ruma::{
891    ///         events::room::{
892    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
893    ///         },
894    ///         push::Action,
895    ///     },
896    ///     Client, Room,
897    /// };
898    ///
899    /// # async fn example(client: Client) {
900    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
901    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
902    ///
903    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
904    /// // to distinguish between unencrypted events and events that were decrypted
905    /// // by the SDK.
906    /// let _ = client
907    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
908    ///     );
909    ///
910    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
911    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
912    /// // should be highlighted in the timeline.
913    /// let _ =
914    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
915    ///
916    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
917    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
918    /// # }
919    /// ```
920    ///
921    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
922    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
923    where
924        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
925        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
926    {
927        self.observe_room_events_impl(None)
928    }
929
930    /// Observe a specific room, and event type.
931    ///
932    /// This method works the same way as [`Client::observe_events`], except
933    /// that the observability will only be applied for events in the room with
934    /// the specified ID. See that method for more details.
935    ///
936    /// Be careful that only the most recent value can be observed. Subscribers
937    /// are notified when a new value is sent, but there is no guarantee
938    /// that they will see all values.
939    pub fn observe_room_events<Ev, Ctx>(
940        &self,
941        room_id: &RoomId,
942    ) -> ObservableEventHandler<(Ev, Ctx)>
943    where
944        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
945        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
946    {
947        self.observe_room_events_impl(Some(room_id.to_owned()))
948    }
949
950    /// Shared implementation for `Client::observe_events` and
951    /// `Client::observe_room_events`.
952    fn observe_room_events_impl<Ev, Ctx>(
953        &self,
954        room_id: Option<OwnedRoomId>,
955    ) -> ObservableEventHandler<(Ev, Ctx)>
956    where
957        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
958        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
959    {
960        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
961        // new value.
962        let shared_observable = SharedObservable::new(None);
963
964        ObservableEventHandler::new(
965            shared_observable.clone(),
966            self.event_handler_drop_guard(self.add_event_handler_impl(
967                move |event: Ev, context: Ctx| {
968                    shared_observable.set(Some((event, context)));
969
970                    ready(())
971                },
972                room_id,
973            )),
974        )
975    }
976
977    /// Remove the event handler associated with the handle.
978    ///
979    /// Note that you **must not** call `remove_event_handler` from the
980    /// non-async part of an event handler, that is:
981    ///
982    /// ```ignore
983    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
984    ///     // âš  this will cause a deadlock âš 
985    ///     client.remove_event_handler(handle);
986    ///
987    ///     async move {
988    ///         // removing the event handler here is fine
989    ///         client.remove_event_handler(handle);
990    ///     }
991    /// })
992    /// ```
993    ///
994    /// Note also that handlers that remove themselves will still execute with
995    /// events received in the same sync cycle.
996    ///
997    /// # Arguments
998    ///
999    /// `handle` - The [`EventHandlerHandle`] that is returned when
1000    /// registering the event handler with [`Client::add_event_handler`].
1001    ///
1002    /// # Examples
1003    ///
1004    /// ```no_run
1005    /// # use url::Url;
1006    /// # use tokio::sync::mpsc;
1007    /// #
1008    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1009    /// #
1010    /// use matrix_sdk::{
1011    ///     event_handler::EventHandlerHandle,
1012    ///     ruma::events::room::member::SyncRoomMemberEvent, Client,
1013    /// };
1014    /// #
1015    /// # futures_executor::block_on(async {
1016    /// # let client = matrix_sdk::Client::builder()
1017    /// #     .homeserver_url(homeserver)
1018    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1019    /// #     .build()
1020    /// #     .await
1021    /// #     .unwrap();
1022    ///
1023    /// client.add_event_handler(
1024    ///     |ev: SyncRoomMemberEvent,
1025    ///      client: Client,
1026    ///      handle: EventHandlerHandle| async move {
1027    ///         // Common usage: Check arriving Event is the expected one
1028    ///         println!("Expected RoomMemberEvent received!");
1029    ///         client.remove_event_handler(handle);
1030    ///     },
1031    /// );
1032    /// # });
1033    /// ```
1034    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1035        self.inner.event_handlers.remove(handle);
1036    }
1037
1038    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1039    /// the given handle.
1040    ///
1041    /// When the returned value is dropped, the event handler will be removed.
1042    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1043        EventHandlerDropGuard::new(handle, self.clone())
1044    }
1045
1046    /// Add an arbitrary value for use as event handler context.
1047    ///
1048    /// The value can be obtained in an event handler by adding an argument of
1049    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1050    ///
1051    /// If a value of the same type has been added before, it will be
1052    /// overwritten.
1053    ///
1054    /// # Examples
1055    ///
1056    /// ```no_run
1057    /// use matrix_sdk::{
1058    ///     event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1059    ///     Room,
1060    /// };
1061    /// # #[derive(Clone)]
1062    /// # struct SomeType;
1063    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1064    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1065    /// # futures_executor::block_on(async {
1066    /// # let client = matrix_sdk::Client::builder()
1067    /// #     .homeserver_url(homeserver)
1068    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1069    /// #     .build()
1070    /// #     .await
1071    /// #     .unwrap();
1072    ///
1073    /// // Handle used to send messages to the UI part of the app
1074    /// let my_gui_handle: SomeType = obtain_gui_handle();
1075    ///
1076    /// client.add_event_handler_context(my_gui_handle.clone());
1077    /// client.add_event_handler(
1078    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1079    ///         async move {
1080    ///             // gui_handle.send(DisplayMessage { message: ev });
1081    ///         }
1082    ///     },
1083    /// );
1084    /// # });
1085    /// ```
1086    pub fn add_event_handler_context<T>(&self, ctx: T)
1087    where
1088        T: Clone + Send + Sync + 'static,
1089    {
1090        self.inner.event_handlers.add_context(ctx);
1091    }
1092
1093    /// Register a handler for a notification.
1094    ///
1095    /// Similar to [`Client::add_event_handler`], but only allows functions
1096    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1097    /// [`Client`] for now.
1098    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1099    where
1100        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1101        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1102    {
1103        self.inner.notification_handlers.write().await.push(Box::new(
1104            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1105        ));
1106
1107        self
1108    }
1109
1110    /// Subscribe to all updates for the room with the given ID.
1111    ///
1112    /// The returned receiver will receive a new message for each sync response
1113    /// that contains updates for that room.
1114    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1115        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1116            btree_map::Entry::Vacant(entry) => {
1117                let (tx, rx) = broadcast::channel(8);
1118                entry.insert(tx);
1119                rx
1120            }
1121            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1122        }
1123    }
1124
1125    /// Subscribe to all updates to all rooms, whenever any has been received in
1126    /// a sync response.
1127    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1128        self.inner.room_updates_sender.subscribe()
1129    }
1130
1131    pub(crate) async fn notification_handlers(
1132        &self,
1133    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1134        self.inner.notification_handlers.read().await
1135    }
1136
1137    /// Get all the rooms the client knows about.
1138    ///
1139    /// This will return the list of joined, invited, and left rooms.
1140    pub fn rooms(&self) -> Vec<Room> {
1141        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1142    }
1143
1144    /// Get all the rooms the client knows about, filtered by room state.
1145    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1146        self.base_client()
1147            .rooms_filtered(filter)
1148            .into_iter()
1149            .map(|room| Room::new(self.clone(), room))
1150            .collect()
1151    }
1152
1153    /// Get a stream of all the rooms, in addition to the existing rooms.
1154    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1155        let (rooms, stream) = self.base_client().rooms_stream();
1156
1157        let map_room = |room| Room::new(self.clone(), room);
1158
1159        (
1160            rooms.into_iter().map(map_room).collect(),
1161            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1162        )
1163    }
1164
1165    /// Returns the joined rooms this client knows about.
1166    pub fn joined_rooms(&self) -> Vec<Room> {
1167        self.base_client()
1168            .rooms_filtered(RoomStateFilter::JOINED)
1169            .into_iter()
1170            .map(|room| Room::new(self.clone(), room))
1171            .collect()
1172    }
1173
1174    /// Returns the invited rooms this client knows about.
1175    pub fn invited_rooms(&self) -> Vec<Room> {
1176        self.base_client()
1177            .rooms_filtered(RoomStateFilter::INVITED)
1178            .into_iter()
1179            .map(|room| Room::new(self.clone(), room))
1180            .collect()
1181    }
1182
1183    /// Returns the left rooms this client knows about.
1184    pub fn left_rooms(&self) -> Vec<Room> {
1185        self.base_client()
1186            .rooms_filtered(RoomStateFilter::LEFT)
1187            .into_iter()
1188            .map(|room| Room::new(self.clone(), room))
1189            .collect()
1190    }
1191
1192    /// Get a room with the given room id.
1193    ///
1194    /// # Arguments
1195    ///
1196    /// `room_id` - The unique id of the room that should be fetched.
1197    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1198        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1199    }
1200
1201    /// Gets the preview of a room, whether the current user has joined it or
1202    /// not.
1203    pub async fn get_room_preview(
1204        &self,
1205        room_or_alias_id: &RoomOrAliasId,
1206        via: Vec<OwnedServerName>,
1207    ) -> Result<RoomPreview> {
1208        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1209            Ok(room_id) => room_id.to_owned(),
1210            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1211        };
1212
1213        if let Some(room) = self.get_room(&room_id) {
1214            // The cached data can only be trusted if the room state is joined or
1215            // banned: for invite and knock rooms, no updates will be received
1216            // for the rooms after the invite/knock action took place so we may
1217            // have very out to date data for important fields such as
1218            // `join_rule`. For left rooms, the homeserver should return the latest info.
1219            match room.state() {
1220                RoomState::Joined | RoomState::Banned => {
1221                    return Ok(RoomPreview::from_known_room(&room).await);
1222                }
1223                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1224            }
1225        }
1226
1227        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1228    }
1229
1230    /// Resolve a room alias to a room id and a list of servers which know
1231    /// about it.
1232    ///
1233    /// # Arguments
1234    ///
1235    /// `room_alias` - The room alias to be resolved.
1236    pub async fn resolve_room_alias(
1237        &self,
1238        room_alias: &RoomAliasId,
1239    ) -> HttpResult<get_alias::v3::Response> {
1240        let request = get_alias::v3::Request::new(room_alias.to_owned());
1241        self.send(request).await
1242    }
1243
1244    /// Checks if a room alias is not in use yet.
1245    ///
1246    /// Returns:
1247    /// - `Ok(true)` if the room alias is available.
1248    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1249    ///   status code).
1250    /// - An `Err` otherwise.
1251    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1252        match self.resolve_room_alias(alias).await {
1253            // The room alias was resolved, so it's already in use.
1254            Ok(_) => Ok(false),
1255            Err(error) => {
1256                match error.client_api_error_kind() {
1257                    // The room alias wasn't found, so it's available.
1258                    Some(ErrorKind::NotFound) => Ok(true),
1259                    _ => Err(error),
1260                }
1261            }
1262        }
1263    }
1264
1265    /// Adds a new room alias associated with a room to the room directory.
1266    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1267        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1268        self.send(request).await?;
1269        Ok(())
1270    }
1271
1272    /// Removes a room alias from the room directory.
1273    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1274        let request = delete_alias::v3::Request::new(alias.to_owned());
1275        self.send(request).await?;
1276        Ok(())
1277    }
1278
1279    /// Update the homeserver from the login response well-known if needed.
1280    ///
1281    /// # Arguments
1282    ///
1283    /// * `login_well_known` - The `well_known` field from a successful login
1284    ///   response.
1285    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1286        if self.inner.respect_login_well_known {
1287            if let Some(well_known) = login_well_known {
1288                if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1289                    self.set_homeserver(homeserver);
1290                }
1291            }
1292        }
1293    }
1294
1295    /// Similar to [`Client::restore_session_with`], with
1296    /// [`RoomLoadSettings::default()`].
1297    ///
1298    /// # Panics
1299    ///
1300    /// Panics if a session was already restored or logged in.
1301    #[instrument(skip_all)]
1302    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1303        self.restore_session_with(session, RoomLoadSettings::default()).await
1304    }
1305
1306    /// Restore a session previously logged-in using one of the available
1307    /// authentication APIs. The number of rooms to restore is controlled by
1308    /// [`RoomLoadSettings`].
1309    ///
1310    /// See the documentation of the corresponding authentication API's
1311    /// `restore_session` method for more information.
1312    ///
1313    /// # Panics
1314    ///
1315    /// Panics if a session was already restored or logged in.
1316    #[instrument(skip_all)]
1317    pub async fn restore_session_with(
1318        &self,
1319        session: impl Into<AuthSession>,
1320        room_load_settings: RoomLoadSettings,
1321    ) -> Result<()> {
1322        let session = session.into();
1323        match session {
1324            AuthSession::Matrix(session) => {
1325                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1326            }
1327            AuthSession::OAuth(session) => {
1328                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1329            }
1330        }
1331    }
1332
1333    /// Refresh the access token using the authentication API used to log into
1334    /// this session.
1335    ///
1336    /// See the documentation of the authentication API's `refresh_access_token`
1337    /// method for more information.
1338    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1339        let Some(auth_api) = self.auth_api() else {
1340            return Err(RefreshTokenError::RefreshTokenRequired);
1341        };
1342
1343        match auth_api {
1344            AuthApi::Matrix(api) => {
1345                trace!("Token refresh: Using the homeserver.");
1346                Box::pin(api.refresh_access_token()).await?;
1347            }
1348            AuthApi::OAuth(api) => {
1349                trace!("Token refresh: Using OAuth 2.0.");
1350                Box::pin(api.refresh_access_token()).await?;
1351            }
1352        }
1353
1354        Ok(())
1355    }
1356
1357    /// Log out the current session using the proper authentication API.
1358    ///
1359    /// # Errors
1360    ///
1361    /// Returns an error if the session is not authenticated or if an error
1362    /// occurred while making the request to the server.
1363    pub async fn logout(&self) -> Result<(), Error> {
1364        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1365        match auth_api {
1366            AuthApi::Matrix(matrix_auth) => {
1367                matrix_auth.logout().await?;
1368                Ok(())
1369            }
1370            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1371        }
1372    }
1373
1374    /// Get or upload a sync filter.
1375    ///
1376    /// This method will either get a filter ID from the store or upload the
1377    /// filter definition to the homeserver and return the new filter ID.
1378    ///
1379    /// # Arguments
1380    ///
1381    /// * `filter_name` - The unique name of the filter, this name will be used
1382    /// locally to store and identify the filter ID returned by the server.
1383    ///
1384    /// * `definition` - The filter definition that should be uploaded to the
1385    /// server if no filter ID can be found in the store.
1386    ///
1387    /// # Examples
1388    ///
1389    /// ```no_run
1390    /// # use matrix_sdk::{
1391    /// #    Client, config::SyncSettings,
1392    /// #    ruma::api::client::{
1393    /// #        filter::{
1394    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1395    /// #        },
1396    /// #        sync::sync_events::v3::Filter,
1397    /// #    }
1398    /// # };
1399    /// # use url::Url;
1400    /// # async {
1401    /// # let homeserver = Url::parse("http://example.com").unwrap();
1402    /// # let client = Client::new(homeserver).await.unwrap();
1403    /// let mut filter = FilterDefinition::default();
1404    ///
1405    /// // Let's enable member lazy loading.
1406    /// filter.room.state.lazy_load_options =
1407    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1408    ///
1409    /// let filter_id = client
1410    ///     .get_or_upload_filter("sync", filter)
1411    ///     .await
1412    ///     .unwrap();
1413    ///
1414    /// let sync_settings = SyncSettings::new()
1415    ///     .filter(Filter::FilterId(filter_id));
1416    ///
1417    /// let response = client.sync_once(sync_settings).await.unwrap();
1418    /// # };
1419    #[instrument(skip(self, definition))]
1420    pub async fn get_or_upload_filter(
1421        &self,
1422        filter_name: &str,
1423        definition: FilterDefinition,
1424    ) -> Result<String> {
1425        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1426            debug!("Found filter locally");
1427            Ok(filter)
1428        } else {
1429            debug!("Didn't find filter locally");
1430            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1431            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1432            let response = self.send(request).await?;
1433
1434            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1435
1436            Ok(response.filter_id)
1437        }
1438    }
1439
1440    /// Prepare to join a room by ID, by getting the current details about it
1441    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1442        let room = self.get_room(room_id)?;
1443
1444        let inviter = match room.invite_details().await {
1445            Ok(details) => details.inviter,
1446            Err(Error::WrongRoomState(_)) => None,
1447            Err(e) => {
1448                warn!("Error fetching invite details for room: {e:?}");
1449                None
1450            }
1451        };
1452
1453        Some(PreJoinRoomInfo { inviter })
1454    }
1455
1456    /// Finish joining a room.
1457    ///
1458    /// If the room was an invite that should be marked as a DM, will include it
1459    /// in the DM event after creating the joined room.
1460    ///
1461    /// If encrypted history sharing is enabled, will check to see if we have a
1462    /// key bundle, and import it if so.
1463    ///
1464    /// # Arguments
1465    ///
1466    /// * `room_id` - The `RoomId` of the room that was joined.
1467    /// * `pre_join_room_info` - Information about the room before we joined.
1468    async fn finish_join_room(
1469        &self,
1470        room_id: &RoomId,
1471        pre_join_room_info: Option<PreJoinRoomInfo>,
1472    ) -> Result<Room> {
1473        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1474            room.state() == RoomState::Invited
1475                && room.is_direct().await.unwrap_or_else(|e| {
1476                    warn!(%room_id, "is_direct() failed: {e}");
1477                    false
1478                })
1479        } else {
1480            false
1481        };
1482
1483        let base_room = self.base_client().room_joined(room_id).await?;
1484        let room = Room::new(self.clone(), base_room);
1485
1486        if mark_as_dm {
1487            room.set_is_direct(true).await?;
1488        }
1489
1490        #[cfg(feature = "e2e-encryption")]
1491        if self.inner.enable_share_history_on_invite {
1492            if let Some(inviter) =
1493                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1494            {
1495                crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1496                    .await?;
1497            }
1498        }
1499
1500        // Suppress "unused variable" and "unused field" lints
1501        #[cfg(not(feature = "e2e-encryption"))]
1502        let _ = pre_join_room_info.map(|i| i.inviter);
1503
1504        Ok(room)
1505    }
1506
1507    /// Join a room by `RoomId`.
1508    ///
1509    /// Returns the `Room` in the joined state.
1510    ///
1511    /// # Arguments
1512    ///
1513    /// * `room_id` - The `RoomId` of the room to be joined.
1514    #[instrument(skip(self))]
1515    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1516        // See who invited us to this room, if anyone. Note we have to do this before
1517        // making the `/join` request, otherwise we could race against the sync.
1518        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1519
1520        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1521        let response = self.send(request).await?;
1522        self.finish_join_room(&response.room_id, pre_join_info).await
1523    }
1524
1525    /// Join a room by `RoomOrAliasId`.
1526    ///
1527    /// Returns the `Room` in the joined state.
1528    ///
1529    /// # Arguments
1530    ///
1531    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1532    ///   alias looks like `#name:example.com`.
1533    /// * `server_names` - The server names to be used for resolving the alias,
1534    ///   if needs be.
1535    pub async fn join_room_by_id_or_alias(
1536        &self,
1537        alias: &RoomOrAliasId,
1538        server_names: &[OwnedServerName],
1539    ) -> Result<Room> {
1540        let pre_join_info = {
1541            match alias.try_into() {
1542                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1543                Err(_) => {
1544                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1545                    // responding to an invitation to the room, and therefore don't need to handle
1546                    // things that happen as a result of invites.
1547                    None
1548                }
1549            }
1550        };
1551        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1552            via: server_names.to_owned(),
1553        });
1554        let response = self.send(request).await?;
1555        self.finish_join_room(&response.room_id, pre_join_info).await
1556    }
1557
1558    /// Search the homeserver's directory of public rooms.
1559    ///
1560    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1561    /// a `get_public_rooms::Response`.
1562    ///
1563    /// # Arguments
1564    ///
1565    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1566    ///
1567    /// * `since` - Pagination token from a previous request.
1568    ///
1569    /// * `server` - The name of the server, if `None` the requested server is
1570    ///   used.
1571    ///
1572    /// # Examples
1573    /// ```no_run
1574    /// use matrix_sdk::Client;
1575    /// # use url::Url;
1576    /// # let homeserver = Url::parse("http://example.com").unwrap();
1577    /// # let limit = Some(10);
1578    /// # let since = Some("since token");
1579    /// # let server = Some("servername.com".try_into().unwrap());
1580    /// # async {
1581    /// let mut client = Client::new(homeserver).await.unwrap();
1582    ///
1583    /// client.public_rooms(limit, since, server).await;
1584    /// # };
1585    /// ```
1586    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1587    pub async fn public_rooms(
1588        &self,
1589        limit: Option<u32>,
1590        since: Option<&str>,
1591        server: Option<&ServerName>,
1592    ) -> HttpResult<get_public_rooms::v3::Response> {
1593        let limit = limit.map(UInt::from);
1594
1595        let request = assign!(get_public_rooms::v3::Request::new(), {
1596            limit,
1597            since: since.map(ToOwned::to_owned),
1598            server: server.map(ToOwned::to_owned),
1599        });
1600        self.send(request).await
1601    }
1602
1603    /// Create a room with the given parameters.
1604    ///
1605    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1606    /// created room.
1607    ///
1608    /// If you want to create a direct message with one specific user, you can
1609    /// use [`create_dm`][Self::create_dm], which is more convenient than
1610    /// assembling the [`create_room::v3::Request`] yourself.
1611    ///
1612    /// If the `is_direct` field of the request is set to `true` and at least
1613    /// one user is invited, the room will be automatically added to the direct
1614    /// rooms in the account data.
1615    ///
1616    /// # Examples
1617    ///
1618    /// ```no_run
1619    /// use matrix_sdk::{
1620    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1621    ///     Client,
1622    /// };
1623    /// # use url::Url;
1624    /// #
1625    /// # async {
1626    /// # let homeserver = Url::parse("http://example.com").unwrap();
1627    /// let request = CreateRoomRequest::new();
1628    /// let client = Client::new(homeserver).await.unwrap();
1629    /// assert!(client.create_room(request).await.is_ok());
1630    /// # };
1631    /// ```
1632    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1633        let invite = request.invite.clone();
1634        let is_direct_room = request.is_direct;
1635        let response = self.send(request).await?;
1636
1637        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1638
1639        let joined_room = Room::new(self.clone(), base_room);
1640
1641        if is_direct_room && !invite.is_empty() {
1642            if let Err(error) =
1643                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1644            {
1645                // FIXME: Retry in the background
1646                error!("Failed to mark room as DM: {error}");
1647            }
1648        }
1649
1650        Ok(joined_room)
1651    }
1652
1653    /// Create a DM room.
1654    ///
1655    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1656    /// given user being invited, the room marked `is_direct` and both the
1657    /// creator and invitee getting the default maximum power level.
1658    ///
1659    /// If the `e2e-encryption` feature is enabled, the room will also be
1660    /// encrypted.
1661    ///
1662    /// # Arguments
1663    ///
1664    /// * `user_id` - The ID of the user to create a DM for.
1665    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1666        #[cfg(feature = "e2e-encryption")]
1667        let initial_state =
1668            vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1669                .to_raw_any()];
1670
1671        #[cfg(not(feature = "e2e-encryption"))]
1672        let initial_state = vec![];
1673
1674        let request = assign!(create_room::v3::Request::new(), {
1675            invite: vec![user_id.to_owned()],
1676            is_direct: true,
1677            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1678            initial_state,
1679        });
1680
1681        self.create_room(request).await
1682    }
1683
1684    /// Search the homeserver's directory for public rooms with a filter.
1685    ///
1686    /// # Arguments
1687    ///
1688    /// * `room_search` - The easiest way to create this request is using the
1689    ///   `get_public_rooms_filtered::Request` itself.
1690    ///
1691    /// # Examples
1692    ///
1693    /// ```no_run
1694    /// # use url::Url;
1695    /// # use matrix_sdk::Client;
1696    /// # async {
1697    /// # let homeserver = Url::parse("http://example.com")?;
1698    /// use matrix_sdk::ruma::{
1699    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1700    /// };
1701    /// # let mut client = Client::new(homeserver).await?;
1702    ///
1703    /// let mut filter = Filter::new();
1704    /// filter.generic_search_term = Some("rust".to_owned());
1705    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1706    /// request.filter = filter;
1707    ///
1708    /// let response = client.public_rooms_filtered(request).await?;
1709    ///
1710    /// for room in response.chunk {
1711    ///     println!("Found room {room:?}");
1712    /// }
1713    /// # anyhow::Ok(()) };
1714    /// ```
    pub async fn public_rooms_filtered(
        &self,
        request: get_public_rooms_filtered::v3::Request,
    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
        // The caller-built request is forwarded unchanged through `Client::send`
        // and the response (or HTTP error) is returned without further
        // processing in this method.
        self.send(request).await
    }
1721
1722    /// Send an arbitrary request to the server, without updating client state.
1723    ///
1724    /// **Warning:** Because this method *does not* update the client state, it
1725    /// is important to make sure that you account for this yourself, and
1726    /// use wrapper methods where available.  This method should *only* be
1727    /// used if a wrapper method for the endpoint you'd like to use is not
1728    /// available.
1729    ///
1730    /// # Arguments
1731    ///
1732    /// * `request` - A filled out and valid request for the endpoint to be hit
1733    ///
1734    /// * `timeout` - An optional request timeout setting, this overrides the
1735    ///   default request setting if one was set.
1736    ///
1737    /// # Examples
1738    ///
1739    /// ```no_run
1740    /// # use matrix_sdk::{Client, config::SyncSettings};
1741    /// # use url::Url;
1742    /// # async {
1743    /// # let homeserver = Url::parse("http://localhost:8080")?;
1744    /// # let mut client = Client::new(homeserver).await?;
1745    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1746    ///
1747    /// // First construct the request you want to make
1748    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1749    /// // for all available Endpoints
1750    /// let user_id = user_id!("@example:localhost").to_owned();
1751    /// let request = profile::get_profile::v3::Request::new(user_id);
1752    ///
1753    /// // Start the request using Client::send()
1754    /// let response = client.send(request).await?;
1755    ///
1756    /// // Check the corresponding Response struct to find out what types are
1757    /// // returned
1758    /// # anyhow::Ok(()) };
1759    /// ```
1760    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1761    where
1762        Request: OutgoingRequest + Clone + Debug,
1763        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1764    {
1765        SendRequest {
1766            client: self.clone(),
1767            request,
1768            config: None,
1769            send_progress: Default::default(),
1770        }
1771    }
1772
1773    pub(crate) async fn send_inner<Request>(
1774        &self,
1775        request: Request,
1776        config: Option<RequestConfig>,
1777        send_progress: SharedObservable<TransmissionProgress>,
1778    ) -> HttpResult<Request::IncomingResponse>
1779    where
1780        Request: OutgoingRequest + Debug,
1781        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1782    {
1783        let homeserver = self.homeserver().to_string();
1784        let access_token = self.access_token();
1785
1786        self.inner
1787            .http_client
1788            .send(
1789                request,
1790                config,
1791                homeserver,
1792                access_token.as_deref(),
1793                &self.server_versions().await?,
1794                send_progress,
1795            )
1796            .await
1797    }
1798
1799    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1800        _ = self
1801            .inner
1802            .auth_ctx
1803            .session_change_sender
1804            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1805    }
1806
1807    /// Fetches server versions from network; no caching.
1808    pub async fn fetch_server_versions(
1809        &self,
1810        request_config: Option<RequestConfig>,
1811    ) -> HttpResult<get_supported_versions::Response> {
1812        let server_versions = self
1813            .inner
1814            .http_client
1815            .send(
1816                get_supported_versions::Request::new(),
1817                request_config,
1818                self.homeserver().to_string(),
1819                None,
1820                &[MatrixVersion::V1_0],
1821                Default::default(),
1822            )
1823            .await?;
1824
1825        Ok(server_versions)
1826    }
1827
1828    /// Fetches client well_known from network; no caching.
1829    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1830        let server_url_string = self
1831            .server()
1832            .unwrap_or(
1833                // Sometimes people configure their well-known directly on the homeserver so use
1834                // this as a fallback when the server name is unknown.
1835                &self.homeserver(),
1836            )
1837            .to_string();
1838
1839        let well_known = self
1840            .inner
1841            .http_client
1842            .send(
1843                discover_homeserver::Request::new(),
1844                Some(RequestConfig::short_retry()),
1845                server_url_string,
1846                None,
1847                &[MatrixVersion::V1_0],
1848                Default::default(),
1849            )
1850            .await;
1851
1852        match well_known {
1853            Ok(well_known) => Some(well_known),
1854            Err(http_error) => {
1855                // It is perfectly valid to not have a well-known file.
1856                // Maybe we should check for a specific error code to be sure?
1857                warn!("Failed to fetch client well-known: {http_error}");
1858                None
1859            }
1860        }
1861    }
1862
1863    /// Load server info from storage, or fetch them from network and cache
1864    /// them.
1865    async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1866        match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1867            Ok(Some(stored)) => {
1868                if let Some(server_info) =
1869                    stored.into_server_info().and_then(|info| info.maybe_decode())
1870                {
1871                    return Ok(server_info);
1872                }
1873            }
1874            Ok(None) => {
1875                // fallthrough: cache is empty
1876            }
1877            Err(err) => {
1878                warn!("error when loading cached server info: {err}");
1879                // fallthrough to network.
1880            }
1881        }
1882
1883        let server_versions = self.fetch_server_versions(None).await?;
1884        let well_known = self.fetch_client_well_known().await;
1885        let server_info = ServerInfo::new(
1886            server_versions.versions.clone(),
1887            server_versions.unstable_features.clone(),
1888            well_known.map(Into::into),
1889        );
1890
1891        // Attempt to cache the result in storage.
1892        {
1893            if let Err(err) = self
1894                .state_store()
1895                .set_kv_data(
1896                    StateStoreDataKey::ServerInfo,
1897                    StateStoreDataValue::ServerInfo(server_info.clone()),
1898                )
1899                .await
1900            {
1901                warn!("error when caching server info: {err}");
1902            }
1903        }
1904
1905        Ok(server_info)
1906    }
1907
1908    async fn get_or_load_and_cache_server_info<
1909        Value,
1910        MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
1911    >(
1912        &self,
1913        map: MapFunction,
1914    ) -> HttpResult<Value> {
1915        let server_info = &self.inner.caches.server_info;
1916        if let CachedValue::Cached(val) = map(&*server_info.read().await) {
1917            return Ok(val);
1918        }
1919
1920        let mut guarded_server_info = server_info.write().await;
1921        if let CachedValue::Cached(val) = map(&guarded_server_info) {
1922            return Ok(val);
1923        }
1924
1925        let server_info = self.load_or_fetch_server_info().await?;
1926
1927        // Fill both unstable features and server versions at once.
1928        let mut supported = server_info.supported_versions();
1929        if supported.versions.is_empty() {
1930            supported.versions = [MatrixVersion::V1_0].into();
1931        }
1932
1933        guarded_server_info.supported_versions = CachedValue::Cached(supported);
1934        guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
1935
1936        // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
1937        // fetch an optional property), the function will always return some.
1938        Ok(map(&guarded_server_info).unwrap_cached_value())
1939    }
1940
1941    /// Get the Matrix versions and features supported by the homeserver by
1942    /// fetching them from the server or the cache.
1943    ///
1944    /// This is equivalent to calling both [`Client::server_versions()`] and
1945    /// [`Client::unstable_features()`]. To always fetch the result from the
1946    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
1947    /// and then `.as_supported_versions()` on the response.
1948    ///
1949    /// # Examples
1950    ///
1951    /// ```no_run
1952    /// use ruma::api::{FeatureFlag, MatrixVersion};
1953    /// # use matrix_sdk::{Client, config::SyncSettings};
1954    /// # use url::Url;
1955    /// # async {
1956    /// # let homeserver = Url::parse("http://localhost:8080")?;
1957    /// # let mut client = Client::new(homeserver).await?;
1958    ///
1959    /// let supported = client.supported_versions().await?;
1960    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
1961    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1962    ///
1963    /// let msc_x_feature = FeatureFlag::from("msc_x");
1964    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
1965    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
1966    /// # anyhow::Ok(()) };
1967    /// ```
1968    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
1969        self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
1970            .await
1971    }
1972
1973    /// Get the Matrix versions supported by the homeserver by fetching them
1974    /// from the server or the cache.
1975    ///
1976    /// # Examples
1977    ///
1978    /// ```no_run
1979    /// use ruma::api::MatrixVersion;
1980    /// # use matrix_sdk::{Client, config::SyncSettings};
1981    /// # use url::Url;
1982    /// # async {
1983    /// # let homeserver = Url::parse("http://localhost:8080")?;
1984    /// # let mut client = Client::new(homeserver).await?;
1985    ///
1986    /// let server_versions = client.server_versions().await?;
1987    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
1988    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1989    /// # anyhow::Ok(()) };
1990    /// ```
1991    pub async fn server_versions(&self) -> HttpResult<Box<[MatrixVersion]>> {
1992        self.get_or_load_and_cache_server_info(|server_info| {
1993            server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
1994        })
1995        .await
1996    }
1997
1998    /// Get the unstable features supported by the homeserver by fetching them
1999    /// from the server or the cache.
2000    ///
2001    /// # Examples
2002    ///
2003    /// ```no_run
2004    /// use matrix_sdk::ruma::api::FeatureFlag;
2005    /// # use matrix_sdk::{Client, config::SyncSettings};
2006    /// # use url::Url;
2007    /// # async {
2008    /// # let homeserver = Url::parse("http://localhost:8080")?;
2009    /// # let mut client = Client::new(homeserver).await?;
2010    ///
2011    /// let msc_x_feature = FeatureFlag::from("msc_x");
2012    /// let unstable_features = client.unstable_features().await?;
2013    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2014    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2015    /// # anyhow::Ok(()) };
2016    /// ```
2017    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2018        self.get_or_load_and_cache_server_info(|server_info| {
2019            server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2020        })
2021        .await
2022    }
2023
2024    /// Get information about the homeserver's advertised RTC foci by fetching
2025    /// the well-known file from the server or the cache.
2026    ///
2027    /// # Examples
2028    /// ```no_run
2029    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2030    /// # use url::Url;
2031    /// # async {
2032    /// # let homeserver = Url::parse("http://localhost:8080")?;
2033    /// # let mut client = Client::new(homeserver).await?;
2034    /// let rtc_foci = client.rtc_foci().await?;
2035    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2036    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2037    ///     _ => None,
2038    /// });
2039    /// if let Some(info) = default_livekit_focus_info {
2040    ///     println!("Default LiveKit service URL: {}", info.service_url);
2041    /// }
2042    /// # anyhow::Ok(()) };
2043    /// ```
2044    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2045        let well_known = self
2046            .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2047            .await?;
2048
2049        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2050    }
2051
2052    /// Empty the server version and unstable features cache.
2053    ///
2054    /// Since the SDK caches server info (versions, unstable features,
2055    /// well-known etc), it's possible to have a stale entry in the cache. This
2056    /// functions makes it possible to force reset it.
2057    pub async fn reset_server_info(&self) -> Result<()> {
2058        // Empty the in-memory caches.
2059        let mut guard = self.inner.caches.server_info.write().await;
2060        guard.supported_versions = CachedValue::NotSet;
2061
2062        // Empty the store cache.
2063        Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2064    }
2065
2066    /// Check whether MSC 4028 is enabled on the homeserver.
2067    ///
2068    /// # Examples
2069    ///
2070    /// ```no_run
2071    /// # use matrix_sdk::{Client, config::SyncSettings};
2072    /// # use url::Url;
2073    /// # async {
2074    /// # let homeserver = Url::parse("http://localhost:8080")?;
2075    /// # let mut client = Client::new(homeserver).await?;
2076    /// let msc4028_enabled =
2077    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2078    /// # anyhow::Ok(()) };
2079    /// ```
2080    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2081        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2082    }
2083
2084    /// Get information of all our own devices.
2085    ///
2086    /// # Examples
2087    ///
2088    /// ```no_run
2089    /// # use matrix_sdk::{Client, config::SyncSettings};
2090    /// # use url::Url;
2091    /// # async {
2092    /// # let homeserver = Url::parse("http://localhost:8080")?;
2093    /// # let mut client = Client::new(homeserver).await?;
2094    /// let response = client.devices().await?;
2095    ///
2096    /// for device in response.devices {
2097    ///     println!(
2098    ///         "Device: {} {}",
2099    ///         device.device_id,
2100    ///         device.display_name.as_deref().unwrap_or("")
2101    ///     );
2102    /// }
2103    /// # anyhow::Ok(()) };
2104    /// ```
2105    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2106        let request = get_devices::v3::Request::new();
2107
2108        self.send(request).await
2109    }
2110
2111    /// Delete the given devices from the server.
2112    ///
2113    /// # Arguments
2114    ///
2115    /// * `devices` - The list of devices that should be deleted from the
2116    ///   server.
2117    ///
2118    /// * `auth_data` - This request requires user interactive auth, the first
2119    ///   request needs to set this to `None` and will always fail with an
2120    ///   `UiaaResponse`. The response will contain information for the
2121    ///   interactive auth and the same request needs to be made but this time
2122    ///   with some `auth_data` provided.
2123    ///
2124    /// ```no_run
2125    /// # use matrix_sdk::{
2126    /// #    ruma::{api::client::uiaa, device_id},
2127    /// #    Client, Error, config::SyncSettings,
2128    /// # };
2129    /// # use serde_json::json;
2130    /// # use url::Url;
2131    /// # use std::collections::BTreeMap;
2132    /// # async {
2133    /// # let homeserver = Url::parse("http://localhost:8080")?;
2134    /// # let mut client = Client::new(homeserver).await?;
2135    /// let devices = &[device_id!("DEVICEID").to_owned()];
2136    ///
2137    /// if let Err(e) = client.delete_devices(devices, None).await {
2138    ///     if let Some(info) = e.as_uiaa_response() {
2139    ///         let mut password = uiaa::Password::new(
2140    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2141    ///             "wordpass".to_owned(),
2142    ///         );
2143    ///         password.session = info.session.clone();
2144    ///
2145    ///         client
2146    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2147    ///             .await?;
2148    ///     }
2149    /// }
    /// # anyhow::Ok(()) };
    /// ```
2151    pub async fn delete_devices(
2152        &self,
2153        devices: &[OwnedDeviceId],
2154        auth_data: Option<uiaa::AuthData>,
2155    ) -> HttpResult<delete_devices::v3::Response> {
2156        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2157        request.auth = auth_data;
2158
2159        self.send(request).await
2160    }
2161
2162    /// Change the display name of a device owned by the current user.
2163    ///
    /// Returns an `update_device::Response` which specifies the result
2165    /// of the operation.
2166    ///
2167    /// # Arguments
2168    ///
2169    /// * `device_id` - The ID of the device to change the display name of.
2170    /// * `display_name` - The new display name to set.
2171    pub async fn rename_device(
2172        &self,
2173        device_id: &DeviceId,
2174        display_name: &str,
2175    ) -> HttpResult<update_device::v3::Response> {
2176        let mut request = update_device::v3::Request::new(device_id.to_owned());
2177        request.display_name = Some(display_name.to_owned());
2178
2179        self.send(request).await
2180    }
2181
2182    /// Synchronize the client's state with the latest state on the server.
2183    ///
2184    /// ## Syncing Events
2185    ///
2186    /// Messages or any other type of event need to be periodically fetched from
2187    /// the server, this is achieved by sending a `/sync` request to the server.
2188    ///
2189    /// The first sync is sent out without a [`token`]. The response of the
2190    /// first sync will contain a [`next_batch`] field which should then be
2191    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2192    /// don't receive the same events multiple times.
2193    ///
2194    /// ## Long Polling
2195    ///
2196    /// A sync should in the usual case always be in flight. The
    /// [`SyncSettings`] have a [`timeout`] option, which controls how
2198    /// long the server will wait for new events before it will respond.
2199    /// The server will respond immediately if some new events arrive before the
2200    /// timeout has expired. If no changes arrive and the timeout expires an
2201    /// empty sync response will be sent to the client.
2202    ///
2203    /// This method of sending a request that may not receive a response
2204    /// immediately is called long polling.
2205    ///
2206    /// ## Filtering Events
2207    ///
2208    /// The number or type of messages and events that the client should receive
2209    /// from the server can be altered using a [`Filter`].
2210    ///
2211    /// Filters can be non-trivial and, since they will be sent with every sync
2212    /// request, they may take up a bunch of unnecessary bandwidth.
2213    ///
    /// Luckily filters can be uploaded to the server and reused using a unique
2215    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2216    /// method.
2217    ///
2218    /// # Arguments
2219    ///
2220    /// * `sync_settings` - Settings for the sync call, this allows us to set
    ///   various options to configure the sync:
2222    ///     * [`filter`] - To configure which events we receive and which get
2223    ///       [filtered] by the server
2224    ///     * [`timeout`] - To configure our [long polling] setup.
2225    ///     * [`token`] - To tell the server which events we already received
2226    ///       and where we wish to continue syncing.
2227    ///     * [`full_state`] - To tell the server that we wish to receive all
2228    ///       state events, regardless of our configured [`token`].
2229    ///     * [`set_presence`] - To tell the server to set the presence and to
2230    ///       which state.
2231    ///
2232    /// # Examples
2233    ///
2234    /// ```no_run
2235    /// # use url::Url;
2236    /// # async {
2237    /// # let homeserver = Url::parse("http://localhost:8080")?;
2238    /// # let username = "";
2239    /// # let password = "";
2240    /// use matrix_sdk::{
2241    ///     config::SyncSettings,
2242    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2243    /// };
2244    ///
2245    /// let client = Client::new(homeserver).await?;
2246    /// client.matrix_auth().login_username(username, password).send().await?;
2247    ///
2248    /// // Sync once so we receive the client state and old messages.
2249    /// client.sync_once(SyncSettings::default()).await?;
2250    ///
2251    /// // Register our handler so we start responding once we receive a new
2252    /// // event.
2253    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2254    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2255    /// });
2256    ///
2257    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2258    /// // from our `sync_once()` call automatically.
2259    /// client.sync(SyncSettings::default()).await;
2260    /// # anyhow::Ok(()) };
2261    /// ```
2262    ///
2263    /// [`sync`]: #method.sync
2264    /// [`SyncSettings`]: crate::config::SyncSettings
2265    /// [`token`]: crate::config::SyncSettings#method.token
2266    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2267    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2268    /// [`set_presence`]: ruma::presence::PresenceState
2269    /// [`filter`]: crate::config::SyncSettings#method.filter
2270    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2271    /// [`next_batch`]: SyncResponse#structfield.next_batch
2272    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2273    /// [long polling]: #long-polling
2274    /// [filtered]: #filtering-events
2275    #[instrument(skip(self))]
2276    pub async fn sync_once(
2277        &self,
2278        sync_settings: crate::config::SyncSettings,
2279    ) -> Result<SyncResponse> {
2280        // The sync might not return for quite a while due to the timeout.
2281        // We'll see if there's anything crypto related to send out before we
2282        // sync, i.e. if we closed our client after a sync but before the
2283        // crypto requests were sent out.
2284        //
2285        // This will mostly be a no-op.
2286        #[cfg(feature = "e2e-encryption")]
2287        if let Err(e) = self.send_outgoing_requests().await {
2288            error!(error = ?e, "Error while sending outgoing E2EE requests");
2289        }
2290
2291        let request = assign!(sync_events::v3::Request::new(), {
2292            filter: sync_settings.filter.map(|f| *f),
2293            since: sync_settings.token,
2294            full_state: sync_settings.full_state,
2295            set_presence: sync_settings.set_presence,
2296            timeout: sync_settings.timeout,
2297        });
2298        let mut request_config = self.request_config();
2299        if let Some(timeout) = sync_settings.timeout {
2300            request_config.timeout += timeout;
2301        }
2302
2303        let response = self.send(request).with_request_config(request_config).await?;
2304        let next_batch = response.next_batch.clone();
2305        let response = self.process_sync(response).await?;
2306
2307        #[cfg(feature = "e2e-encryption")]
2308        if let Err(e) = self.send_outgoing_requests().await {
2309            error!(error = ?e, "Error while sending outgoing E2EE requests");
2310        }
2311
2312        self.inner.sync_beat.notify(usize::MAX);
2313
2314        Ok(SyncResponse::new(next_batch, response))
2315    }
2316
2317    /// Repeatedly synchronize the client state with the server.
2318    ///
2319    /// This method will only return on error, if cancellation is needed
2320    /// the method should be wrapped in a cancelable task or the
2321    /// [`Client::sync_with_callback`] method can be used or
2322    /// [`Client::sync_with_result_callback`] if you want to handle error
2323    /// cases in the loop, too.
2324    ///
2325    /// This method will internally call [`Client::sync_once`] in a loop.
2326    ///
2327    /// This method can be used with the [`Client::add_event_handler`]
2328    /// method to react to individual events. If you instead wish to handle
2329    /// events in a bulk manner the [`Client::sync_with_callback`],
2330    /// [`Client::sync_with_result_callback`] and
2331    /// [`Client::sync_stream`] methods can be used instead. Those methods
2332    /// repeatedly return the whole sync response.
2333    ///
2334    /// # Arguments
2335    ///
2336    /// * `sync_settings` - Settings for the sync call. *Note* that those
2337    ///   settings will be only used for the first sync call. See the argument
2338    ///   docs for [`Client::sync_once`] for more info.
2339    ///
2340    /// # Return
2341    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2342    /// up to the user of the API to check the error and decide whether the sync
2343    /// should continue or not.
2344    ///
2345    /// # Examples
2346    ///
2347    /// ```no_run
2348    /// # use url::Url;
2349    /// # async {
2350    /// # let homeserver = Url::parse("http://localhost:8080")?;
2351    /// # let username = "";
2352    /// # let password = "";
2353    /// use matrix_sdk::{
2354    ///     config::SyncSettings,
2355    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2356    /// };
2357    ///
2358    /// let client = Client::new(homeserver).await?;
2359    /// client.matrix_auth().login_username(&username, &password).send().await?;
2360    ///
2361    /// // Register our handler so we start responding once we receive a new
2362    /// // event.
2363    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2364    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2365    /// });
2366    ///
2367    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2368    /// // automatically.
2369    /// client.sync(SyncSettings::default()).await?;
2370    /// # anyhow::Ok(()) };
2371    /// ```
2372    ///
2373    /// [argument docs]: #method.sync_once
2374    /// [`sync_with_callback`]: #method.sync_with_callback
2375    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2376        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2377    }
2378
2379    /// Repeatedly call sync to synchronize the client state with the server.
2380    ///
2381    /// # Arguments
2382    ///
2383    /// * `sync_settings` - Settings for the sync call. *Note* that those
2384    ///   settings will be only used for the first sync call. See the argument
2385    ///   docs for [`Client::sync_once`] for more info.
2386    ///
    /// * `callback` - A callback that will be called every time a successful
    ///   response has been fetched from the server. The callback must return a
    ///   `LoopCtrl` value which signals whether the method should keep
    ///   syncing: if the callback returns `LoopCtrl::Continue` the sync will
    ///   continue, if it returns `LoopCtrl::Break` the sync will be stopped.
2392    ///
2393    /// # Return
2394    /// The sync runs until an error occurs or the
2395    /// callback indicates that the Loop should stop. If the callback asked for
2396    /// a regular stop, the result will be `Ok(())` otherwise the
2397    /// `Err(Error)` is returned.
2398    ///
2399    /// # Examples
2400    ///
2401    /// The following example demonstrates how to sync forever while sending all
2402    /// the interesting events through a mpsc channel to another thread e.g. a
2403    /// UI thread.
2404    ///
2405    /// ```no_run
2406    /// # use std::time::Duration;
2407    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2408    /// # use url::Url;
2409    /// # async {
2410    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2411    /// # let mut client = Client::new(homeserver).await.unwrap();
2412    ///
2413    /// use tokio::sync::mpsc::channel;
2414    ///
2415    /// let (tx, rx) = channel(100);
2416    ///
2417    /// let sync_channel = &tx;
2418    /// let sync_settings = SyncSettings::new()
2419    ///     .timeout(Duration::from_secs(30));
2420    ///
2421    /// client
2422    ///     .sync_with_callback(sync_settings, |response| async move {
2423    ///         let channel = sync_channel;
2424    ///         for (room_id, room) in response.rooms.joined {
2425    ///             for event in room.timeline.events {
2426    ///                 channel.send(event).await.unwrap();
2427    ///             }
2428    ///         }
2429    ///
2430    ///         LoopCtrl::Continue
2431    ///     })
2432    ///     .await;
2433    /// };
2434    /// ```
2435    #[instrument(skip_all)]
2436    pub async fn sync_with_callback<C>(
2437        &self,
2438        sync_settings: crate::config::SyncSettings,
2439        callback: impl Fn(SyncResponse) -> C,
2440    ) -> Result<(), Error>
2441    where
2442        C: Future<Output = LoopCtrl>,
2443    {
2444        self.sync_with_result_callback(sync_settings, |result| async {
2445            Ok(callback(result?).await)
2446        })
2447        .await
2448    }
2449
2450    /// Repeatedly call sync to synchronize the client state with the server.
2451    ///
2452    /// # Arguments
2453    ///
2454    /// * `sync_settings` - Settings for the sync call. *Note* that those
2455    ///   settings will be only used for the first sync call. See the argument
2456    ///   docs for [`Client::sync_once`] for more info.
2457    ///
2458    /// * `callback` - A callback that will be called every time after a
2459    ///   response has been received, failure or not. The callback returns a
2460    ///   `Result<LoopCtrl, Error>`, too. When returning
2461    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2462    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2463    ///   function returns `Ok(())`. In case the callback can't handle the
2464    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2465    ///   which results in the sync ending and the `Err(Error)` being returned.
2466    ///
2467    /// # Return
2468    /// The sync runs until an error occurs that the callback can't handle or
2469    /// the callback indicates that the Loop should stop. If the callback
2470    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2471    /// `Err(Error)` is returned.
2472    ///
    /// _Note_: Lower-level configuration (e.g. for retries) is not changed by
    /// this, and retries are handled first without sending the result to the
    /// callback. Only after the retry limits have been exceeded is the
    /// `Result` handed to the callback.
2477    ///
2478    /// # Examples
2479    ///
2480    /// The following example demonstrates how to sync forever while sending all
2481    /// the interesting events through a mpsc channel to another thread e.g. a
2482    /// UI thread.
2483    ///
2484    /// ```no_run
2485    /// # use std::time::Duration;
2486    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2487    /// # use url::Url;
2488    /// # async {
2489    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2490    /// # let mut client = Client::new(homeserver).await.unwrap();
2491    /// #
2492    /// use tokio::sync::mpsc::channel;
2493    ///
2494    /// let (tx, rx) = channel(100);
2495    ///
2496    /// let sync_channel = &tx;
2497    /// let sync_settings = SyncSettings::new()
2498    ///     .timeout(Duration::from_secs(30));
2499    ///
2500    /// client
2501    ///     .sync_with_result_callback(sync_settings, |response| async move {
2502    ///         let channel = sync_channel;
2503    ///         let sync_response = response?;
2504    ///         for (room_id, room) in sync_response.rooms.joined {
2505    ///              for event in room.timeline.events {
2506    ///                  channel.send(event).await.unwrap();
2507    ///               }
2508    ///         }
2509    ///
2510    ///         Ok(LoopCtrl::Continue)
2511    ///     })
2512    ///     .await;
2513    /// };
2514    /// ```
2515    #[instrument(skip(self, callback))]
2516    pub async fn sync_with_result_callback<C>(
2517        &self,
2518        mut sync_settings: crate::config::SyncSettings,
2519        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2520    ) -> Result<(), Error>
2521    where
2522        C: Future<Output = Result<LoopCtrl, Error>>,
2523    {
2524        let mut last_sync_time: Option<Instant> = None;
2525
2526        if sync_settings.token.is_none() {
2527            sync_settings.token = self.sync_token().await;
2528        }
2529
2530        loop {
2531            trace!("Syncing");
2532            let result = self.sync_loop_helper(&mut sync_settings).await;
2533
2534            trace!("Running callback");
2535            if callback(result).await? == LoopCtrl::Break {
2536                trace!("Callback told us to stop");
2537                break;
2538            }
2539            trace!("Done running callback");
2540
2541            Client::delay_sync(&mut last_sync_time).await
2542        }
2543
2544        Ok(())
2545    }
2546
    /// Repeatedly synchronize the client state with the server.
2548    ///
2549    /// This method will internally call [`Client::sync_once`] in a loop and is
2550    /// equivalent to the [`Client::sync`] method but the responses are provided
2551    /// as an async stream.
2552    ///
2553    /// # Arguments
2554    ///
2555    /// * `sync_settings` - Settings for the sync call. *Note* that those
2556    ///   settings will be only used for the first sync call. See the argument
2557    ///   docs for [`Client::sync_once`] for more info.
2558    ///
2559    /// # Examples
2560    ///
2561    /// ```no_run
2562    /// # use url::Url;
2563    /// # async {
2564    /// # let homeserver = Url::parse("http://localhost:8080")?;
2565    /// # let username = "";
2566    /// # let password = "";
2567    /// use futures_util::StreamExt;
2568    /// use matrix_sdk::{config::SyncSettings, Client};
2569    ///
2570    /// let client = Client::new(homeserver).await?;
2571    /// client.matrix_auth().login_username(&username, &password).send().await?;
2572    ///
2573    /// let mut sync_stream =
2574    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2575    ///
2576    /// while let Some(Ok(response)) = sync_stream.next().await {
2577    ///     for room in response.rooms.joined.values() {
2578    ///         for e in &room.timeline.events {
2579    ///             if let Ok(event) = e.raw().deserialize() {
2580    ///                 println!("Received event {:?}", event);
2581    ///             }
2582    ///         }
2583    ///     }
2584    /// }
2585    ///
2586    /// # anyhow::Ok(()) };
2587    /// ```
2588    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2589    #[instrument(skip(self))]
2590    pub async fn sync_stream(
2591        &self,
2592        mut sync_settings: crate::config::SyncSettings,
2593    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2594        let mut last_sync_time: Option<Instant> = None;
2595
2596        if sync_settings.token.is_none() {
2597            sync_settings.token = self.sync_token().await;
2598        }
2599
2600        let parent_span = Span::current();
2601
2602        async_stream::stream! {
2603            loop {
2604                yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
2605
2606                Client::delay_sync(&mut last_sync_time).await
2607            }
2608        }
2609    }
2610
2611    /// Get the current, if any, sync token of the client.
2612    /// This will be None if the client didn't sync at least once.
2613    pub(crate) async fn sync_token(&self) -> Option<String> {
2614        self.inner.base_client.sync_token().await
2615    }
2616
2617    /// Gets information about the owner of a given access token.
2618    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2619        let request = whoami::v3::Request::new();
2620        self.send(request).await
2621    }
2622
2623    /// Subscribes a new receiver to client SessionChange broadcasts.
2624    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2625        let broadcast = &self.auth_ctx().session_change_sender;
2626        broadcast.subscribe()
2627    }
2628
2629    /// Sets the save/restore session callbacks.
2630    ///
2631    /// This is another mechanism to get synchronous updates to session tokens,
2632    /// while [`Self::subscribe_to_session_changes`] provides an async update.
2633    pub fn set_session_callbacks(
2634        &self,
2635        reload_session_callback: Box<ReloadSessionCallback>,
2636        save_session_callback: Box<SaveSessionCallback>,
2637    ) -> Result<()> {
2638        self.inner
2639            .auth_ctx
2640            .reload_session_callback
2641            .set(reload_session_callback)
2642            .map_err(|_| Error::MultipleSessionCallbacks)?;
2643
2644        self.inner
2645            .auth_ctx
2646            .save_session_callback
2647            .set(save_session_callback)
2648            .map_err(|_| Error::MultipleSessionCallbacks)?;
2649
2650        Ok(())
2651    }
2652
2653    /// Get the notification settings of the current owner of the client.
2654    pub async fn notification_settings(&self) -> NotificationSettings {
2655        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2656        NotificationSettings::new(self.clone(), ruleset)
2657    }
2658
2659    /// Create a new specialized `Client` that can process notifications.
2660    ///
2661    /// See [`CrossProcessStoreLock::new`] to learn more about
2662    /// `cross_process_store_locks_holder_name`.
2663    ///
2664    /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2665    pub async fn notification_client(
2666        &self,
2667        cross_process_store_locks_holder_name: String,
2668    ) -> Result<Client> {
2669        let client = Client {
2670            inner: ClientInner::new(
2671                self.inner.auth_ctx.clone(),
2672                self.server().cloned(),
2673                self.homeserver(),
2674                self.sliding_sync_version(),
2675                self.inner.http_client.clone(),
2676                self.inner
2677                    .base_client
2678                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2679                    .await?,
2680                self.inner.caches.server_info.read().await.clone(),
2681                self.inner.respect_login_well_known,
2682                self.inner.event_cache.clone(),
2683                self.inner.send_queue_data.clone(),
2684                self.inner.latest_events.clone(),
2685                #[cfg(feature = "e2e-encryption")]
2686                self.inner.e2ee.encryption_settings,
2687                #[cfg(feature = "e2e-encryption")]
2688                self.inner.enable_share_history_on_invite,
2689                cross_process_store_locks_holder_name,
2690            )
2691            .await,
2692        };
2693
2694        Ok(client)
2695    }
2696
2697    /// The [`EventCache`] instance for this [`Client`].
2698    pub fn event_cache(&self) -> &EventCache {
2699        // SAFETY: always initialized in the `Client` ctor.
2700        self.inner.event_cache.get().unwrap()
2701    }
2702
2703    /// The [`LatestEvents`] instance for this [`Client`].
2704    pub async fn latest_events(&self) -> &LatestEvents {
2705        self.inner
2706            .latest_events
2707            .get_or_init(|| async {
2708                LatestEvents::new(
2709                    WeakClient::from_client(self),
2710                    self.event_cache().clone(),
2711                    SendQueue::new(self.clone()),
2712                )
2713            })
2714            .await
2715    }
2716
2717    /// Waits until an at least partially synced room is received, and returns
2718    /// it.
2719    ///
2720    /// **Note: this function will loop endlessly until either it finds the room
2721    /// or an externally set timeout happens.**
2722    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2723        loop {
2724            if let Some(room) = self.get_room(room_id) {
2725                if room.is_state_partially_or_fully_synced() {
2726                    debug!("Found just created room!");
2727                    return room;
2728                }
2729                debug!("Room wasn't partially synced, waiting for sync beat to try again");
2730            } else {
2731                debug!("Room wasn't found, waiting for sync beat to try again");
2732            }
2733            self.inner.sync_beat.listen().await;
2734        }
2735    }
2736
2737    /// Knock on a room given its `room_id_or_alias` to ask for permission to
2738    /// join it.
2739    pub async fn knock(
2740        &self,
2741        room_id_or_alias: OwnedRoomOrAliasId,
2742        reason: Option<String>,
2743        server_names: Vec<OwnedServerName>,
2744    ) -> Result<Room> {
2745        let request =
2746            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2747        let response = self.send(request).await?;
2748        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2749        Ok(Room::new(self.clone(), base_room))
2750    }
2751
2752    /// Checks whether the provided `user_id` belongs to an ignored user.
2753    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2754        self.base_client().is_user_ignored(user_id).await
2755    }
2756
2757    /// Gets the `max_upload_size` value from the homeserver, getting either a
2758    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2759    /// missing.
2760    ///
2761    /// Check the spec for more info:
2762    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2763    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2764        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2765        if let Some(data) = max_upload_size_lock.get() {
2766            return Ok(data.to_owned());
2767        }
2768
2769        let response = self
2770            .send(ruma::api::client::authenticated_media::get_media_config::v1::Request::default())
2771            .await?;
2772
2773        match max_upload_size_lock.set(response.upload_size) {
2774            Ok(_) => Ok(response.upload_size),
2775            Err(error) => {
2776                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2777            }
2778        }
2779    }
2780
2781    /// The settings to use for decrypting events.
2782    #[cfg(feature = "e2e-encryption")]
2783    pub fn decryption_settings(&self) -> &DecryptionSettings {
2784        &self.base_client().decryption_settings
2785    }
2786}
2787
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper to mark users as tracked by the crypto layer.
    ///
    /// # Panics
    ///
    /// Panics if the client has no Olm machine, or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let machine_guard = self.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();
        machine.update_tracked_users(user_ids).await.unwrap();
    }
}
2801
/// A weak reference to the inner client, useful when trying to get a handle
/// on the owning client.
///
/// Holding a `WeakClient` does not keep the underlying `ClientInner` alive;
/// use [`WeakClient::get`] to try to obtain a full [`Client`] again.
#[derive(Clone, Debug)]
pub(crate) struct WeakClient {
    /// Non-owning pointer to the inner client.
    client: Weak<ClientInner>,
}
2808
2809impl WeakClient {
2810    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2811    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2812        Self { client: Arc::downgrade(client) }
2813    }
2814
2815    /// Construct a [`WeakClient`] from a [`Client`].
2816    pub fn from_client(client: &Client) -> Self {
2817        Self::from_inner(&client.inner)
2818    }
2819
2820    /// Attempts to get a [`Client`] from this [`WeakClient`].
2821    pub fn get(&self) -> Option<Client> {
2822        self.client.upgrade().map(|inner| Client { inner })
2823    }
2824
2825    /// Gets the number of strong (`Arc`) pointers still pointing to this
2826    /// client.
2827    #[allow(dead_code)]
2828    pub fn strong_count(&self) -> usize {
2829        self.client.strong_count()
2830    }
2831}
2832
/// Information about the server, where each piece is tracked as a
/// [`CachedValue`] so that "not cached yet" can be told apart from a cached
/// value.
#[derive(Clone)]
struct ClientServerInfo {
    /// The Matrix versions and unstable features the server supports (known
    /// ones only).
    supported_versions: CachedValue<SupportedVersions>,

    /// The server's well-known file, if any.
    ///
    /// `Cached(None)` and `NotSet` are distinct states of this field.
    well_known: CachedValue<Option<WellKnownResponse>>,
}
2842
/// A value that may or may not have been cached yet.
///
/// This exists to avoid confusion between a value that is set to `None`
/// (because it doesn't exist) and a value that has not been cached yet.
#[derive(Clone)]
enum CachedValue<Value> {
    /// A value has been cached.
    Cached(Value),
    /// Nothing has been cached yet.
    NotSet,
}

impl<Value> CachedValue<Value> {
    /// Consumes `self` and returns the cached value.
    ///
    /// # Panics
    ///
    /// Panics if nothing has been cached (`NotSet`).
    fn unwrap_cached_value(self) -> Value {
        if let CachedValue::Cached(value) = self {
            value
        } else {
            panic!("Tried to unwrap a cached value that wasn't set")
        }
    }

    /// Borrows the content: turns a `&CachedValue<Value>` into a
    /// `CachedValue<&Value>`.
    fn as_ref(&self) -> CachedValue<&Value> {
        if let Self::Cached(value) = self {
            CachedValue::Cached(value)
        } else {
            CachedValue::NotSet
        }
    }

    /// Applies `f` to the cached value, if any, producing a
    /// `CachedValue<Other>`; `NotSet` stays `NotSet` and `f` is not called.
    fn map<Other, F>(self, f: F) -> CachedValue<Other>
    where
        F: FnOnce(Value) -> Other,
    {
        if let Self::Cached(value) = self {
            CachedValue::Cached(f(value))
        } else {
            CachedValue::NotSet
        }
    }
}
2888
/// Information about the state of a room before we joined it.
///
/// The derived `Default` yields a value with no inviter.
#[derive(Debug, Clone, Default)]
struct PreJoinRoomInfo {
    /// The user who invited us to the room, if any.
    pub inviter: Option<RoomMember>,
}
2895
2896// The http mocking library is not supported for wasm32
2897#[cfg(all(test, not(target_family = "wasm")))]
2898pub(crate) mod tests {
2899    use std::{sync::Arc, time::Duration};
2900
2901    use assert_matches::assert_matches;
2902    use assert_matches2::assert_let;
2903    use eyeball::SharedObservable;
2904    use futures_util::{pin_mut, FutureExt};
2905    use js_int::{uint, UInt};
2906    use matrix_sdk_base::{
2907        store::{MemoryStore, StoreConfig},
2908        RoomState,
2909    };
2910    use matrix_sdk_test::{
2911        async_test, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
2912        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
2913    };
2914    #[cfg(target_family = "wasm")]
2915    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2916
2917    use ruma::{
2918        api::{
2919            client::{
2920                discovery::discover_homeserver::RtcFocusInfo,
2921                room::create_room::v3::Request as CreateRoomRequest,
2922            },
2923            FeatureFlag, MatrixVersion,
2924        },
2925        assign,
2926        events::{
2927            ignored_user_list::IgnoredUserListEventContent,
2928            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
2929        },
2930        owned_room_id, room_alias_id, room_id, RoomId, ServerName, UserId,
2931    };
2932    use serde_json::json;
2933    use stream_assert::{assert_next_matches, assert_pending};
2934    use tokio::{
2935        spawn,
2936        time::{sleep, timeout},
2937    };
2938    use url::Url;
2939
2940    use super::Client;
2941    use crate::{
2942        client::{futures::SendMediaUploadRequest, WeakClient},
2943        config::RequestConfig,
2944        futures::SendRequest,
2945        media::MediaError,
2946        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
2947        Error, TransmissionProgress,
2948    };
2949
2950    #[async_test]
2951    async fn test_account_data() {
2952        let server = MatrixMockServer::new().await;
2953        let client = server.client_builder().build().await;
2954
2955        server
2956            .mock_sync()
2957            .ok_and_run(&client, |builder| {
2958                builder.add_global_account_data_event(GlobalAccountDataTestEvent::IgnoredUserList);
2959            })
2960            .await;
2961
2962        let content = client
2963            .account()
2964            .account_data::<IgnoredUserListEventContent>()
2965            .await
2966            .unwrap()
2967            .unwrap()
2968            .deserialize()
2969            .unwrap();
2970
2971        assert_eq!(content.ignored_users.len(), 1);
2972    }
2973
2974    #[async_test]
2975    async fn test_successful_discovery() {
2976        // Imagine this is `matrix.org`.
2977        let server = MatrixMockServer::new().await;
2978        let server_url = server.uri();
2979
2980        // Imagine this is `matrix-client.matrix.org`.
2981        let homeserver = MatrixMockServer::new().await;
2982        let homeserver_url = homeserver.uri();
2983
2984        // Imagine Alice has the user ID `@alice:matrix.org`.
2985        let domain = server_url.strip_prefix("http://").unwrap();
2986        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2987
2988        // The `.well-known` is on the server (e.g. `matrix.org`).
2989        server
2990            .mock_well_known()
2991            .ok_with_homeserver_url(&homeserver_url)
2992            .mock_once()
2993            .named("well-known")
2994            .mount()
2995            .await;
2996
2997        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
2998        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
2999
3000        let client = Client::builder()
3001            .insecure_server_name_no_tls(alice.server_name())
3002            .build()
3003            .await
3004            .unwrap();
3005
3006        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3007        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3008        client.server_versions().await.unwrap();
3009    }
3010
3011    #[async_test]
3012    async fn test_discovery_broken_server() {
3013        let server = MatrixMockServer::new().await;
3014        let server_url = server.uri();
3015        let domain = server_url.strip_prefix("http://").unwrap();
3016        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3017
3018        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3019
3020        assert!(
3021            Client::builder()
3022                .insecure_server_name_no_tls(alice.server_name())
3023                .build()
3024                .await
3025                .is_err(),
3026            "Creating a client from a user ID should fail when the .well-known request fails."
3027        );
3028    }
3029
3030    #[async_test]
3031    async fn test_room_creation() {
3032        let server = MatrixMockServer::new().await;
3033        let client = server.client_builder().build().await;
3034
3035        server
3036            .mock_sync()
3037            .ok_and_run(&client, |builder| {
3038                builder.add_joined_room(
3039                    JoinedRoomBuilder::default()
3040                        .add_state_event(StateTestEvent::Member)
3041                        .add_state_event(StateTestEvent::PowerLevels),
3042                );
3043            })
3044            .await;
3045
3046        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3047        assert_eq!(room.state(), RoomState::Joined);
3048    }
3049
3050    #[async_test]
3051    async fn test_retry_limit_http_requests() {
3052        let server = MatrixMockServer::new().await;
3053        let client = server
3054            .client_builder()
3055            .request_config(RequestConfig::new().retry_limit(3))
3056            .build()
3057            .await;
3058
3059        assert!(client.request_config().retry_limit.unwrap() == 3);
3060
3061        server.mock_login().error500().expect(3).mount().await;
3062
3063        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3064    }
3065
3066    #[async_test]
3067    async fn test_retry_timeout_http_requests() {
3068        // Keep this timeout small so that the test doesn't take long
3069        let retry_timeout = Duration::from_secs(5);
3070        let server = MatrixMockServer::new().await;
3071        let client = server
3072            .client_builder()
3073            .request_config(RequestConfig::new().max_retry_time(retry_timeout))
3074            .build()
3075            .await;
3076
3077        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3078
3079        server.mock_login().error500().expect(2..).mount().await;
3080
3081        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3082    }
3083
3084    #[async_test]
3085    async fn test_short_retry_initial_http_requests() {
3086        let server = MatrixMockServer::new().await;
3087        let client =
3088            server.client_builder().request_config(RequestConfig::short_retry()).build().await;
3089
3090        server.mock_login().error500().expect(3..).mount().await;
3091
3092        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3093    }
3094
3095    #[async_test]
3096    async fn test_no_retry_http_requests() {
3097        let server = MatrixMockServer::new().await;
3098        let client = server.client_builder().build().await;
3099
3100        server.mock_devices().error500().mock_once().mount().await;
3101
3102        client.devices().await.unwrap_err();
3103    }
3104
3105    #[async_test]
3106    async fn test_set_homeserver() {
3107        let client = MockClientBuilder::new(None).build().await;
3108        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3109
3110        let homeserver = Url::parse("http://example.com/").unwrap();
3111        client.set_homeserver(homeserver.clone());
3112        assert_eq!(client.homeserver(), homeserver);
3113    }
3114
3115    #[async_test]
3116    async fn test_search_user_request() {
3117        let server = MatrixMockServer::new().await;
3118        let client = server.client_builder().build().await;
3119
3120        server.mock_user_directory().ok().mock_once().mount().await;
3121
3122        let response = client.search_users("test", 50).await.unwrap();
3123        assert_eq!(response.results.len(), 1);
3124        let result = &response.results[0];
3125        assert_eq!(result.user_id.to_string(), "@test:example.me");
3126        assert_eq!(result.display_name.clone().unwrap(), "Test");
3127        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3128        assert!(!response.limited);
3129    }
3130
3131    #[async_test]
3132    async fn test_request_unstable_features() {
3133        let server = MatrixMockServer::new().await;
3134        let client = server.client_builder().no_server_versions().build().await;
3135
3136        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3137
3138        let unstable_features = client.unstable_features().await.unwrap();
3139        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3140        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3141    }
3142
3143    #[async_test]
3144    async fn test_can_homeserver_push_encrypted_event_to_device() {
3145        let server = MatrixMockServer::new().await;
3146        let client = server.client_builder().no_server_versions().build().await;
3147
3148        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3149
3150        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3151        assert!(msc4028_enabled);
3152    }
3153
3154    #[async_test]
3155    async fn test_recently_visited_rooms() {
3156        // Tracking recently visited rooms requires authentication
3157        let client = MockClientBuilder::new(None).unlogged().build().await;
3158        assert_matches!(
3159            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3160            Err(Error::AuthenticationRequired)
3161        );
3162
3163        let client = MockClientBuilder::new(None).build().await;
3164        let account = client.account();
3165
3166        // We should start off with an empty list
3167        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3168
3169        // Tracking a valid room id should add it to the list
3170        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3171        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3172        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3173
3174        // And the existing list shouldn't be changed
3175        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3176        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3177
3178        // Tracking the same room again shouldn't change the list
3179        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3180        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3181        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3182
3183        // Tracking a second room should add it to the front of the list
3184        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3185        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3186        assert_eq!(
3187            account.get_recently_visited_rooms().await.unwrap(),
3188            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3189        );
3190
3191        // Tracking the first room yet again should move it to the front of the list
3192        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3193        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3194        assert_eq!(
3195            account.get_recently_visited_rooms().await.unwrap(),
3196            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3197        );
3198
3199        // Tracking should be capped at 20
3200        for n in 0..20 {
3201            account
3202                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3203                .await
3204                .unwrap();
3205        }
3206
3207        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3208
3209        // And the initial rooms should've been pushed out
3210        let rooms = account.get_recently_visited_rooms().await.unwrap();
3211        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3212        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3213
3214        // And the last tracked room should be the first
3215        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3216    }
3217
3218    #[async_test]
3219    async fn test_client_no_cycle_with_event_cache() {
3220        let client = MockClientBuilder::new(None).build().await;
3221
3222        // Wait for the init tasks to die.
3223        sleep(Duration::from_secs(1)).await;
3224
3225        let weak_client = WeakClient::from_client(&client);
3226        assert_eq!(weak_client.strong_count(), 1);
3227
3228        {
3229            let room_id = room_id!("!room:example.org");
3230
3231            // Have the client know the room.
3232            let response = SyncResponseBuilder::default()
3233                .add_joined_room(JoinedRoomBuilder::new(room_id))
3234                .build_sync_response();
3235            client.inner.base_client.receive_sync_response(response).await.unwrap();
3236
3237            client.event_cache().subscribe().unwrap();
3238
3239            let (_room_event_cache, _drop_handles) =
3240                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3241        }
3242
3243        drop(client);
3244
3245        // Give a bit of time for background tasks to die.
3246        sleep(Duration::from_secs(1)).await;
3247
3248        // The weak client must be the last reference to the client now.
3249        assert_eq!(weak_client.strong_count(), 0);
3250        let client = weak_client.get();
3251        assert!(
3252            client.is_none(),
3253            "too many strong references to the client: {}",
3254            Arc::strong_count(&client.unwrap().inner)
3255        );
3256    }
3257
3258    #[async_test]
3259    async fn test_server_info_caching() {
3260        let server = MatrixMockServer::new().await;
3261        let server_url = server.uri();
3262        let domain = server_url.strip_prefix("http://").unwrap();
3263        let server_name = <&ServerName>::try_from(domain).unwrap();
3264        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3265
3266        let well_known_mock = server
3267            .mock_well_known()
3268            .ok()
3269            .named("well known mock")
3270            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3271            .mount_as_scoped()
3272            .await;
3273
3274        let versions_mock = server
3275            .mock_versions()
3276            .ok_with_unstable_features()
3277            .named("first versions mock")
3278            .expect(1)
3279            .mount_as_scoped()
3280            .await;
3281
3282        let memory_store = Arc::new(MemoryStore::new());
3283        let client = Client::builder()
3284            .insecure_server_name_no_tls(server_name)
3285            .store_config(
3286                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3287                    .state_store(memory_store.clone()),
3288            )
3289            .build()
3290            .await
3291            .unwrap();
3292
3293        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3294
3295        // These subsequent calls hit the in-memory cache.
3296        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3297        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3298
3299        drop(client);
3300
3301        let client = server
3302            .client_builder()
3303            .no_server_versions()
3304            .store_config(
3305                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3306                    .state_store(memory_store.clone()),
3307            )
3308            .build()
3309            .await;
3310
3311        // These calls to the new client hit the on-disk cache.
3312        assert!(client
3313            .unstable_features()
3314            .await
3315            .unwrap()
3316            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3317        let supported = client.supported_versions().await.unwrap();
3318        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3319        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3320
3321        // Then this call hits the in-memory cache.
3322        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3323
3324        drop(versions_mock);
3325        drop(well_known_mock);
3326
3327        // Now, reset the cache, and observe the endpoints being called again once.
3328        client.reset_server_info().await.unwrap();
3329
3330        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3331
3332        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3333
3334        // Hits network again.
3335        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3336        // Hits in-memory cache again.
3337        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3338        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3339    }
3340
3341    #[async_test]
3342    async fn test_server_info_without_a_well_known() {
3343        let server = MatrixMockServer::new().await;
3344        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3345
3346        let versions_mock = server
3347            .mock_versions()
3348            .ok_with_unstable_features()
3349            .named("first versions mock")
3350            .expect(1)
3351            .mount_as_scoped()
3352            .await;
3353
3354        let memory_store = Arc::new(MemoryStore::new());
3355        let client = server
3356            .client_builder()
3357            .no_server_versions()
3358            .store_config(
3359                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3360                    .state_store(memory_store.clone()),
3361            )
3362            .build()
3363            .await;
3364
3365        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3366
3367        // These subsequent calls hit the in-memory cache.
3368        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3369        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3370
3371        drop(client);
3372
3373        let client = server
3374            .client_builder()
3375            .no_server_versions()
3376            .store_config(
3377                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3378                    .state_store(memory_store.clone()),
3379            )
3380            .build()
3381            .await;
3382
3383        // This call to the new client hits the on-disk cache.
3384        assert!(client
3385            .unstable_features()
3386            .await
3387            .unwrap()
3388            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3389
3390        // Then this call hits the in-memory cache.
3391        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3392
3393        drop(versions_mock);
3394
3395        // Now, reset the cache, and observe the endpoints being called again once.
3396        client.reset_server_info().await.unwrap();
3397
3398        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3399
3400        // Hits network again.
3401        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3402        // Hits in-memory cache again.
3403        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3404        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3405    }
3406
3407    #[async_test]
3408    async fn test_no_network_doesnt_cause_infinite_retries() {
3409        // We want infinite retries for transient errors.
3410        let client =
3411            MockClientBuilder::new(None).request_config(RequestConfig::new()).build().await;
3412
3413        // We don't define a mock server on purpose here, so that the error is really a
3414        // network error.
3415        client.whoami().await.unwrap_err();
3416    }
3417
3418    #[async_test]
3419    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3420        let server = MatrixMockServer::new().await;
3421        let client = server.client_builder().build().await;
3422
3423        let room_id = room_id!("!room:example.org");
3424
3425        server
3426            .mock_sync()
3427            .ok_and_run(&client, |builder| {
3428                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3429            })
3430            .await;
3431
3432        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3433        assert_eq!(room.room_id(), room_id);
3434    }
3435
3436    #[async_test]
3437    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3438        let server = MatrixMockServer::new().await;
3439        let client = server.client_builder().build().await;
3440
3441        let room_id = room_id!("!room:example.org");
3442
3443        let client = Arc::new(client);
3444
3445        // Perform the /sync request with a delay so it starts after the
3446        // `await_room_remote_echo` call has happened
3447        spawn({
3448            let client = client.clone();
3449            async move {
3450                sleep(Duration::from_millis(100)).await;
3451
3452                server
3453                    .mock_sync()
3454                    .ok_and_run(&client, |builder| {
3455                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3456                    })
3457                    .await;
3458            }
3459        });
3460
3461        let room =
3462            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3463        assert_eq!(room.room_id(), room_id);
3464    }
3465
3466    #[async_test]
3467    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3468        let client = MockClientBuilder::new(None).build().await;
3469
3470        let room_id = room_id!("!room:example.org");
3471        // Room is not present so the client won't be able to find it. The call will
3472        // timeout.
3473        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3474    }
3475
3476    #[async_test]
3477    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3478        let server = MatrixMockServer::new().await;
3479        let client = server.client_builder().build().await;
3480
3481        server.mock_create_room().ok().mount().await;
3482
3483        // Create a room in the internal store
3484        let room = client
3485            .create_room(assign!(CreateRoomRequest::new(), {
3486                invite: vec![],
3487                is_direct: false,
3488            }))
3489            .await
3490            .unwrap();
3491
3492        // Room is locally present, but not synced, the call will timeout
3493        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3494            .await
3495            .unwrap_err();
3496    }
3497
3498    #[async_test]
3499    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3500        let server = MatrixMockServer::new().await;
3501        let client = server.client_builder().build().await;
3502
3503        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3504
3505        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3506        assert_matches!(ret, Ok(true));
3507    }
3508
3509    #[async_test]
3510    async fn test_is_room_alias_available_if_alias_is_resolved() {
3511        let server = MatrixMockServer::new().await;
3512        let client = server.client_builder().build().await;
3513
3514        server
3515            .mock_room_directory_resolve_alias()
3516            .ok("!some_room_id:matrix.org", Vec::new())
3517            .expect(1)
3518            .mount()
3519            .await;
3520
3521        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3522        assert_matches!(ret, Ok(false));
3523    }
3524
3525    #[async_test]
3526    async fn test_is_room_alias_available_if_error_found() {
3527        let server = MatrixMockServer::new().await;
3528        let client = server.client_builder().build().await;
3529
3530        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3531
3532        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3533        assert_matches!(ret, Err(_));
3534    }
3535
3536    #[async_test]
3537    async fn test_create_room_alias() {
3538        let server = MatrixMockServer::new().await;
3539        let client = server.client_builder().build().await;
3540
3541        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3542
3543        let ret = client
3544            .create_room_alias(
3545                room_alias_id!("#some_alias:matrix.org"),
3546                room_id!("!some_room:matrix.org"),
3547            )
3548            .await;
3549        assert_matches!(ret, Ok(()));
3550    }
3551
3552    #[async_test]
3553    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3554        let server = MatrixMockServer::new().await;
3555        let client = server.client_builder().build().await;
3556
3557        let room_id = room_id!("!a-room:matrix.org");
3558
3559        // Make sure the summary endpoint is called once
3560        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3561
3562        // We create a locally cached invited room
3563        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3564
3565        // And we get a preview, the server endpoint was reached
3566        let preview = client
3567            .get_room_preview(room_id.into(), Vec::new())
3568            .await
3569            .expect("Room preview should be retrieved");
3570
3571        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3572    }
3573
3574    #[async_test]
3575    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3576        let server = MatrixMockServer::new().await;
3577        let client = server.client_builder().build().await;
3578
3579        let room_id = room_id!("!a-room:matrix.org");
3580
3581        // Make sure the summary endpoint is called once
3582        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3583
3584        // We create a locally cached left room
3585        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3586
3587        // And we get a preview, the server endpoint was reached
3588        let preview = client
3589            .get_room_preview(room_id.into(), Vec::new())
3590            .await
3591            .expect("Room preview should be retrieved");
3592
3593        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3594    }
3595
3596    #[async_test]
3597    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3598        let server = MatrixMockServer::new().await;
3599        let client = server.client_builder().build().await;
3600
3601        let room_id = room_id!("!a-room:matrix.org");
3602
3603        // Make sure the summary endpoint is called once
3604        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3605
3606        // We create a locally cached knocked room
3607        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3608
3609        // And we get a preview, the server endpoint was reached
3610        let preview = client
3611            .get_room_preview(room_id.into(), Vec::new())
3612            .await
3613            .expect("Room preview should be retrieved");
3614
3615        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3616    }
3617
3618    #[async_test]
3619    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3620        let server = MatrixMockServer::new().await;
3621        let client = server.client_builder().build().await;
3622
3623        let room_id = room_id!("!a-room:matrix.org");
3624
3625        // Make sure the summary endpoint is not called
3626        server.mock_room_summary().ok(room_id).never().mount().await;
3627
3628        // We create a locally cached joined room
3629        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3630
3631        // And we get a preview, no server endpoint was reached
3632        let preview = client
3633            .get_room_preview(room_id.into(), Vec::new())
3634            .await
3635            .expect("Room preview should be retrieved");
3636
3637        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3638    }
3639
3640    #[async_test]
3641    async fn test_media_preview_config() {
3642        let server = MatrixMockServer::new().await;
3643        let client = server.client_builder().build().await;
3644
3645        server
3646            .mock_sync()
3647            .ok_and_run(&client, |builder| {
3648                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3649                    "content": {
3650                        "media_previews": "private",
3651                        "invite_avatars": "off"
3652                    },
3653                    "type": "m.media_preview_config"
3654                })));
3655            })
3656            .await;
3657
3658        let (initial_value, stream) =
3659            client.account().observe_media_preview_config().await.unwrap();
3660
3661        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3662        assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3663        assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3664        pin_mut!(stream);
3665        assert_pending!(stream);
3666
3667        server
3668            .mock_sync()
3669            .ok_and_run(&client, |builder| {
3670                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3671                    "content": {
3672                        "media_previews": "off",
3673                        "invite_avatars": "on"
3674                    },
3675                    "type": "m.media_preview_config"
3676                })));
3677            })
3678            .await;
3679
3680        assert_next_matches!(
3681            stream,
3682            MediaPreviewConfigEventContent {
3683                media_previews: MediaPreviews::Off,
3684                invite_avatars: InviteAvatars::On,
3685                ..
3686            }
3687        );
3688        assert_pending!(stream);
3689    }
3690
3691    #[async_test]
3692    async fn test_unstable_media_preview_config() {
3693        let server = MatrixMockServer::new().await;
3694        let client = server.client_builder().build().await;
3695
3696        server
3697            .mock_sync()
3698            .ok_and_run(&client, |builder| {
3699                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3700                    "content": {
3701                        "media_previews": "private",
3702                        "invite_avatars": "off"
3703                    },
3704                    "type": "io.element.msc4278.media_preview_config"
3705                })));
3706            })
3707            .await;
3708
3709        let (initial_value, stream) =
3710            client.account().observe_media_preview_config().await.unwrap();
3711
3712        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3713        assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3714        assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3715        pin_mut!(stream);
3716        assert_pending!(stream);
3717
3718        server
3719            .mock_sync()
3720            .ok_and_run(&client, |builder| {
3721                builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3722                    "content": {
3723                        "media_previews": "off",
3724                        "invite_avatars": "on"
3725                    },
3726                    "type": "io.element.msc4278.media_preview_config"
3727                })));
3728            })
3729            .await;
3730
3731        assert_next_matches!(
3732            stream,
3733            MediaPreviewConfigEventContent {
3734                media_previews: MediaPreviews::Off,
3735                invite_avatars: InviteAvatars::On,
3736                ..
3737            }
3738        );
3739        assert_pending!(stream);
3740    }
3741
3742    #[async_test]
3743    async fn test_media_preview_config_not_found() {
3744        let server = MatrixMockServer::new().await;
3745        let client = server.client_builder().build().await;
3746
3747        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3748
3749        assert!(initial_value.is_none());
3750    }
3751
3752    #[async_test]
3753    async fn test_load_or_fetch_max_upload_size() {
3754        let server = MatrixMockServer::new().await;
3755        let client = server.client_builder().build().await;
3756
3757        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3758
3759        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3760        client.load_or_fetch_max_upload_size().await.unwrap();
3761
3762        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3763    }
3764
3765    #[async_test]
3766    async fn test_uploading_a_too_large_media_file() {
3767        let server = MatrixMockServer::new().await;
3768        let client = server.client_builder().build().await;
3769
3770        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
3771        client.load_or_fetch_max_upload_size().await.unwrap();
3772        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
3773
3774        let data = vec![1, 2];
3775        let upload_request =
3776            ruma::api::client::media::create_content::v3::Request::new(data.clone());
3777        let request = SendRequest {
3778            client: client.clone(),
3779            request: upload_request,
3780            config: None,
3781            send_progress: SharedObservable::new(TransmissionProgress::default()),
3782        };
3783        let media_request = SendMediaUploadRequest::new(request);
3784
3785        let error = media_request.await.err();
3786        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
3787        assert_eq!(max, uint!(1));
3788        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
3789    }
3790}