Skip to main content

matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::{StreamExt, join};
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32    DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35    BaseClient, DmRoomDefinition, RoomInfoNotableUpdate, RoomState, RoomStateFilter,
36    SendOutsideWasm, SessionMeta, StateStoreDataKey, StateStoreDataValue, StoreError,
37    SyncOutsideWasm, ThreadingSupport,
38    event_cache::store::EventCacheStoreLock,
39    media::store::MediaStoreLock,
40    store::{DynStateStore, RoomLoadSettings, SupportedVersionsResponse, WellKnownResponse},
41    sync::{Notification, RoomUpdates},
42    task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl::TtlValue};
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50    api::{
51        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52        client::{
53            account::whoami,
54            alias::{create_alias, delete_alias, get_alias},
55            authenticated_media,
56            device::{self, delete_devices, get_devices, update_device},
57            directory::{get_public_rooms, get_public_rooms_filtered},
58            discovery::{discover_homeserver, get_supported_versions},
59            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
60            knock::knock_room,
61            media,
62            membership::{join_room_by_id, join_room_by_id_or_alias},
63            room::create_room,
64            rtc::RtcTransport,
65            session::login::v3::DiscoveryInfo,
66            sync::sync_events,
67            threads::get_thread_subscriptions_changes,
68            uiaa,
69            user_directory::search_users,
70        },
71        error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
72        path_builder::PathBuilder,
73    },
74    assign,
75    events::{beacon_info::OriginalSyncBeaconInfoEvent, direct::DirectUserIdentifier},
76    push::Ruleset,
77    time::Instant,
78};
79use serde::de::DeserializeOwned;
80use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
81use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
82use url::Url;
83
84use self::{
85    caches::{Cache, CachedValue, ClientCaches},
86    futures::SendRequest,
87};
88use crate::{
89    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
90    Room, SessionTokens, TransmissionProgress,
91    authentication::{
92        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
93        oauth::OAuth,
94    },
95    client::{
96        homeserver_capabilities::HomeserverCapabilities,
97        thread_subscriptions::ThreadSubscriptionCatchup,
98    },
99    config::{RequestConfig, SyncToken},
100    deduplicating_handler::DeduplicatingHandler,
101    error::HttpResult,
102    event_cache::EventCache,
103    event_handler::{
104        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
105        EventHandlerStore, ObservableEventHandler, SyncEvent,
106    },
107    http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
108    latest_events::LatestEvents,
109    live_locations_observer::BeaconInfoUpdate,
110    media::MediaError,
111    notification_settings::NotificationSettings,
112    room::RoomMember,
113    room_preview::RoomPreview,
114    send_queue::{SendQueue, SendQueueData},
115    sliding_sync::Version as SlidingSyncVersion,
116    sync::{RoomUpdate, SyncResponse},
117};
118#[cfg(feature = "e2e-encryption")]
119use crate::{
120    cross_process_lock::CrossProcessLock,
121    encryption::{
122        DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
123        VerificationState,
124    },
125};
126
127mod builder;
128pub(crate) mod caches;
129pub(crate) mod futures;
130pub(crate) mod homeserver_capabilities;
131pub(crate) mod thread_subscriptions;
132
133pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
134#[cfg(feature = "experimental-search")]
135use crate::search_index::SearchIndex;
136
/// Boxed, pinned future returned by a notification handler.
///
/// Outside of wasm the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future returned by a notification handler.
///
/// On wasm targets no `Send` bound is placed on the future.
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
141
142#[cfg(not(target_family = "wasm"))]
143type NotificationHandlerFn =
144    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
145#[cfg(target_family = "wasm")]
146type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
147
/// Tells a loop that runs callbacks whether it should keep going or stop.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop is exited.
///
/// [`sync_with_callback`]: Client::sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LoopCtrl {
    /// Keep running the loop.
    Continue,
    /// Exit the loop.
    Break,
}
161
162/// Represents changes that can occur to a `Client`s `Session`.
163#[derive(Debug, Clone, PartialEq)]
164pub enum SessionChange {
165    /// The session's token is no longer valid.
166    UnknownToken(UnknownTokenErrorData),
167    /// The session's tokens have been refreshed.
168    TokensRefreshed,
169}
170
/// Information about the server vendor obtained from the federation API.
///
/// Returned by `Client::server_vendor_info`, which fills a field with the
/// placeholder `"unknown"` when the server does not report it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The name of the server software.
    pub server_name: String,
    /// The version of the server software.
    pub version: String,
}
180
/// Description of a map tile server, as advertised by the homeserver in the
/// `tile_server` field of the Matrix client well-known (MSC3488).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct TileServerInfo {
    /// URL pointing at the `style.json` file of the map tile server. The
    /// [Mapbox Style Specification](https://docs.mapbox.com/mapbox-gl-js/style-spec/)
    /// has more details.
    pub map_style_url: String,
}
191
192impl From<discover_homeserver::TileServerInfo> for TileServerInfo {
193    fn from(value: discover_homeserver::TileServerInfo) -> Self {
194        Self { map_style_url: value.map_style_url }
195    }
196}
197
198/// An async/await enabled Matrix client.
199///
200/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
201#[derive(Clone)]
202pub struct Client {
203    pub(crate) inner: Arc<ClientInner>,
204}
205
206#[derive(Default)]
207pub(crate) struct ClientLocks {
208    /// Lock ensuring that only a single room may be marked as a DM at once.
209    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
210    /// explanation.
211    pub(crate) mark_as_dm_lock: Mutex<()>,
212
213    /// Lock ensuring that only a single secret store is getting opened at the
214    /// same time.
215    ///
216    /// This is important so we don't accidentally create multiple different new
217    /// default secret storage keys.
218    #[cfg(feature = "e2e-encryption")]
219    pub(crate) open_secret_store_lock: Mutex<()>,
220
221    /// Lock ensuring that we're only storing a single secret at a time.
222    ///
223    /// Take a look at the [`SecretStore::put_secret`] method for a more
224    /// detailed explanation.
225    ///
226    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
227    #[cfg(feature = "e2e-encryption")]
228    pub(crate) store_secret_lock: Mutex<()>,
229
230    /// Lock ensuring that only one method at a time might modify our backup.
231    #[cfg(feature = "e2e-encryption")]
232    pub(crate) backup_modify_lock: Mutex<()>,
233
234    /// Lock ensuring that we're going to attempt to upload backups for a single
235    /// requester.
236    #[cfg(feature = "e2e-encryption")]
237    pub(crate) backup_upload_lock: Mutex<()>,
238
239    /// Handler making sure we only have one group session sharing request in
240    /// flight per room.
241    #[cfg(feature = "e2e-encryption")]
242    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
243
244    /// Lock making sure we're only doing one key claim request at a time.
245    #[cfg(feature = "e2e-encryption")]
246    pub(crate) key_claim_lock: Mutex<()>,
247
248    /// Handler to ensure that only one members request is running at a time,
249    /// given a room.
250    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
251
252    /// Handler to ensure that only one encryption state request is running at a
253    /// time, given a room.
254    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
255
256    /// Deduplicating handler for sending read receipts. The string is an
257    /// internal implementation detail, see [`Self::send_single_receipt`].
258    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
259
260    #[cfg(feature = "e2e-encryption")]
261    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
262
263    /// Latest "generation" of data known by the crypto store.
264    ///
265    /// This is a counter that only increments, set in the database (and can
266    /// wrap). It's incremented whenever some process acquires a lock for the
267    /// first time. *This assumes the crypto store lock is being held, to
268    /// avoid data races on writing to this value in the store*.
269    ///
270    /// The current process will maintain this value in local memory and in the
271    /// DB over time. Observing a different value than the one read in
272    /// memory, when reading from the store indicates that somebody else has
273    /// written into the database under our feet.
274    ///
275    /// TODO: this should live in the `OlmMachine`, since it's information
276    /// related to the lock. As of today (2023-07-28), we blow up the entire
277    /// olm machine when there's a generation mismatch. So storing the
278    /// generation in the olm machine would make the client think there's
279    /// *always* a mismatch, and that's why we need to store the generation
280    /// outside the `OlmMachine`.
281    #[cfg(feature = "e2e-encryption")]
282    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
283}
284
285pub(crate) struct ClientInner {
286    /// All the data related to authentication and authorization.
287    pub(crate) auth_ctx: Arc<AuthCtx>,
288
289    /// The URL of the server.
290    ///
291    /// Not to be confused with the `Self::homeserver`. `server` is usually
292    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
293    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
294    /// homeserver (at the time of writing — 2024-08-28).
295    ///
296    /// This value is optional depending on how the `Client` has been built.
297    /// If it's been built from a homeserver URL directly, we don't know the
298    /// server. However, if the `Client` has been built from a server URL or
299    /// name, then the homeserver has been discovered, and we know both.
300    server: Option<Url>,
301
302    /// The URL of the homeserver to connect to.
303    ///
304    /// This is the URL for the client-server Matrix API.
305    homeserver: StdRwLock<Url>,
306
307    /// The sliding sync version.
308    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
309
310    /// The underlying HTTP client.
311    pub(crate) http_client: HttpClient,
312
313    /// User session data.
314    pub(super) base_client: BaseClient,
315
316    /// Collection of in-memory caches for the [`Client`].
317    pub(crate) caches: ClientCaches,
318
319    /// Collection of locks individual client methods might want to use, either
320    /// to ensure that only a single call to a method happens at once or to
321    /// deduplicate multiple calls to a method.
322    pub(crate) locks: ClientLocks,
323
324    /// The cross-process lock configuration.
325    ///
326    /// The SDK provides cross-process store locks (see
327    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
328    /// [`CrossProcessLockConfig::MultiProcess`] is used.
329    ///
330    /// If multiple `Client`s are running in different processes, this
331    /// value MUST be different for each `Client`.
332    cross_process_lock_config: CrossProcessLockConfig,
333
334    /// A mapping of the times at which the current user sent typing notices,
335    /// keyed by room.
336    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
337
338    /// Event handlers. See `add_event_handler`.
339    pub(crate) event_handlers: EventHandlerStore,
340
341    /// Notification handlers. See `register_notification_handler`.
342    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
343
344    /// The sender-side of channels used to receive room updates.
345    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
346
347    /// The sender-side of a channel used to observe all the room updates of a
348    /// sync response.
349    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
350
351    /// Whether the client should update its homeserver URL with the discovery
352    /// information present in the login response.
353    respect_login_well_known: bool,
354
355    /// An event that can be listened on to wait for a successful sync. The
356    /// event will only be fired if a sync loop is running. Can be used for
357    /// synchronization, e.g. if we send out a request to create a room, we can
358    /// wait for the sync to get the data to fetch a room object from the state
359    /// store.
360    pub(crate) sync_beat: event_listener::Event,
361
362    /// A central cache for events, inactive first.
363    ///
364    /// It becomes active when [`EventCache::subscribe`] is called.
365    pub(crate) event_cache: OnceCell<EventCache>,
366
367    /// End-to-end encryption related state.
368    #[cfg(feature = "e2e-encryption")]
369    pub(crate) e2ee: EncryptionData,
370
371    /// The verification state of our own device.
372    #[cfg(feature = "e2e-encryption")]
373    pub(crate) verification_state: SharedObservable<VerificationState>,
374
375    /// Whether to enable the experimental support for sending and receiving
376    /// encrypted room history on invite, per [MSC4268].
377    ///
378    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
379    #[cfg(feature = "e2e-encryption")]
380    pub(crate) enable_share_history_on_invite: bool,
381
382    /// Data related to the [`SendQueue`].
383    ///
384    /// [`SendQueue`]: crate::send_queue::SendQueue
385    pub(crate) send_queue_data: Arc<SendQueueData>,
386
387    /// The `max_upload_size` value of the homeserver, it contains the max
388    /// request size you can send.
389    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
390
391    /// The entry point to get the [`LatestEvent`] of rooms and threads.
392    ///
393    /// [`LatestEvent`]: crate::latest_event::LatestEvent
394    latest_events: OnceCell<LatestEvents>,
395
396    /// Service handling the catching up of thread subscriptions in the
397    /// background.
398    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
399
400    #[cfg(feature = "experimental-search")]
401    /// Handler for [`RoomIndex`]'s of each room
402    search_index: SearchIndex,
403
404    /// A monitor for background tasks spawned by the client.
405    pub(crate) task_monitor: TaskMonitor,
406
407    /// A sender to notify subscribers about duplicate key upload errors
408    /// triggered by requests to /keys/upload.
409    #[cfg(feature = "e2e-encryption")]
410    pub(crate) duplicate_key_upload_error_sender:
411        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
412}
413
414impl ClientInner {
415    /// Create a new `ClientInner`.
416    ///
417    /// All the fields passed as parameters here are those that must be cloned
418    /// upon instantiation of a sub-client, e.g. a client specialized for
419    /// notifications.
420    #[allow(clippy::too_many_arguments)]
421    async fn new(
422        auth_ctx: Arc<AuthCtx>,
423        server: Option<Url>,
424        homeserver: Url,
425        sliding_sync_version: SlidingSyncVersion,
426        http_client: HttpClient,
427        base_client: BaseClient,
428        supported_versions: CachedValue<TtlValue<SupportedVersions>>,
429        well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
430        respect_login_well_known: bool,
431        event_cache: OnceCell<EventCache>,
432        send_queue: Arc<SendQueueData>,
433        latest_events: OnceCell<LatestEvents>,
434        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
435        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
436        cross_process_lock_config: CrossProcessLockConfig,
437        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
438        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
439    ) -> Arc<Self> {
440        let caches = ClientCaches {
441            supported_versions: Cache::with_value(supported_versions),
442            well_known: Cache::with_value(well_known),
443            server_metadata: Cache::new(),
444            homeserver_capabilities: Cache::new(),
445        };
446
447        let client = Self {
448            server,
449            homeserver: StdRwLock::new(homeserver),
450            auth_ctx,
451            sliding_sync_version: StdRwLock::new(sliding_sync_version),
452            http_client,
453            base_client,
454            caches,
455            locks: Default::default(),
456            cross_process_lock_config,
457            typing_notice_times: Default::default(),
458            event_handlers: Default::default(),
459            notification_handlers: Default::default(),
460            room_update_channels: Default::default(),
461            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
462            // ballast for all observers to catch up.
463            room_updates_sender: broadcast::Sender::new(32),
464            respect_login_well_known,
465            sync_beat: event_listener::Event::new(),
466            event_cache,
467            send_queue_data: send_queue,
468            latest_events,
469            #[cfg(feature = "e2e-encryption")]
470            e2ee: EncryptionData::new(encryption_settings),
471            #[cfg(feature = "e2e-encryption")]
472            verification_state: SharedObservable::new(VerificationState::Unknown),
473            #[cfg(feature = "e2e-encryption")]
474            enable_share_history_on_invite,
475            server_max_upload_size: Mutex::new(OnceCell::new()),
476            #[cfg(feature = "experimental-search")]
477            search_index: search_index_handler,
478            thread_subscription_catchup,
479            task_monitor: TaskMonitor::new(),
480            #[cfg(feature = "e2e-encryption")]
481            duplicate_key_upload_error_sender: broadcast::channel(1).0,
482        };
483
484        #[allow(clippy::let_and_return)]
485        let client = Arc::new(client);
486
487        #[cfg(feature = "e2e-encryption")]
488        client.e2ee.initialize_tasks(&client);
489
490        let init_event_cache = client.event_cache.get_or_init(|| async {
491            EventCache::new(&client, client.base_client.event_cache_store().clone())
492        });
493
494        let init_thread_subscription_catchup = client
495            .thread_subscription_catchup
496            .get_or_init(|| ThreadSubscriptionCatchup::new(Client { inner: client.clone() }));
497
498        let _ = join!(init_event_cache, init_thread_subscription_catchup);
499
500        client
501    }
502}
503
504#[cfg(not(tarpaulin_include))]
505impl Debug for Client {
506    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
507        write!(fmt, "Client")
508    }
509}
510
511impl Client {
512    /// Create a new [`Client`] that will use the given homeserver.
513    ///
514    /// # Arguments
515    ///
516    /// * `homeserver_url` - The homeserver that the client should connect to.
517    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
518        Self::builder().homeserver_url(homeserver_url).build().await
519    }
520
521    /// Returns a subscriber that publishes an event every time the ignore user
522    /// list changes.
523    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
524        self.inner.base_client.subscribe_to_ignore_user_list_changes()
525    }
526
527    /// Create a new [`ClientBuilder`].
528    pub fn builder() -> ClientBuilder {
529        ClientBuilder::new()
530    }
531
532    pub(crate) fn base_client(&self) -> &BaseClient {
533        &self.inner.base_client
534    }
535
536    /// The underlying HTTP client.
537    pub fn http_client(&self) -> &reqwest::Client {
538        &self.inner.http_client.inner
539    }
540
541    pub(crate) fn locks(&self) -> &ClientLocks {
542        &self.inner.locks
543    }
544
545    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
546        &self.inner.auth_ctx
547    }
548
549    /// The cross-process store lock configuration used by this [`Client`].
550    ///
551    /// The SDK provides cross-process store locks (see
552    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
553    /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
554    /// the value used for all cross-process store locks used by this
555    /// `Client`.
556    pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
557        &self.inner.cross_process_lock_config
558    }
559
560    /// Change the homeserver URL used by this client.
561    ///
562    /// # Arguments
563    ///
564    /// * `homeserver_url` - The new URL to use.
565    fn set_homeserver(&self, homeserver_url: Url) {
566        *self.inner.homeserver.write().unwrap() = homeserver_url;
567    }
568
569    /// Change to a different homeserver and re-resolve well-known.
570    #[cfg(feature = "e2e-encryption")]
571    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
572        &self,
573        homeserver_url: Url,
574    ) -> Result<()> {
575        self.set_homeserver(homeserver_url);
576        self.reset_well_known().await?;
577        if let Some(well_known) = self.well_known().await {
578            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
579        }
580        Ok(())
581    }
582
583    /// Retrieves a helper component to access the [`HomeserverCapabilities`]
584    /// supported or disabled by the homeserver.
585    pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
586        HomeserverCapabilities::new(self.clone())
587    }
588
589    /// Get the server vendor information from the federation API.
590    ///
591    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
592    /// both the server's software name and version.
593    ///
594    /// # Examples
595    ///
596    /// ```no_run
597    /// # use matrix_sdk::Client;
598    /// # use url::Url;
599    /// # async {
600    /// # let homeserver = Url::parse("http://example.com")?;
601    /// let client = Client::new(homeserver).await?;
602    ///
603    /// let server_info = client.server_vendor_info(None).await?;
604    /// println!(
605    ///     "Server: {}, Version: {}",
606    ///     server_info.server_name, server_info.version
607    /// );
608    /// # anyhow::Ok(()) };
609    /// ```
610    #[cfg(feature = "federation-api")]
611    pub async fn server_vendor_info(
612        &self,
613        request_config: Option<RequestConfig>,
614    ) -> HttpResult<ServerVendorInfo> {
615        use ruma::api::federation::discovery::get_server_version;
616
617        let res = self
618            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
619            .await?;
620
621        // Extract server info, using defaults if fields are missing.
622        let server = res.server.unwrap_or_default();
623        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
624        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
625
626        Ok(ServerVendorInfo { server_name: server_name_str, version })
627    }
628
629    /// Get a copy of the default request config.
630    ///
631    /// The default request config is what's used when sending requests if no
632    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
633    /// function with such a parameter.
634    ///
635    /// If the default request config was not customized through
636    /// [`ClientBuilder`] when creating this `Client`, the returned value will
637    /// be equivalent to [`RequestConfig::default()`].
638    pub fn request_config(&self) -> RequestConfig {
639        self.inner.http_client.request_config
640    }
641
642    /// Check whether the client has been activated.
643    ///
644    /// A client is considered active when:
645    ///
646    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
647    ///    is logged in,
648    /// 2. Has loaded cached data from storage,
649    /// 3. If encryption is enabled, it also initialized or restored its
650    ///    `OlmMachine`.
651    pub fn is_active(&self) -> bool {
652        self.inner.base_client.is_active()
653    }
654
655    /// The server used by the client.
656    ///
657    /// See `Self::server` to learn more.
658    pub fn server(&self) -> Option<&Url> {
659        self.inner.server.as_ref()
660    }
661
662    /// The homeserver of the client.
663    pub fn homeserver(&self) -> Url {
664        self.inner.homeserver.read().unwrap().clone()
665    }
666
667    /// Get the sliding sync version.
668    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
669        self.inner.sliding_sync_version.read().unwrap().clone()
670    }
671
672    /// Override the sliding sync version.
673    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
674        let mut lock = self.inner.sliding_sync_version.write().unwrap();
675        *lock = version;
676    }
677
678    /// Get the Matrix user session meta information.
679    ///
680    /// If the client is currently logged in, this will return a
681    /// [`SessionMeta`] object which contains the user ID and device ID.
682    /// Otherwise it returns `None`.
683    pub fn session_meta(&self) -> Option<&SessionMeta> {
684        self.base_client().session_meta()
685    }
686
687    /// Returns a receiver that gets events for each room info update. To watch
688    /// for new events, use `receiver.resubscribe()`.
689    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
690        self.base_client().room_info_notable_update_receiver()
691    }
692
693    /// Performs a search for users.
694    /// The search is performed case-insensitively on user IDs and display names
695    ///
696    /// # Arguments
697    ///
698    /// * `search_term` - The search term for the search
699    /// * `limit` - The maximum number of results to return. Defaults to 10.
700    ///
701    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
702    pub async fn search_users(
703        &self,
704        search_term: &str,
705        limit: u64,
706    ) -> HttpResult<search_users::v3::Response> {
707        let mut request = search_users::v3::Request::new(search_term.to_owned());
708
709        if let Some(limit) = UInt::new(limit) {
710            request.limit = limit;
711        }
712
713        self.send(request).await
714    }
715
716    /// Get the user id of the current owner of the client.
717    pub fn user_id(&self) -> Option<&UserId> {
718        self.session_meta().map(|s| s.user_id.as_ref())
719    }
720
721    /// Get the device ID that identifies the current session.
722    pub fn device_id(&self) -> Option<&DeviceId> {
723        self.session_meta().map(|s| s.device_id.as_ref())
724    }
725
726    /// Get the current access token for this session.
727    ///
728    /// Will be `None` if the client has not been logged in.
729    pub fn access_token(&self) -> Option<String> {
730        self.auth_ctx().access_token()
731    }
732
733    /// Get the current tokens for this session.
734    ///
735    /// To be notified of changes in the session tokens, use
736    /// [`Client::subscribe_to_session_changes()`] or
737    /// [`Client::set_session_callbacks()`].
738    ///
739    /// Returns `None` if the client has not been logged in.
740    pub fn session_tokens(&self) -> Option<SessionTokens> {
741        self.auth_ctx().session_tokens()
742    }
743
744    /// Access the authentication API used to log in this client.
745    ///
746    /// Will be `None` if the client has not been logged in.
747    pub fn auth_api(&self) -> Option<AuthApi> {
748        match self.auth_ctx().auth_data.get()? {
749            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
750            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
751        }
752    }
753
754    /// Get the whole session info of this client.
755    ///
756    /// Will be `None` if the client has not been logged in.
757    ///
758    /// Can be used with [`Client::restore_session`] to restore a previously
759    /// logged-in session.
760    pub fn session(&self) -> Option<AuthSession> {
761        match self.auth_api()? {
762            AuthApi::Matrix(api) => api.session().map(Into::into),
763            AuthApi::OAuth(api) => api.full_session().map(Into::into),
764        }
765    }
766
767    /// Get a reference to the state store.
768    pub fn state_store(&self) -> &DynStateStore {
769        self.base_client().state_store()
770    }
771
772    /// Get a reference to the event cache store.
773    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
774        self.base_client().event_cache_store()
775    }
776
777    /// Get a reference to the media store.
778    pub fn media_store(&self) -> &MediaStoreLock {
779        self.base_client().media_store()
780    }
781
782    /// Access the native Matrix authentication API with this client.
783    pub fn matrix_auth(&self) -> MatrixAuth {
784        MatrixAuth::new(self.clone())
785    }
786
787    /// Get the account of the current owner of the client.
788    pub fn account(&self) -> Account {
789        Account::new(self.clone())
790    }
791
/// Access the encryption manager of the client.
///
/// The returned [`Encryption`] is built from a clone of this client.
#[cfg(feature = "e2e-encryption")]
pub fn encryption(&self) -> Encryption {
    let client = self.clone();
    Encryption::new(client)
}
797
798    /// Get the media manager of the client.
799    pub fn media(&self) -> Media {
800        Media::new(self.clone())
801    }
802
803    /// Get the pusher manager of the client.
804    pub fn pusher(&self) -> Pusher {
805        Pusher::new(self.clone())
806    }
807
808    /// Access the OAuth 2.0 API of the client.
809    pub fn oauth(&self) -> OAuth {
810        OAuth::new(self.clone())
811    }
812
813    /// Register a handler for a specific event type.
814    ///
815    /// The handler is a function or closure with one or more arguments. The
816    /// first argument is the event itself. All additional arguments are
817    /// "context" arguments: They have to implement [`EventHandlerContext`].
818    /// This trait is named that way because most of the types implementing it
819    /// give additional context about an event: The room it was in, its raw form
820    /// and other similar things. As two exceptions to this,
821    /// [`Client`] and [`EventHandlerHandle`] also implement the
822    /// `EventHandlerContext` trait so you don't have to clone your client
823    /// into the event handler manually and a handler can decide to remove
824    /// itself.
825    ///
826    /// Some context arguments are not universally applicable. A context
827    /// argument that isn't available for the given event type will result in
828    /// the event handler being skipped and an error being logged. The following
829    /// context argument types are only available for a subset of event types:
830    ///
831    /// * [`Room`] is only available for room-specific events, i.e. not for
832    ///   events like global account data events or presence events.
833    ///
834    /// You can provide custom context via
835    /// [`add_event_handler_context`](Client::add_event_handler_context) and
836    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
837    /// into the event handler.
838    ///
839    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
840    ///
841    /// # Examples
842    ///
843    /// ```no_run
844    /// use matrix_sdk::{
845    ///     deserialized_responses::EncryptionInfo,
846    ///     event_handler::Ctx,
847    ///     ruma::{
848    ///         events::{
849    ///             macros::EventContent,
850    ///             push_rules::PushRulesEvent,
851    ///             room::{
852    ///                 message::SyncRoomMessageEvent,
853    ///                 topic::SyncRoomTopicEvent,
854    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
855    ///             },
856    ///         },
857    ///         push::Action,
858    ///         Int, MilliSecondsSinceUnixEpoch,
859    ///     },
860    ///     Client, Room,
861    /// };
862    /// use serde::{Deserialize, Serialize};
863    ///
864    /// # async fn example(client: Client) {
865    /// client.add_event_handler(
866    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
867    ///         // Common usage: Room event plus room and client.
868    ///     },
869    /// );
870    /// client.add_event_handler(
871    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
872    ///         async move {
873    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
874    ///             // unencrypted events and events that were decrypted by the SDK.
875    ///         }
876    ///     },
877    /// );
878    /// client.add_event_handler(
879    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
880    ///         async move {
881    ///             // A `Vec<Action>` parameter allows you to know which push actions
882    ///             // are applicable for an event. For example, an event with
883    ///             // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
884    ///             // should be highlighted in the timeline.
885    ///         }
886    ///     },
887    /// );
888    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
889    ///     // You can omit any or all arguments after the first.
890    /// });
891    ///
892    /// // Registering a temporary event handler:
893    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
894    ///     /* Event handler */
895    /// });
896    /// client.remove_event_handler(handle);
897    ///
898    /// // Registering custom event handler context:
899    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
900    /// struct MyContext {
901    ///     number: usize,
902    /// }
903    /// client.add_event_handler_context(MyContext { number: 5 });
904    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
905    ///     // Use the context
906    /// });
907    ///
908    /// // This will handle membership events in joined rooms. Invites are special, see below.
909    /// client.add_event_handler(
910    ///     |ev: SyncRoomMemberEvent| async move {},
911    /// );
912    ///
913    /// // To handle state events in invited rooms (including invite membership events),
914    /// // `StrippedRoomMemberEvent` should be used.
915    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
916    /// client.add_event_handler(
917    ///     |ev: StrippedRoomMemberEvent| async move {},
918    /// );
919    ///
920    /// // Custom events work exactly the same way, you just need to declare
921    /// // the content struct and use the EventContent derive macro on it.
922    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
923    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
924    /// struct TokenEventContent {
925    ///     token: String,
926    ///     #[serde(rename = "exp")]
927    ///     expires_at: MilliSecondsSinceUnixEpoch,
928    /// }
929    ///
930    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
931    ///     todo!("Display the token");
932    /// });
933    ///
934    /// // Event handler closures can also capture local variables.
935    /// // Make sure they are cheap to clone though, because they will be cloned
936    /// // every time the closure is called.
937    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
938    ///
939    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
940    ///     println!("Calling the handler with identifier {data}");
941    /// });
942    /// # }
943    /// ```
944    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
945    where
946        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
947        H: EventHandler<Ev, Ctx>,
948    {
949        self.add_event_handler_impl(handler, None)
950    }
951
952    /// Register a handler for a specific room, and event type.
953    ///
954    /// This method works the same way as
955    /// [`add_event_handler`][Self::add_event_handler], except that the handler
956    /// will only be called for events in the room with the specified ID. See
957    /// that method for more details on event handler functions.
958    ///
959    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
960    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
961    /// your use case.
962    pub fn add_room_event_handler<Ev, Ctx, H>(
963        &self,
964        room_id: &RoomId,
965        handler: H,
966    ) -> EventHandlerHandle
967    where
968        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
969        H: EventHandler<Ev, Ctx>,
970    {
971        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
972    }
973
974    /// Observe a specific event type.
975    ///
976    /// `Ev` represents the kind of event that will be observed. `Ctx`
977    /// represents the context that will come with the event. It relies on the
978    /// same mechanism as [`Client::add_event_handler`]. The main difference is
979    /// that it returns an [`ObservableEventHandler`] and doesn't require a
980    /// user-defined closure. It is possible to subscribe to the
981    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
982    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
983    /// Ctx)`.
984    ///
985    /// Be careful that only the most recent value can be observed. Subscribers
986    /// are notified when a new value is sent, but there is no guarantee
987    /// that they will see all values.
988    ///
989    /// # Example
990    ///
991    /// Let's see a classical usage:
992    ///
993    /// ```
994    /// use futures_util::StreamExt as _;
995    /// use matrix_sdk::{
996    ///     Client, Room,
997    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
998    /// };
999    ///
1000    /// # async fn example(client: Client) -> Option<()> {
1001    /// let observer =
1002    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1003    ///
1004    /// let mut subscriber = observer.subscribe();
1005    ///
1006    /// let (event, (room, push_actions)) = subscriber.next().await?;
1007    /// # Some(())
1008    /// # }
1009    /// ```
1010    ///
1011    /// Now let's see how to get several contexts that can be useful for you:
1012    ///
1013    /// ```
1014    /// use matrix_sdk::{
1015    ///     Client, Room,
1016    ///     deserialized_responses::EncryptionInfo,
1017    ///     ruma::{
1018    ///         events::room::{
1019    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1020    ///         },
1021    ///         push::Action,
1022    ///     },
1023    /// };
1024    ///
1025    /// # async fn example(client: Client) {
1026    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1027    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1028    ///
1029    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1030    /// // to distinguish between unencrypted events and events that were decrypted
1031    /// // by the SDK.
1032    /// let _ = client
1033    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1034    ///     );
1035    ///
1036    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1037    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1038    /// // should be highlighted in the timeline.
1039    /// let _ =
1040    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1041    ///
1042    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1043    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1044    /// # }
1045    /// ```
1046    ///
1047    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1048    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1049    where
1050        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1051        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1052    {
1053        self.observe_room_events_impl(None)
1054    }
1055
1056    /// Observe a specific room, and event type.
1057    ///
1058    /// This method works the same way as [`Client::observe_events`], except
1059    /// that the observability will only be applied for events in the room with
1060    /// the specified ID. See that method for more details.
1061    ///
1062    /// Be careful that only the most recent value can be observed. Subscribers
1063    /// are notified when a new value is sent, but there is no guarantee
1064    /// that they will see all values.
1065    pub fn observe_room_events<Ev, Ctx>(
1066        &self,
1067        room_id: &RoomId,
1068    ) -> ObservableEventHandler<(Ev, Ctx)>
1069    where
1070        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1071        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1072    {
1073        self.observe_room_events_impl(Some(room_id.to_owned()))
1074    }
1075
1076    /// Shared implementation for `Client::observe_events` and
1077    /// `Client::observe_room_events`.
1078    fn observe_room_events_impl<Ev, Ctx>(
1079        &self,
1080        room_id: Option<OwnedRoomId>,
1081    ) -> ObservableEventHandler<(Ev, Ctx)>
1082    where
1083        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1084        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1085    {
1086        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1087        // new value.
1088        let shared_observable = SharedObservable::new(None);
1089
1090        ObservableEventHandler::new(
1091            shared_observable.clone(),
1092            self.event_handler_drop_guard(self.add_event_handler_impl(
1093                move |event: Ev, context: Ctx| {
1094                    shared_observable.set(Some((event, context)));
1095
1096                    ready(())
1097                },
1098                room_id,
1099            )),
1100        )
1101    }
1102
1103    /// Subscribe to future `beacon_info` updates for the current user across
1104    /// all rooms.
1105    ///
1106    /// This stream is push-only: it emits only future updates observed during
1107    /// sync processing and does not replay existing state.
1108    pub fn observe_own_beacon_info_updates(
1109        &self,
1110    ) -> Result<impl Stream<Item = BeaconInfoUpdate> + use<>> {
1111        let observer = self.observe_events::<OriginalSyncBeaconInfoEvent, Room>();
1112        let mut stream = observer.subscribe();
1113        let own_user_id = self.user_id().ok_or(Error::AuthenticationRequired)?.to_owned();
1114        Ok(async_stream::stream! {
1115            let _observer = observer;
1116
1117            while let Some((event, room)) = stream.next().await {
1118                if event.state_key != own_user_id {
1119                    continue;
1120                }
1121                yield BeaconInfoUpdate {
1122                    room_id: room.room_id().to_owned(),
1123                    event_id: event.event_id,
1124                    content: event.content,
1125                };
1126            }
1127        })
1128    }
1129
1130    /// Remove the event handler associated with the handle.
1131    ///
1132    /// Note that you **must not** call `remove_event_handler` from the
1133    /// non-async part of an event handler, that is:
1134    ///
1135    /// ```ignore
1136    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1137    ///     // ⚠ this will cause a deadlock ⚠
1138    ///     client.remove_event_handler(handle);
1139    ///
1140    ///     async move {
1141    ///         // removing the event handler here is fine
1142    ///         client.remove_event_handler(handle);
1143    ///     }
1144    /// })
1145    /// ```
1146    ///
1147    /// Note also that handlers that remove themselves will still execute with
1148    /// events received in the same sync cycle.
1149    ///
1150    /// # Arguments
1151    ///
1152    /// `handle` - The [`EventHandlerHandle`] that is returned when
1153    /// registering the event handler with [`Client::add_event_handler`].
1154    ///
1155    /// # Examples
1156    ///
1157    /// ```no_run
1158    /// # use url::Url;
1159    /// # use tokio::sync::mpsc;
1160    /// #
1161    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1162    /// #
1163    /// use matrix_sdk::{
1164    ///     Client, event_handler::EventHandlerHandle,
1165    ///     ruma::events::room::member::SyncRoomMemberEvent,
1166    /// };
1167    /// #
1168    /// # futures_executor::block_on(async {
1169    /// # let client = matrix_sdk::Client::builder()
1170    /// #     .homeserver_url(homeserver)
1171    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1172    /// #     .build()
1173    /// #     .await
1174    /// #     .unwrap();
1175    ///
1176    /// client.add_event_handler(
1177    ///     |ev: SyncRoomMemberEvent,
1178    ///      client: Client,
1179    ///      handle: EventHandlerHandle| async move {
1180    ///         // Common usage: Check arriving Event is the expected one
1181    ///         println!("Expected RoomMemberEvent received!");
1182    ///         client.remove_event_handler(handle);
1183    ///     },
1184    /// );
1185    /// # });
1186    /// ```
1187    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1188        self.inner.event_handlers.remove(handle);
1189    }
1190
1191    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1192    /// the given handle.
1193    ///
1194    /// When the returned value is dropped, the event handler will be removed.
1195    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1196        EventHandlerDropGuard::new(handle, self.clone())
1197    }
1198
1199    /// Add an arbitrary value for use as event handler context.
1200    ///
1201    /// The value can be obtained in an event handler by adding an argument of
1202    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1203    ///
1204    /// If a value of the same type has been added before, it will be
1205    /// overwritten.
1206    ///
1207    /// # Examples
1208    ///
1209    /// ```no_run
1210    /// use matrix_sdk::{
1211    ///     Room, event_handler::Ctx,
1212    ///     ruma::events::room::message::SyncRoomMessageEvent,
1213    /// };
1214    /// # #[derive(Clone)]
1215    /// # struct SomeType;
1216    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1217    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1218    /// # futures_executor::block_on(async {
1219    /// # let client = matrix_sdk::Client::builder()
1220    /// #     .homeserver_url(homeserver)
1221    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1222    /// #     .build()
1223    /// #     .await
1224    /// #     .unwrap();
1225    ///
1226    /// // Handle used to send messages to the UI part of the app
1227    /// let my_gui_handle: SomeType = obtain_gui_handle();
1228    ///
1229    /// client.add_event_handler_context(my_gui_handle.clone());
1230    /// client.add_event_handler(
1231    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1232    ///         async move {
1233    ///             // gui_handle.send(DisplayMessage { message: ev });
1234    ///         }
1235    ///     },
1236    /// );
1237    /// # });
1238    /// ```
1239    pub fn add_event_handler_context<T>(&self, ctx: T)
1240    where
1241        T: Clone + Send + Sync + 'static,
1242    {
1243        self.inner.event_handlers.add_context(ctx);
1244    }
1245
1246    /// Register a handler for a notification.
1247    ///
1248    /// Similar to [`Client::add_event_handler`], but only allows functions
1249    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1250    /// [`Client`] for now.
1251    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1252    where
1253        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1254        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1255    {
1256        self.inner.notification_handlers.write().await.push(Box::new(
1257            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1258        ));
1259
1260        self
1261    }
1262
1263    /// Subscribe to all updates for the room with the given ID.
1264    ///
1265    /// The returned receiver will receive a new message for each sync response
1266    /// that contains updates for that room.
1267    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1268        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1269            btree_map::Entry::Vacant(entry) => {
1270                let (tx, rx) = broadcast::channel(8);
1271                entry.insert(tx);
1272                rx
1273            }
1274            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1275        }
1276    }
1277
1278    /// Subscribe to all updates to all rooms, whenever any has been received in
1279    /// a sync response.
1280    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1281        self.inner.room_updates_sender.subscribe()
1282    }
1283
1284    pub(crate) async fn notification_handlers(
1285        &self,
1286    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1287        self.inner.notification_handlers.read().await
1288    }
1289
1290    /// Get all the rooms the client knows about.
1291    ///
1292    /// This will return the list of joined, invited, and left rooms.
1293    pub fn rooms(&self) -> Vec<Room> {
1294        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1295    }
1296
1297    /// Get all the rooms the client knows about, filtered by room state.
1298    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1299        self.base_client()
1300            .rooms_filtered(filter)
1301            .into_iter()
1302            .map(|room| Room::new(self.clone(), room))
1303            .collect()
1304    }
1305
1306    /// Get a stream of all the rooms, in addition to the existing rooms.
1307    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1308        let (rooms, stream) = self.base_client().rooms_stream();
1309
1310        let map_room = |room| Room::new(self.clone(), room);
1311
1312        (
1313            rooms.into_iter().map(map_room).collect(),
1314            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1315        )
1316    }
1317
1318    /// Returns the joined rooms this client knows about.
1319    pub fn joined_rooms(&self) -> Vec<Room> {
1320        self.rooms_filtered(RoomStateFilter::JOINED)
1321    }
1322
1323    /// Returns the invited rooms this client knows about.
1324    pub fn invited_rooms(&self) -> Vec<Room> {
1325        self.rooms_filtered(RoomStateFilter::INVITED)
1326    }
1327
1328    /// Returns the left rooms this client knows about.
1329    pub fn left_rooms(&self) -> Vec<Room> {
1330        self.rooms_filtered(RoomStateFilter::LEFT)
1331    }
1332
1333    /// Returns the joined space rooms this client knows about.
1334    pub fn joined_space_rooms(&self) -> Vec<Room> {
1335        self.base_client()
1336            .rooms_filtered(RoomStateFilter::JOINED)
1337            .into_iter()
1338            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1339            .collect()
1340    }
1341
1342    /// Get a room with the given room id.
1343    ///
1344    /// # Arguments
1345    ///
1346    /// `room_id` - The unique id of the room that should be fetched.
1347    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1348        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1349    }
1350
1351    /// Gets the preview of a room, whether the current user has joined it or
1352    /// not.
1353    pub async fn get_room_preview(
1354        &self,
1355        room_or_alias_id: &RoomOrAliasId,
1356        via: Vec<OwnedServerName>,
1357    ) -> Result<RoomPreview> {
1358        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1359            Ok(room_id) => room_id.to_owned(),
1360            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1361        };
1362
1363        if let Some(room) = self.get_room(&room_id) {
1364            // The cached data can only be trusted if the room state is joined or
1365            // banned: for invite and knock rooms, no updates will be received
1366            // for the rooms after the invite/knock action took place so we may
1367            // have very out to date data for important fields such as
1368            // `join_rule`. For left rooms, the homeserver should return the latest info.
1369            match room.state() {
1370                RoomState::Joined | RoomState::Banned => {
1371                    return Ok(RoomPreview::from_known_room(&room).await);
1372                }
1373                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1374            }
1375        }
1376
1377        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1378    }
1379
1380    /// Resolve a room alias to a room id and a list of servers which know
1381    /// about it.
1382    ///
1383    /// # Arguments
1384    ///
1385    /// `room_alias` - The room alias to be resolved.
1386    pub async fn resolve_room_alias(
1387        &self,
1388        room_alias: &RoomAliasId,
1389    ) -> HttpResult<get_alias::v3::Response> {
1390        let request = get_alias::v3::Request::new(room_alias.to_owned());
1391        self.send(request).await
1392    }
1393
1394    /// Checks if a room alias is not in use yet.
1395    ///
1396    /// Returns:
1397    /// - `Ok(true)` if the room alias is available.
1398    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1399    ///   status code).
1400    /// - An `Err` otherwise.
1401    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1402        match self.resolve_room_alias(alias).await {
1403            // The room alias was resolved, so it's already in use.
1404            Ok(_) => Ok(false),
1405            Err(error) => {
1406                match error.client_api_error_kind() {
1407                    // The room alias wasn't found, so it's available.
1408                    Some(ErrorKind::NotFound) => Ok(true),
1409                    _ => Err(error),
1410                }
1411            }
1412        }
1413    }
1414
1415    /// Adds a new room alias associated with a room to the room directory.
1416    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1417        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1418        self.send(request).await?;
1419        Ok(())
1420    }
1421
1422    /// Removes a room alias from the room directory.
1423    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1424        let request = delete_alias::v3::Request::new(alias.to_owned());
1425        self.send(request).await?;
1426        Ok(())
1427    }
1428
1429    /// Update the homeserver from the login response well-known if needed.
1430    ///
1431    /// # Arguments
1432    ///
1433    /// * `login_well_known` - The `well_known` field from a successful login
1434    ///   response.
1435    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1436        if self.inner.respect_login_well_known
1437            && let Some(well_known) = login_well_known
1438            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1439        {
1440            self.set_homeserver(homeserver);
1441        }
1442    }
1443
1444    /// Similar to [`Client::restore_session_with`], with
1445    /// [`RoomLoadSettings::default()`].
1446    ///
1447    /// # Panics
1448    ///
1449    /// Panics if a session was already restored or logged in.
1450    #[instrument(skip_all)]
1451    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1452        self.restore_session_with(session, RoomLoadSettings::default()).await
1453    }
1454
1455    /// Restore a session previously logged-in using one of the available
1456    /// authentication APIs. The number of rooms to restore is controlled by
1457    /// [`RoomLoadSettings`].
1458    ///
1459    /// See the documentation of the corresponding authentication API's
1460    /// `restore_session` method for more information.
1461    ///
1462    /// # Panics
1463    ///
1464    /// Panics if a session was already restored or logged in.
1465    #[instrument(skip_all)]
1466    pub async fn restore_session_with(
1467        &self,
1468        session: impl Into<AuthSession>,
1469        room_load_settings: RoomLoadSettings,
1470    ) -> Result<()> {
1471        let session = session.into();
1472        match session {
1473            AuthSession::Matrix(session) => {
1474                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1475            }
1476            AuthSession::OAuth(session) => {
1477                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1478            }
1479        }
1480    }
1481
1482    /// Refresh the access token using the authentication API used to log into
1483    /// this session.
1484    ///
1485    /// See the documentation of the authentication API's `refresh_access_token`
1486    /// method for more information.
1487    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1488        let Some(auth_api) = self.auth_api() else {
1489            return Err(RefreshTokenError::RefreshTokenRequired);
1490        };
1491
1492        match auth_api {
1493            AuthApi::Matrix(api) => {
1494                trace!("Token refresh: Using the homeserver.");
1495                Box::pin(api.refresh_access_token()).await?;
1496            }
1497            AuthApi::OAuth(api) => {
1498                trace!("Token refresh: Using OAuth 2.0.");
1499                Box::pin(api.refresh_access_token()).await?;
1500            }
1501        }
1502
1503        Ok(())
1504    }
1505
1506    /// Log out the current session using the proper authentication API.
1507    ///
1508    /// # Errors
1509    ///
1510    /// Returns an error if the session is not authenticated or if an error
1511    /// occurred while making the request to the server.
1512    pub async fn logout(&self) -> Result<(), Error> {
1513        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1514        match auth_api {
1515            AuthApi::Matrix(matrix_auth) => {
1516                matrix_auth.logout().await?;
1517                Ok(())
1518            }
1519            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1520        }
1521    }
1522
1523    /// Get or upload a sync filter.
1524    ///
1525    /// This method will either get a filter ID from the store or upload the
1526    /// filter definition to the homeserver and return the new filter ID.
1527    ///
1528    /// # Arguments
1529    ///
1530    /// * `filter_name` - The unique name of the filter, this name will be used
1531    /// locally to store and identify the filter ID returned by the server.
1532    ///
1533    /// * `definition` - The filter definition that should be uploaded to the
1534    /// server if no filter ID can be found in the store.
1535    ///
1536    /// # Examples
1537    ///
1538    /// ```no_run
1539    /// # use matrix_sdk::{
1540    /// #    Client, config::SyncSettings,
1541    /// #    ruma::api::client::{
1542    /// #        filter::{
1543    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1544    /// #        },
1545    /// #        sync::sync_events::v3::Filter,
1546    /// #    }
1547    /// # };
1548    /// # use url::Url;
1549    /// # async {
1550    /// # let homeserver = Url::parse("http://example.com").unwrap();
1551    /// # let client = Client::new(homeserver).await.unwrap();
1552    /// let mut filter = FilterDefinition::default();
1553    ///
1554    /// // Let's enable member lazy loading.
1555    /// filter.room.state.lazy_load_options =
1556    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1557    ///
1558    /// let filter_id = client
1559    ///     .get_or_upload_filter("sync", filter)
1560    ///     .await
1561    ///     .unwrap();
1562    ///
1563    /// let sync_settings = SyncSettings::new()
1564    ///     .filter(Filter::FilterId(filter_id));
1565    ///
1566    /// let response = client.sync_once(sync_settings).await.unwrap();
1567    /// # };
1568    #[instrument(skip(self, definition))]
1569    pub async fn get_or_upload_filter(
1570        &self,
1571        filter_name: &str,
1572        definition: FilterDefinition,
1573    ) -> Result<String> {
1574        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1575            debug!("Found filter locally");
1576            Ok(filter)
1577        } else {
1578            debug!("Didn't find filter locally");
1579            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1580            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1581            let response = self.send(request).await?;
1582
1583            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1584
1585            Ok(response.filter_id)
1586        }
1587    }
1588
1589    /// Prepare to join a room by ID, by getting the current details about it
1590    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1591        let room = self.get_room(room_id)?;
1592
1593        let inviter = match room.invite_details().await {
1594            Ok(details) => details.inviter,
1595            Err(Error::WrongRoomState(_)) => None,
1596            Err(e) => {
1597                warn!("Error fetching invite details for room: {e:?}");
1598                None
1599            }
1600        };
1601
1602        Some(PreJoinRoomInfo { inviter })
1603    }
1604
1605    /// Finish joining a room.
1606    ///
1607    /// If the room was an invite that should be marked as a DM, will include it
1608    /// in the DM event after creating the joined room.
1609    ///
1610    /// If encrypted history sharing is enabled, will check to see if we have a
1611    /// key bundle, and import it if so.
1612    ///
1613    /// # Arguments
1614    ///
1615    /// * `room_id` - The `RoomId` of the room that was joined.
1616    /// * `pre_join_room_info` - Information about the room before we joined.
1617    async fn finish_join_room(
1618        &self,
1619        room_id: &RoomId,
1620        pre_join_room_info: Option<PreJoinRoomInfo>,
1621    ) -> Result<Room> {
1622        info!(?room_id, ?pre_join_room_info, "Completing room join");
1623        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1624            room.state() == RoomState::Invited
1625                && room.is_direct().await.unwrap_or_else(|e| {
1626                    warn!(%room_id, "is_direct() failed: {e}");
1627                    false
1628                })
1629        } else {
1630            false
1631        };
1632
1633        let base_room = self
1634            .base_client()
1635            .room_joined(
1636                room_id,
1637                pre_join_room_info
1638                    .as_ref()
1639                    .and_then(|info| info.inviter.as_ref())
1640                    .map(|i| i.user_id().to_owned()),
1641            )
1642            .await?;
1643        let room = Room::new(self.clone(), base_room);
1644
1645        if mark_as_dm {
1646            room.set_is_direct(true).await?;
1647        }
1648
1649        // If we joined following an invite, check if we had previously received a key
1650        // bundle from the inviter, and import it if so.
1651        //
1652        // It's important that we only do this once `BaseClient::room_joined` has
1653        // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1654        // race.
1655        #[cfg(feature = "e2e-encryption")]
1656        if self.inner.enable_share_history_on_invite
1657            && let Some(inviter) =
1658                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1659        {
1660            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1661                .await?;
1662        }
1663
1664        // Suppress "unused variable" and "unused field" lints
1665        #[cfg(not(feature = "e2e-encryption"))]
1666        let _ = pre_join_room_info.map(|i| i.inviter);
1667
1668        Ok(room)
1669    }
1670
1671    /// Join a room by `RoomId`.
1672    ///
1673    /// Returns the `Room` in the joined state.
1674    ///
1675    /// # Arguments
1676    ///
1677    /// * `room_id` - The `RoomId` of the room to be joined.
1678    #[instrument(skip(self))]
1679    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1680        // See who invited us to this room, if anyone. Note we have to do this before
1681        // making the `/join` request, otherwise we could race against the sync.
1682        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1683
1684        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1685        let response = self.send(request).await?;
1686        self.finish_join_room(&response.room_id, pre_join_info).await
1687    }
1688
1689    /// Join a room by `RoomOrAliasId`.
1690    ///
1691    /// Returns the `Room` in the joined state.
1692    ///
1693    /// # Arguments
1694    ///
1695    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1696    ///   alias looks like `#name:example.com`.
1697    /// * `server_names` - The server names to be used for resolving the alias,
1698    ///   if needs be.
1699    #[instrument(skip(self))]
1700    pub async fn join_room_by_id_or_alias(
1701        &self,
1702        alias: &RoomOrAliasId,
1703        server_names: &[OwnedServerName],
1704    ) -> Result<Room> {
1705        let pre_join_info = {
1706            match alias.try_into() {
1707                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1708                Err(_) => {
1709                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1710                    // responding to an invitation to the room, and therefore don't need to handle
1711                    // things that happen as a result of invites.
1712                    None
1713                }
1714            }
1715        };
1716        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1717            via: server_names.to_owned(),
1718        });
1719        let response = self.send(request).await?;
1720        self.finish_join_room(&response.room_id, pre_join_info).await
1721    }
1722
1723    /// Search the homeserver's directory of public rooms.
1724    ///
1725    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1726    /// a `get_public_rooms::Response`.
1727    ///
1728    /// # Arguments
1729    ///
1730    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1731    ///
1732    /// * `since` - Pagination token from a previous request.
1733    ///
1734    /// * `server` - The name of the server, if `None` the requested server is
1735    ///   used.
1736    ///
1737    /// # Examples
1738    /// ```no_run
1739    /// use matrix_sdk::Client;
1740    /// # use url::Url;
1741    /// # let homeserver = Url::parse("http://example.com").unwrap();
1742    /// # let limit = Some(10);
1743    /// # let since = Some("since token");
1744    /// # let server = Some("servername.com".try_into().unwrap());
1745    /// # async {
1746    /// let mut client = Client::new(homeserver).await.unwrap();
1747    ///
1748    /// client.public_rooms(limit, since, server).await;
1749    /// # };
1750    /// ```
1751    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1752    pub async fn public_rooms(
1753        &self,
1754        limit: Option<u32>,
1755        since: Option<&str>,
1756        server: Option<&ServerName>,
1757    ) -> HttpResult<get_public_rooms::v3::Response> {
1758        let limit = limit.map(UInt::from);
1759
1760        let request = assign!(get_public_rooms::v3::Request::new(), {
1761            limit,
1762            since: since.map(ToOwned::to_owned),
1763            server: server.map(ToOwned::to_owned),
1764        });
1765        self.send(request).await
1766    }
1767
1768    /// Create a room with the given parameters.
1769    ///
1770    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1771    /// created room.
1772    ///
1773    /// If you want to create a direct message with one specific user, you can
1774    /// use [`create_dm`][Self::create_dm], which is more convenient than
1775    /// assembling the [`create_room::v3::Request`] yourself.
1776    ///
1777    /// If the `is_direct` field of the request is set to `true` and at least
1778    /// one user is invited, the room will be automatically added to the direct
1779    /// rooms in the account data.
1780    ///
1781    /// # Examples
1782    ///
1783    /// ```no_run
1784    /// use matrix_sdk::{
1785    ///     Client,
1786    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1787    /// };
1788    /// # use url::Url;
1789    /// #
1790    /// # async {
1791    /// # let homeserver = Url::parse("http://example.com").unwrap();
1792    /// let request = CreateRoomRequest::new();
1793    /// let client = Client::new(homeserver).await.unwrap();
1794    /// assert!(client.create_room(request).await.is_ok());
1795    /// # };
1796    /// ```
1797    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1798        let invite = request.invite.clone();
1799        let is_direct_room = request.is_direct;
1800        let response = self.send(request).await?;
1801
1802        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1803
1804        let joined_room = Room::new(self.clone(), base_room);
1805
1806        if is_direct_room
1807            && !invite.is_empty()
1808            && let Err(error) =
1809                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1810        {
1811            // FIXME: Retry in the background
1812            error!("Failed to mark room as DM: {error}");
1813        }
1814
1815        Ok(joined_room)
1816    }
1817
1818    /// Create a DM room.
1819    ///
1820    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1821    /// given user being invited, the room marked `is_direct` and both the
1822    /// creator and invitee getting the default maximum power level.
1823    ///
1824    /// If the `e2e-encryption` feature is enabled, the room will also be
1825    /// encrypted.
1826    ///
1827    /// # Arguments
1828    ///
1829    /// * `user_id` - The ID of the user to create a DM for.
1830    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1831        #[cfg(feature = "e2e-encryption")]
1832        let initial_state = vec![
1833            InitialStateEvent::with_empty_state_key(
1834                RoomEncryptionEventContent::with_recommended_defaults(),
1835            )
1836            .to_raw_any(),
1837        ];
1838
1839        #[cfg(not(feature = "e2e-encryption"))]
1840        let initial_state = vec![];
1841
1842        let request = assign!(create_room::v3::Request::new(), {
1843            invite: vec![user_id.to_owned()],
1844            is_direct: true,
1845            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1846            initial_state,
1847        });
1848
1849        self.create_room(request).await
1850    }
1851
1852    /// Get the first existing DM room with the given user, if any.
1853    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1854        self.get_dm_rooms(user_id).next()
1855    }
1856
1857    /// Get an iterator with the existing DM rooms for the given user.
1858    pub fn get_dm_rooms(&self, user_id: &UserId) -> impl Iterator<Item = Room> {
1859        let rooms = self.joined_rooms();
1860
1861        let dm_definition = &self.base_client().dm_room_definition;
1862
1863        // Find the room we share with the `user_id` and only with `user_id`
1864        let rooms = rooms.into_iter().filter(move |r| {
1865            let targets = r.direct_targets();
1866            let targets_match =
1867                targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id));
1868            match dm_definition {
1869                DmRoomDefinition::MatrixSpec => targets_match,
1870                DmRoomDefinition::TwoMembers => {
1871                    let service_members_count =
1872                        r.service_members().map(|s| s.len()).unwrap_or_default() as u64;
1873                    let active_non_service_members =
1874                        r.active_members_count().saturating_sub(service_members_count);
1875                    targets_match && active_non_service_members <= 2
1876                }
1877            }
1878        });
1879
1880        trace!(?user_id, ?rooms, "Found DM rooms with user");
1881        rooms
1882    }
1883
1884    /// Search the homeserver's directory for public rooms with a filter.
1885    ///
1886    /// # Arguments
1887    ///
1888    /// * `room_search` - The easiest way to create this request is using the
1889    ///   `get_public_rooms_filtered::Request` itself.
1890    ///
1891    /// # Examples
1892    ///
1893    /// ```no_run
1894    /// # use url::Url;
1895    /// # use matrix_sdk::Client;
1896    /// # async {
1897    /// # let homeserver = Url::parse("http://example.com")?;
1898    /// use matrix_sdk::ruma::{
1899    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1900    /// };
1901    /// # let mut client = Client::new(homeserver).await?;
1902    ///
1903    /// let mut filter = Filter::new();
1904    /// filter.generic_search_term = Some("rust".to_owned());
1905    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1906    /// request.filter = filter;
1907    ///
1908    /// let response = client.public_rooms_filtered(request).await?;
1909    ///
1910    /// for room in response.chunk {
1911    ///     println!("Found room {room:?}");
1912    /// }
1913    /// # anyhow::Ok(()) };
1914    /// ```
1915    pub async fn public_rooms_filtered(
1916        &self,
1917        request: get_public_rooms_filtered::v3::Request,
1918    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1919        self.send(request).await
1920    }
1921
1922    /// Send an arbitrary request to the server, without updating client state.
1923    ///
1924    /// **Warning:** Because this method *does not* update the client state, it
1925    /// is important to make sure that you account for this yourself, and
1926    /// use wrapper methods where available.  This method should *only* be
1927    /// used if a wrapper method for the endpoint you'd like to use is not
1928    /// available.
1929    ///
1930    /// # Arguments
1931    ///
1932    /// * `request` - A filled out and valid request for the endpoint to be hit
1933    ///
1934    /// * `timeout` - An optional request timeout setting, this overrides the
1935    ///   default request setting if one was set.
1936    ///
1937    /// # Examples
1938    ///
1939    /// ```no_run
1940    /// # use matrix_sdk::{Client, config::SyncSettings};
1941    /// # use url::Url;
1942    /// # async {
1943    /// # let homeserver = Url::parse("http://localhost:8080")?;
1944    /// # let mut client = Client::new(homeserver).await?;
1945    /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1946    ///
1947    /// // First construct the request you want to make
1948    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1949    /// // for all available Endpoints
1950    /// let user_id = owned_user_id!("@example:localhost");
1951    /// let request = profile::get_profile::v3::Request::new(user_id);
1952    ///
1953    /// // Start the request using Client::send()
1954    /// let response = client.send(request).await?;
1955    ///
1956    /// // Check the corresponding Response struct to find out what types are
1957    /// // returned
1958    /// # anyhow::Ok(()) };
1959    /// ```
1960    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1961    where
1962        Request: OutgoingRequest + Clone + Debug,
1963        Request::Authentication: SupportedAuthScheme,
1964        Request::PathBuilder: SupportedPathBuilder,
1965        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1966        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1967    {
1968        SendRequest {
1969            client: self.clone(),
1970            request,
1971            config: None,
1972            send_progress: Default::default(),
1973        }
1974    }
1975
1976    pub(crate) async fn send_inner<Request>(
1977        &self,
1978        request: Request,
1979        config: Option<RequestConfig>,
1980        send_progress: SharedObservable<TransmissionProgress>,
1981    ) -> HttpResult<Request::IncomingResponse>
1982    where
1983        Request: OutgoingRequest + Debug,
1984        Request::Authentication: SupportedAuthScheme,
1985        Request::PathBuilder: SupportedPathBuilder,
1986        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1987        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1988    {
1989        let homeserver = self.homeserver().to_string();
1990        let access_token = self.access_token();
1991        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1992
1993        let path_builder_input =
1994            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1995
1996        let result = self
1997            .inner
1998            .http_client
1999            .send(
2000                request,
2001                config,
2002                homeserver,
2003                access_token.as_deref(),
2004                path_builder_input,
2005                send_progress,
2006            )
2007            .await;
2008
2009        if let Err(Some(ErrorKind::UnknownToken { .. })) =
2010            result.as_ref().map_err(HttpError::client_api_error_kind)
2011            && let Some(access_token) = &access_token
2012        {
2013            // Mark the access token as expired.
2014            self.auth_ctx().set_access_token_expired(access_token);
2015        }
2016
2017        result
2018    }
2019
2020    fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
2021        _ = self
2022            .inner
2023            .auth_ctx
2024            .session_change_sender
2025            .send(SessionChange::UnknownToken(unknown_token_data.clone()));
2026    }
2027
2028    /// Fetches server versions from network; no caching.
2029    pub async fn fetch_server_versions(
2030        &self,
2031        request_config: Option<RequestConfig>,
2032    ) -> HttpResult<get_supported_versions::Response> {
2033        // Since this was called by the user, try to refresh the access token if
2034        // necessary.
2035        self.fetch_server_versions_inner(false, request_config).await
2036    }
2037
2038    /// Fetches server versions from network; no caching.
2039    ///
2040    /// If the access token is expired and `failsafe` is `false`, this will
2041    /// attempt to refresh the access token, otherwise this will try to make an
2042    /// unauthenticated request instead.
2043    pub(crate) async fn fetch_server_versions_inner(
2044        &self,
2045        failsafe: bool,
2046        request_config: Option<RequestConfig>,
2047    ) -> HttpResult<get_supported_versions::Response> {
2048        if !failsafe {
2049            // `Client::send()` handles refreshing access tokens.
2050            return self
2051                .send(get_supported_versions::Request::new())
2052                .with_request_config(request_config)
2053                .await;
2054        }
2055
2056        let homeserver = self.homeserver().to_string();
2057
2058        // If we have a fresh access token, try with it first.
2059        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2060            && self.auth_ctx().has_valid_access_token()
2061            && let Some(access_token) = self.access_token()
2062        {
2063            let result = self
2064                .inner
2065                .http_client
2066                .send(
2067                    get_supported_versions::Request::new(),
2068                    request_config,
2069                    homeserver.clone(),
2070                    Some(&access_token),
2071                    (),
2072                    Default::default(),
2073                )
2074                .await;
2075
2076            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2077                result.as_ref().map_err(HttpError::client_api_error_kind)
2078            {
2079                // If the access token is actually expired, mark it as expired and fallback to
2080                // the unauthenticated request below.
2081                self.auth_ctx().set_access_token_expired(&access_token);
2082            } else {
2083                // If the request succeeded or it's an other error, just stop now.
2084                return result;
2085            }
2086        }
2087
2088        // Try without authentication.
2089        self.inner
2090            .http_client
2091            .send(
2092                get_supported_versions::Request::new(),
2093                request_config,
2094                homeserver.clone(),
2095                None,
2096                (),
2097                Default::default(),
2098            )
2099            .await
2100    }
2101
2102    /// Fetches client well_known from network; no caching.
2103    ///
2104    /// 1. If the [`Client::server`] value is available, we use it to fetch the
2105    ///    well-known contents.
2106    /// 2. If it's not, we try extracting the server name from the
2107    ///    [`Client::user_id`] and building the server URL from it.
2108    /// 3. If we couldn't get the well-known contents with either the explicit
2109    ///    server name or the implicit extracted one, we try the homeserver URL
2110    ///    as a last resort.
2111    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2112        let homeserver = self.homeserver();
2113        let scheme = homeserver.scheme();
2114
2115        // Use the server name, either an explicit one or an implicit one taken from
2116        // the user id: sometimes we'll have only the homeserver url available and no
2117        // server name, but the server name can be extracted from the current user id.
2118        let server_url = self
2119            .server()
2120            .map(|server| server.to_string())
2121            // If the server name wasn't available, extract it from the user id and build a URL:
2122            // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2123            // will be the same for the public server url, lacking a better candidate.
2124            .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2125
2126        // If the server name is available, first try using it
2127        let response = if let Some(server_url) = server_url {
2128            // First try using the server name
2129            self.fetch_client_well_known_with_url(server_url).await
2130        } else {
2131            None
2132        };
2133
2134        // If we didn't get a well-known value yet, try with the homeserver url instead:
2135        if response.is_none() {
2136            // Sometimes people configure their well-known directly on the homeserver so use
2137            // this as a fallback when the server name is unknown.
2138            warn!(
2139                "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2140            );
2141            self.fetch_client_well_known_with_url(homeserver.to_string()).await
2142        } else {
2143            response
2144        }
2145    }
2146
2147    async fn fetch_client_well_known_with_url(
2148        &self,
2149        url: String,
2150    ) -> Option<discover_homeserver::Response> {
2151        let well_known = self
2152            .inner
2153            .http_client
2154            .send(
2155                discover_homeserver::Request::new(),
2156                Some(RequestConfig::short_retry()),
2157                url,
2158                None,
2159                (),
2160                Default::default(),
2161            )
2162            .await;
2163
2164        match well_known {
2165            Ok(well_known) => Some(well_known),
2166            Err(http_error) => {
2167                // It is perfectly valid to not have a well-known file.
2168                // Maybe we should check for a specific error code to be sure?
2169                warn!("Failed to fetch client well-known: {http_error}");
2170                None
2171            }
2172        }
2173    }
2174
2175    /// Load supported versions from storage, or fetch them from network and
2176    /// cache them.
2177    ///
2178    /// If `failsafe` is true, this will try to minimize side effects to avoid
2179    /// possible deadlocks.
2180    async fn fetch_supported_versions(
2181        &self,
2182        failsafe: bool,
2183    ) -> HttpResult<SupportedVersionsResponse> {
2184        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2185        let supported_versions = SupportedVersionsResponse {
2186            versions: server_versions.versions,
2187            unstable_features: server_versions.unstable_features,
2188        };
2189
2190        Ok(supported_versions)
2191    }
2192
2193    /// Get the Matrix versions and features supported by the homeserver by
2194    /// fetching them from the server or the cache.
2195    ///
2196    /// This is equivalent to calling both [`Client::server_versions()`] and
2197    /// [`Client::unstable_features()`]. To always fetch the result from the
2198    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2199    /// and then `.as_supported_versions()` on the response.
2200    ///
2201    /// # Examples
2202    ///
2203    /// ```no_run
2204    /// use ruma::api::{FeatureFlag, MatrixVersion};
2205    /// # use matrix_sdk::{Client, config::SyncSettings};
2206    /// # use url::Url;
2207    /// # async {
2208    /// # let homeserver = Url::parse("http://localhost:8080")?;
2209    /// # let mut client = Client::new(homeserver).await?;
2210    ///
2211    /// let supported = client.supported_versions().await?;
2212    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2213    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2214    ///
2215    /// let msc_x_feature = FeatureFlag::from("msc_x");
2216    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2217    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2218    /// # anyhow::Ok(()) };
2219    /// ```
2220    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2221        self.supported_versions_inner(false).await
2222    }
2223
2224    /// Get the Matrix versions and features supported by the homeserver by
2225    /// fetching them from the server or the cache.
2226    ///
2227    /// If `failsafe` is true, this will try to minimize side effects to avoid
2228    /// possible deadlocks.
2229    pub(crate) async fn supported_versions_inner(
2230        &self,
2231        failsafe: bool,
2232    ) -> HttpResult<SupportedVersions> {
2233        match self.supported_versions_cached_inner(failsafe).await {
2234            Ok(Some(value)) => {
2235                return Ok(value);
2236            }
2237            Ok(None) => {
2238                // The cache is empty, make a request.
2239            }
2240            Err(error) => {
2241                warn!("error when loading cached supported versions: {error}");
2242                // Fallthrough to make a request.
2243            }
2244        }
2245
2246        self.refresh_supported_versions_cache(failsafe).await
2247    }
2248
2249    /// Refresh the Matrix versions and features supported by the homeserver in
2250    /// the cache.
2251    ///
2252    /// If `failsafe` is true, this will try to minimize side effects to avoid
2253    /// possible deadlocks.
2254    async fn refresh_supported_versions_cache(
2255        &self,
2256        failsafe: bool,
2257    ) -> HttpResult<SupportedVersions> {
2258        let cached_supported_versions = &self.inner.caches.supported_versions;
2259
2260        let mut supported_versions_guard = match cached_supported_versions.refresh_lock.try_lock() {
2261            Ok(guard) => guard,
2262            Err(_) => {
2263                // There is already a refresh in progress, wait for it to finish.
2264                let guard = cached_supported_versions.refresh_lock.lock().await;
2265
2266                if let Err(error) = guard.as_ref() {
2267                    // There was an error in the previous refresh, return it.
2268                    return Err(HttpError::Cached(error.clone()));
2269                }
2270
2271                // Reuse the data if it was cached and it hasn't expired.
2272                if let CachedValue::Cached(value) = cached_supported_versions.value()
2273                    && !value.has_expired()
2274                {
2275                    return Ok(value.into_data());
2276                }
2277
2278                // The data wasn't cached or has expired, we need to make another request.
2279                guard
2280            }
2281        };
2282
2283        let response = match self.fetch_supported_versions(failsafe).await {
2284            Ok(response) => {
2285                *supported_versions_guard = Ok(());
2286                TtlValue::new(response)
2287            }
2288            Err(error) => {
2289                let error = Arc::new(error);
2290                *supported_versions_guard = Err(error.clone());
2291                return Err(HttpError::Cached(error));
2292            }
2293        };
2294
2295        let supported_versions = response.as_ref().map(|response| response.supported_versions());
2296
2297        // Only cache the result if the request was authenticated.
2298        if self.auth_ctx().has_valid_access_token() {
2299            if let Err(err) = self
2300                .state_store()
2301                .set_kv_data(
2302                    StateStoreDataKey::SupportedVersions,
2303                    StateStoreDataValue::SupportedVersions(response),
2304                )
2305                .await
2306            {
2307                warn!("error when caching supported versions: {err}");
2308            }
2309
2310            cached_supported_versions.set_value(supported_versions.clone());
2311        }
2312
2313        Ok(supported_versions.into_data())
2314    }
2315
2316    /// Get the Matrix versions and features supported by the homeserver by
2317    /// fetching them from the cache.
2318    ///
2319    /// For a version of this function that fetches the supported versions and
2320    /// features from the homeserver if the [`SupportedVersions`] aren't
2321    /// found in the cache, take a look at the [`Client::supported_versions()`]
2322    /// method.
2323    ///
2324    /// If the data in the cache has expired, this will trigger a background
2325    /// task to refresh it.
2326    ///
2327    /// # Examples
2328    ///
2329    /// ```no_run
2330    /// use ruma::api::{FeatureFlag, MatrixVersion};
2331    /// # use matrix_sdk::{Client, config::SyncSettings};
2332    /// # use url::Url;
2333    /// # async {
2334    /// # let homeserver = Url::parse("http://localhost:8080")?;
2335    /// # let mut client = Client::new(homeserver).await?;
2336    ///
2337    /// let supported =
2338    ///     if let Some(supported) = client.supported_versions_cached().await? {
2339    ///         supported
2340    ///     } else {
2341    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2342    ///     };
2343    ///
2344    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2345    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2346    ///
2347    /// let msc_x_feature = FeatureFlag::from("msc_x");
2348    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2349    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2350    /// # anyhow::Ok(()) };
2351    /// ```
2352    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2353        self.supported_versions_cached_inner(false).await
2354    }
2355
2356    async fn supported_versions_cached_inner(
2357        &self,
2358        failsafe: bool,
2359    ) -> Result<Option<SupportedVersions>, StoreError> {
2360        let supported_versions_cache = &self.inner.caches.supported_versions;
2361
2362        let value = if let CachedValue::Cached(cached) = supported_versions_cache.value() {
2363            cached
2364        } else if let Some(stored) = self
2365            .state_store()
2366            .get_kv_data(StateStoreDataKey::SupportedVersions)
2367            .await?
2368            .and_then(|value| value.into_supported_versions())
2369        {
2370            let stored = stored.map(|response| response.supported_versions());
2371
2372            // Copy the data from the store in the in-memory cache.
2373            supported_versions_cache.set_value(stored.clone());
2374
2375            stored
2376        } else {
2377            return Ok(None);
2378        };
2379
2380        // Spawn a task to refresh the cache if it has expired and we have a valid
2381        // access token.
2382        if value.has_expired() && self.auth_ctx().has_valid_access_token() {
2383            debug!("spawning task to refresh supported versions cache");
2384
2385            let client = self.clone();
2386            self.task_monitor().spawn_finite_task("refresh supported versions cache", async move {
2387                if let Err(error) = client.refresh_supported_versions_cache(failsafe).await {
2388                    warn!("failed to refresh supported versions cache: {error}");
2389                }
2390            });
2391        }
2392
2393        Ok(Some(value.into_data()))
2394    }
2395
2396    /// Get the Matrix versions supported by the homeserver by fetching them
2397    /// from the server or the cache.
2398    ///
2399    /// # Examples
2400    ///
2401    /// ```no_run
2402    /// use ruma::api::MatrixVersion;
2403    /// # use matrix_sdk::{Client, config::SyncSettings};
2404    /// # use url::Url;
2405    /// # async {
2406    /// # let homeserver = Url::parse("http://localhost:8080")?;
2407    /// # let mut client = Client::new(homeserver).await?;
2408    ///
2409    /// let server_versions = client.server_versions().await?;
2410    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2411    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2412    /// # anyhow::Ok(()) };
2413    /// ```
2414    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2415        Ok(self.supported_versions().await?.versions)
2416    }
2417
2418    /// Get the unstable features supported by the homeserver by fetching them
2419    /// from the server or the cache.
2420    ///
2421    /// # Examples
2422    ///
2423    /// ```no_run
2424    /// use matrix_sdk::ruma::api::FeatureFlag;
2425    /// # use matrix_sdk::{Client, config::SyncSettings};
2426    /// # use url::Url;
2427    /// # async {
2428    /// # let homeserver = Url::parse("http://localhost:8080")?;
2429    /// # let mut client = Client::new(homeserver).await?;
2430    ///
2431    /// let msc_x_feature = FeatureFlag::from("msc_x");
2432    /// let unstable_features = client.unstable_features().await?;
2433    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2434    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2435    /// # anyhow::Ok(()) };
2436    /// ```
2437    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2438        Ok(self.supported_versions().await?.features)
2439    }
2440
2441    /// Empty the supported versions and unstable features cache.
2442    ///
2443    /// Since the SDK caches the supported versions, it's possible to have a
2444    /// stale entry in the cache. This functions makes it possible to force
2445    /// reset it.
2446    pub async fn reset_supported_versions(&self) -> Result<()> {
2447        // Empty the in-memory cache.
2448        self.inner.caches.supported_versions.reset();
2449
2450        // Empty the store cache.
2451        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2452    }
2453
2454    /// Get the well-known file of the homeserver from the cache.
2455    ///
2456    /// If the data in the cache has expired, this will trigger a background
2457    /// task to refresh it.
2458    async fn well_known_cached(
2459        &self,
2460    ) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
2461        let well_known_cache = &self.inner.caches.well_known;
2462
2463        let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
2464            cached
2465        } else if let Some(stored) = self
2466            .state_store()
2467            .get_kv_data(StateStoreDataKey::WellKnown)
2468            .await?
2469            .and_then(|value| value.into_well_known())
2470        {
2471            // Copy the data from the store into the in-memory cache.
2472            well_known_cache.set_value(stored.clone());
2473
2474            stored
2475        } else {
2476            return Ok(CachedValue::NotSet);
2477        };
2478
2479        // Spawn a task to refresh the cache if it has expired.
2480        if value.has_expired() {
2481            debug!("spawning task to refresh well-known cache");
2482
2483            let client = self.clone();
2484            self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
2485                client.refresh_well_known_cache().await;
2486            });
2487        }
2488
2489        Ok(CachedValue::Cached(value.into_data()))
2490    }
2491
2492    /// Refresh the well-known file of the homeserver in the cache.
2493    async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
2494        let well_known_cache = &self.inner.caches.well_known;
2495
2496        let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
2497            Ok(guard) => guard,
2498            Err(_) => {
2499                // There is already a refresh in progress, wait for it to finish.
2500                let guard = well_known_cache.refresh_lock.lock().await;
2501
2502                // A refresh can't fail because we ignore failures, so there shouldn't be an
2503                // error in the refresh lock.
2504
2505                // Reuse the data if it was cached and it hasn't expired.
2506                if let CachedValue::Cached(value) = well_known_cache.value()
2507                    && !value.has_expired()
2508                {
2509                    return value.into_data();
2510                }
2511
2512                // The data wasn't cached or has expired, we need to make another request.
2513                guard
2514            }
2515        };
2516
2517        let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
2518
2519        if let Err(err) = self
2520            .state_store()
2521            .set_kv_data(
2522                StateStoreDataKey::WellKnown,
2523                StateStoreDataValue::WellKnown(well_known.clone()),
2524            )
2525            .await
2526        {
2527            warn!("error when caching well-known: {err}");
2528        }
2529
2530        well_known_cache.set_value(well_known.clone());
2531
2532        well_known.into_data()
2533    }
2534
2535    /// Get the well-known file of the homeserver by fetching it from the server
2536    /// or the cache.
2537    async fn well_known(&self) -> Option<WellKnownResponse> {
2538        match self.well_known_cached().await {
2539            Ok(CachedValue::Cached(value)) => {
2540                return value;
2541            }
2542            Ok(CachedValue::NotSet) => {
2543                // The cache is empty, make a request.
2544            }
2545            Err(error) => {
2546                warn!("error when loading cached well-known: {error}");
2547                // Fallthrough to make a request.
2548            }
2549        }
2550
2551        self.refresh_well_known_cache().await
2552    }
2553
2554    /// Get information about the homeserver's advertised RTC foci by fetching
2555    /// the well-known file from the server or the cache.
2556    ///
2557    /// # Examples
2558    /// ```no_run
2559    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::rtc::RtcTransport};
2560    /// # use url::Url;
2561    /// # async {
2562    /// # let homeserver = Url::parse("http://localhost:8080")?;
2563    /// # let mut client = Client::new(homeserver).await?;
2564    /// let rtc_foci = client.rtc_foci().await?;
2565    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2566    ///     RtcTransport::LiveKit(info) => Some(info),
2567    ///     _ => None,
2568    /// });
2569    /// if let Some(info) = default_livekit_focus_info {
2570    ///     println!("Default LiveKit service URL: {}", info.service_url);
2571    /// }
2572    /// # anyhow::Ok(()) };
2573    /// ```
2574    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcTransport>> {
2575        let well_known = self.well_known().await;
2576
2577        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2578    }
2579
2580    /// Get information about the homeserver's advertised map tile server, if
2581    /// any, by fetching the well-known file from the server or the cache.
2582    ///
2583    /// Returns `None` if the homeserver has not advertised a tile server in its
2584    /// well-known, or if the well-known is otherwise unavailable.
2585    pub async fn tile_server(&self) -> Option<TileServerInfo> {
2586        self.well_known().await.and_then(|well_known| well_known.tile_server).map(Into::into)
2587    }
2588
2589    /// Empty the well-known cache.
2590    ///
2591    /// Since the SDK caches the well-known, it's possible to have a stale entry
2592    /// in the cache. This functions makes it possible to force reset it.
2593    pub async fn reset_well_known(&self) -> Result<()> {
2594        // Empty the in-memory caches.
2595        self.inner.caches.well_known.reset();
2596
2597        // Empty the store cache.
2598        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2599    }
2600
2601    /// Check whether MSC 4028 is enabled on the homeserver.
2602    ///
2603    /// # Examples
2604    ///
2605    /// ```no_run
2606    /// # use matrix_sdk::{Client, config::SyncSettings};
2607    /// # use url::Url;
2608    /// # async {
2609    /// # let homeserver = Url::parse("http://localhost:8080")?;
2610    /// # let mut client = Client::new(homeserver).await?;
2611    /// let msc4028_enabled =
2612    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2613    /// # anyhow::Ok(()) };
2614    /// ```
2615    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2616        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2617    }
2618
2619    /// Get information of all our own devices.
2620    ///
2621    /// # Examples
2622    ///
2623    /// ```no_run
2624    /// # use matrix_sdk::{Client, config::SyncSettings};
2625    /// # use url::Url;
2626    /// # async {
2627    /// # let homeserver = Url::parse("http://localhost:8080")?;
2628    /// # let mut client = Client::new(homeserver).await?;
2629    /// let response = client.devices().await?;
2630    ///
2631    /// for device in response.devices {
2632    ///     println!(
2633    ///         "Device: {} {}",
2634    ///         device.device_id,
2635    ///         device.display_name.as_deref().unwrap_or("")
2636    ///     );
2637    /// }
2638    /// # anyhow::Ok(()) };
2639    /// ```
2640    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2641        let request = get_devices::v3::Request::new();
2642
2643        self.send(request).await
2644    }
2645
2646    /// Delete the given devices from the server.
2647    ///
2648    /// # Arguments
2649    ///
2650    /// * `devices` - The list of devices that should be deleted from the
2651    ///   server.
2652    ///
2653    /// * `auth_data` - This request requires user interactive auth, the first
2654    ///   request needs to set this to `None` and will always fail with an
2655    ///   `UiaaResponse`. The response will contain information for the
2656    ///   interactive auth and the same request needs to be made but this time
2657    ///   with some `auth_data` provided.
2658    ///
2659    /// ```no_run
2660    /// # use matrix_sdk::{
2661    /// #    ruma::{api::client::uiaa, owned_device_id},
2662    /// #    Client, Error, config::SyncSettings,
2663    /// # };
2664    /// # use serde_json::json;
2665    /// # use url::Url;
2666    /// # use std::collections::BTreeMap;
2667    /// # async {
2668    /// # let homeserver = Url::parse("http://localhost:8080")?;
2669    /// # let mut client = Client::new(homeserver).await?;
2670    /// let devices = &[owned_device_id!("DEVICEID")];
2671    ///
2672    /// if let Err(e) = client.delete_devices(devices, None).await {
2673    ///     if let Some(info) = e.as_uiaa_response() {
2674    ///         let mut password = uiaa::Password::new(
2675    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2676    ///             "wordpass".to_owned(),
2677    ///         );
2678    ///         password.session = info.session.clone();
2679    ///
2680    ///         client
2681    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2682    ///             .await?;
2683    ///     }
2684    /// }
2685    /// # anyhow::Ok(()) };
2686    pub async fn delete_devices(
2687        &self,
2688        devices: &[OwnedDeviceId],
2689        auth_data: Option<uiaa::AuthData>,
2690    ) -> HttpResult<delete_devices::v3::Response> {
2691        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2692        request.auth = auth_data;
2693
2694        self.send(request).await
2695    }
2696
2697    /// Change the display name of a device owned by the current user.
2698    ///
2699    /// Returns a `update_device::Response` which specifies the result
2700    /// of the operation.
2701    ///
2702    /// # Arguments
2703    ///
2704    /// * `device_id` - The ID of the device to change the display name of.
2705    /// * `display_name` - The new display name to set.
2706    pub async fn rename_device(
2707        &self,
2708        device_id: &DeviceId,
2709        display_name: &str,
2710    ) -> HttpResult<update_device::v3::Response> {
2711        let mut request = update_device::v3::Request::new(device_id.to_owned());
2712        request.display_name = Some(display_name.to_owned());
2713
2714        self.send(request).await
2715    }
2716
2717    /// Check whether a device with a specific ID exists on the server.
2718    ///
2719    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2720    /// with 404 and the underlying error otherwise.
2721    ///
2722    /// # Arguments
2723    ///
2724    /// * `device_id` - The ID of the device to query.
2725    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2726        let request = device::get_device::v3::Request::new(device_id);
2727        match self.send(request).await {
2728            Ok(_) => Ok(true),
2729            Err(err) => {
2730                if let Some(error) = err.as_client_api_error()
2731                    && error.status_code == 404
2732                {
2733                    Ok(false)
2734                } else {
2735                    Err(err.into())
2736                }
2737            }
2738        }
2739    }
2740
2741    /// Synchronize the client's state with the latest state on the server.
2742    ///
2743    /// ## Syncing Events
2744    ///
2745    /// Messages or any other type of event need to be periodically fetched from
2746    /// the server, this is achieved by sending a `/sync` request to the server.
2747    ///
2748    /// The first sync is sent out without a [`token`]. The response of the
2749    /// first sync will contain a [`next_batch`] field which should then be
2750    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2751    /// don't receive the same events multiple times.
2752    ///
2753    /// ## Long Polling
2754    ///
2755    /// A sync should in the usual case always be in flight. The
2756    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2757    /// long the server will wait for new events before it will respond.
2758    /// The server will respond immediately if some new events arrive before the
2759    /// timeout has expired. If no changes arrive and the timeout expires an
2760    /// empty sync response will be sent to the client.
2761    ///
2762    /// This method of sending a request that may not receive a response
2763    /// immediately is called long polling.
2764    ///
2765    /// ## Filtering Events
2766    ///
2767    /// The number or type of messages and events that the client should receive
2768    /// from the server can be altered using a [`Filter`].
2769    ///
2770    /// Filters can be non-trivial and, since they will be sent with every sync
2771    /// request, they may take up a bunch of unnecessary bandwidth.
2772    ///
2773    /// Luckily filters can be uploaded to the server and reused using an unique
2774    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2775    /// method.
2776    ///
2777    /// # Arguments
2778    ///
2779    /// * `sync_settings` - Settings for the sync call, this allows us to set
2780    /// various options to configure the sync:
2781    ///     * [`filter`] - To configure which events we receive and which get
2782    ///       [filtered] by the server
2783    ///     * [`timeout`] - To configure our [long polling] setup.
2784    ///     * [`token`] - To tell the server which events we already received
2785    ///       and where we wish to continue syncing.
2786    ///     * [`full_state`] - To tell the server that we wish to receive all
2787    ///       state events, regardless of our configured [`token`].
2788    ///     * [`set_presence`] - To tell the server to set the presence and to
2789    ///       which state.
2790    ///
2791    /// # Examples
2792    ///
2793    /// ```no_run
2794    /// # use url::Url;
2795    /// # async {
2796    /// # let homeserver = Url::parse("http://localhost:8080")?;
2797    /// # let username = "";
2798    /// # let password = "";
2799    /// use matrix_sdk::{
2800    ///     Client, config::SyncSettings,
2801    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2802    /// };
2803    ///
2804    /// let client = Client::new(homeserver).await?;
2805    /// client.matrix_auth().login_username(username, password).send().await?;
2806    ///
2807    /// // Sync once so we receive the client state and old messages.
2808    /// client.sync_once(SyncSettings::default()).await?;
2809    ///
2810    /// // Register our handler so we start responding once we receive a new
2811    /// // event.
2812    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2813    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2814    /// });
2815    ///
2816    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2817    /// // from our `sync_once()` call automatically.
2818    /// client.sync(SyncSettings::default()).await;
2819    /// # anyhow::Ok(()) };
2820    /// ```
2821    ///
2822    /// [`sync`]: #method.sync
2823    /// [`SyncSettings`]: crate::config::SyncSettings
2824    /// [`token`]: crate::config::SyncSettings#method.token
2825    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2826    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2827    /// [`set_presence`]: ruma::presence::PresenceState
2828    /// [`filter`]: crate::config::SyncSettings#method.filter
2829    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2830    /// [`next_batch`]: SyncResponse#structfield.next_batch
2831    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2832    /// [long polling]: #long-polling
2833    /// [filtered]: #filtering-events
2834    #[instrument(skip(self))]
2835    pub async fn sync_once(
2836        &self,
2837        sync_settings: crate::config::SyncSettings,
2838    ) -> Result<SyncResponse> {
2839        // The sync might not return for quite a while due to the timeout.
2840        // We'll see if there's anything crypto related to send out before we
2841        // sync, i.e. if we closed our client after a sync but before the
2842        // crypto requests were sent out.
2843        //
2844        // This will mostly be a no-op.
2845        #[cfg(feature = "e2e-encryption")]
2846        if let Err(e) = self.send_outgoing_requests().await {
2847            error!(error = ?e, "Error while sending outgoing E2EE requests");
2848        }
2849
2850        let token = match sync_settings.token {
2851            SyncToken::Specific(token) => Some(token),
2852            SyncToken::NoToken => None,
2853            SyncToken::ReusePrevious => self.sync_token().await,
2854        };
2855
2856        let request = assign!(sync_events::v3::Request::new(), {
2857            filter: sync_settings.filter.map(|f| *f),
2858            since: token,
2859            full_state: sync_settings.full_state,
2860            set_presence: sync_settings.set_presence,
2861            timeout: sync_settings.timeout,
2862            use_state_after: true,
2863        });
2864        let mut request_config = self.request_config();
2865        if let Some(timeout) = sync_settings.timeout {
2866            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2867            request_config.timeout = Some(base_timeout + timeout);
2868        }
2869
2870        let response = self.send(request).with_request_config(request_config).await?;
2871        let next_batch = response.next_batch.clone();
2872        let response = self.process_sync(response).await?;
2873
2874        #[cfg(feature = "e2e-encryption")]
2875        if let Err(e) = self.send_outgoing_requests().await {
2876            error!(error = ?e, "Error while sending outgoing E2EE requests");
2877        }
2878
2879        self.inner.sync_beat.notify(usize::MAX);
2880
2881        Ok(SyncResponse::new(next_batch, response))
2882    }
2883
2884    /// Repeatedly synchronize the client state with the server.
2885    ///
2886    /// This method will only return on error, if cancellation is needed
2887    /// the method should be wrapped in a cancelable task or the
2888    /// [`Client::sync_with_callback`] method can be used or
2889    /// [`Client::sync_with_result_callback`] if you want to handle error
2890    /// cases in the loop, too.
2891    ///
2892    /// This method will internally call [`Client::sync_once`] in a loop.
2893    ///
2894    /// This method can be used with the [`Client::add_event_handler`]
2895    /// method to react to individual events. If you instead wish to handle
2896    /// events in a bulk manner the [`Client::sync_with_callback`],
2897    /// [`Client::sync_with_result_callback`] and
2898    /// [`Client::sync_stream`] methods can be used instead. Those methods
2899    /// repeatedly return the whole sync response.
2900    ///
2901    /// # Arguments
2902    ///
2903    /// * `sync_settings` - Settings for the sync call. *Note* that those
2904    ///   settings will be only used for the first sync call. See the argument
2905    ///   docs for [`Client::sync_once`] for more info.
2906    ///
2907    /// # Return
2908    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2909    /// up to the user of the API to check the error and decide whether the sync
2910    /// should continue or not.
2911    ///
2912    /// # Examples
2913    ///
2914    /// ```no_run
2915    /// # use url::Url;
2916    /// # async {
2917    /// # let homeserver = Url::parse("http://localhost:8080")?;
2918    /// # let username = "";
2919    /// # let password = "";
2920    /// use matrix_sdk::{
2921    ///     Client, config::SyncSettings,
2922    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2923    /// };
2924    ///
2925    /// let client = Client::new(homeserver).await?;
2926    /// client.matrix_auth().login_username(&username, &password).send().await?;
2927    ///
2928    /// // Register our handler so we start responding once we receive a new
2929    /// // event.
2930    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2931    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2932    /// });
2933    ///
2934    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2935    /// // automatically.
2936    /// client.sync(SyncSettings::default()).await?;
2937    /// # anyhow::Ok(()) };
2938    /// ```
2939    ///
2940    /// [argument docs]: #method.sync_once
2941    /// [`sync_with_callback`]: #method.sync_with_callback
2942    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2943        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2944    }
2945
2946    /// Repeatedly call sync to synchronize the client state with the server.
2947    ///
2948    /// # Arguments
2949    ///
2950    /// * `sync_settings` - Settings for the sync call. *Note* that those
2951    ///   settings will be only used for the first sync call. See the argument
2952    ///   docs for [`Client::sync_once`] for more info.
2953    ///
2954    /// * `callback` - A callback that will be called every time a successful
2955    ///   response has been fetched from the server. The callback must return a
2956    ///   boolean which signalizes if the method should stop syncing. If the
2957    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2958    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2959    ///
2960    /// # Return
2961    /// The sync runs until an error occurs or the
2962    /// callback indicates that the Loop should stop. If the callback asked for
2963    /// a regular stop, the result will be `Ok(())` otherwise the
2964    /// `Err(Error)` is returned.
2965    ///
2966    /// # Examples
2967    ///
2968    /// The following example demonstrates how to sync forever while sending all
2969    /// the interesting events through a mpsc channel to another thread e.g. a
2970    /// UI thread.
2971    ///
2972    /// ```no_run
2973    /// # use std::time::Duration;
2974    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2975    /// # use url::Url;
2976    /// # async {
2977    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2978    /// # let mut client = Client::new(homeserver).await.unwrap();
2979    ///
2980    /// use tokio::sync::mpsc::channel;
2981    ///
2982    /// let (tx, rx) = channel(100);
2983    ///
2984    /// let sync_channel = &tx;
2985    /// let sync_settings = SyncSettings::new()
2986    ///     .timeout(Duration::from_secs(30));
2987    ///
2988    /// client
2989    ///     .sync_with_callback(sync_settings, |response| async move {
2990    ///         let channel = sync_channel;
2991    ///         for (room_id, room) in response.rooms.joined {
2992    ///             for event in room.timeline.events {
2993    ///                 channel.send(event).await.unwrap();
2994    ///             }
2995    ///         }
2996    ///
2997    ///         LoopCtrl::Continue
2998    ///     })
2999    ///     .await;
3000    /// };
3001    /// ```
3002    #[instrument(skip_all)]
3003    pub async fn sync_with_callback<C>(
3004        &self,
3005        sync_settings: crate::config::SyncSettings,
3006        callback: impl Fn(SyncResponse) -> C,
3007    ) -> Result<(), Error>
3008    where
3009        C: Future<Output = LoopCtrl>,
3010    {
3011        self.sync_with_result_callback(sync_settings, |result| async {
3012            Ok(callback(result?).await)
3013        })
3014        .await
3015    }
3016
3017    /// Repeatedly call sync to synchronize the client state with the server.
3018    ///
3019    /// # Arguments
3020    ///
3021    /// * `sync_settings` - Settings for the sync call. *Note* that those
3022    ///   settings will be only used for the first sync call. See the argument
3023    ///   docs for [`Client::sync_once`] for more info.
3024    ///
3025    /// * `callback` - A callback that will be called every time after a
3026    ///   response has been received, failure or not. The callback returns a
3027    ///   `Result<LoopCtrl, Error>`, too. When returning
3028    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
3029    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
3030    ///   function returns `Ok(())`. In case the callback can't handle the
3031    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
3032    ///   which results in the sync ending and the `Err(Error)` being returned.
3033    ///
3034    /// # Return
3035    /// The sync runs until an error occurs that the callback can't handle or
3036    /// the callback indicates that the Loop should stop. If the callback
3037    /// asked for a regular stop, the result will be `Ok(())` otherwise the
3038    /// `Err(Error)` is returned.
3039    ///
3040    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
3041    /// this, and are handled first without sending the result to the
3042    /// callback. Only after they have exceeded is the `Result` handed to
3043    /// the callback.
3044    ///
3045    /// # Examples
3046    ///
3047    /// The following example demonstrates how to sync forever while sending all
3048    /// the interesting events through a mpsc channel to another thread e.g. a
3049    /// UI thread.
3050    ///
3051    /// ```no_run
3052    /// # use std::time::Duration;
3053    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
3054    /// # use url::Url;
3055    /// # async {
3056    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
3057    /// # let mut client = Client::new(homeserver).await.unwrap();
3058    /// #
3059    /// use tokio::sync::mpsc::channel;
3060    ///
3061    /// let (tx, rx) = channel(100);
3062    ///
3063    /// let sync_channel = &tx;
3064    /// let sync_settings = SyncSettings::new()
3065    ///     .timeout(Duration::from_secs(30));
3066    ///
3067    /// client
3068    ///     .sync_with_result_callback(sync_settings, |response| async move {
3069    ///         let channel = sync_channel;
3070    ///         let sync_response = response?;
3071    ///         for (room_id, room) in sync_response.rooms.joined {
3072    ///              for event in room.timeline.events {
3073    ///                  channel.send(event).await.unwrap();
3074    ///               }
3075    ///         }
3076    ///
3077    ///         Ok(LoopCtrl::Continue)
3078    ///     })
3079    ///     .await;
3080    /// };
3081    /// ```
3082    #[instrument(skip(self, callback))]
3083    pub async fn sync_with_result_callback<C>(
3084        &self,
3085        sync_settings: crate::config::SyncSettings,
3086        callback: impl Fn(Result<SyncResponse, Error>) -> C,
3087    ) -> Result<(), Error>
3088    where
3089        C: Future<Output = Result<LoopCtrl, Error>>,
3090    {
3091        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
3092
3093        while let Some(result) = sync_stream.next().await {
3094            trace!("Running callback");
3095            if callback(result).await? == LoopCtrl::Break {
3096                trace!("Callback told us to stop");
3097                break;
3098            }
3099            trace!("Done running callback");
3100        }
3101
3102        Ok(())
3103    }
3104
3105    //// Repeatedly synchronize the client state with the server.
3106    ///
3107    /// This method will internally call [`Client::sync_once`] in a loop and is
3108    /// equivalent to the [`Client::sync`] method but the responses are provided
3109    /// as an async stream.
3110    ///
3111    /// # Arguments
3112    ///
3113    /// * `sync_settings` - Settings for the sync call. *Note* that those
3114    ///   settings will be only used for the first sync call. See the argument
3115    ///   docs for [`Client::sync_once`] for more info.
3116    ///
3117    /// # Examples
3118    ///
3119    /// ```no_run
3120    /// # use url::Url;
3121    /// # async {
3122    /// # let homeserver = Url::parse("http://localhost:8080")?;
3123    /// # let username = "";
3124    /// # let password = "";
3125    /// use futures_util::StreamExt;
3126    /// use matrix_sdk::{Client, config::SyncSettings};
3127    ///
3128    /// let client = Client::new(homeserver).await?;
3129    /// client.matrix_auth().login_username(&username, &password).send().await?;
3130    ///
3131    /// let mut sync_stream =
3132    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
3133    ///
3134    /// while let Some(Ok(response)) = sync_stream.next().await {
3135    ///     for room in response.rooms.joined.values() {
3136    ///         for e in &room.timeline.events {
3137    ///             if let Ok(event) = e.raw().deserialize() {
3138    ///                 println!("Received event {:?}", event);
3139    ///             }
3140    ///         }
3141    ///     }
3142    /// }
3143    ///
3144    /// # anyhow::Ok(()) };
3145    /// ```
3146    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3147    #[instrument(skip(self))]
3148    pub async fn sync_stream(
3149        &self,
3150        mut sync_settings: crate::config::SyncSettings,
3151    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3152        let mut is_first_sync = true;
3153        let mut timeout = None;
3154        let mut last_sync_time: Option<Instant> = None;
3155
3156        let parent_span = Span::current();
3157
3158        async_stream::stream!({
3159            loop {
3160                trace!("Syncing");
3161
3162                if sync_settings.ignore_timeout_on_first_sync {
3163                    if is_first_sync {
3164                        timeout = sync_settings.timeout.take();
3165                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
3166                        sync_settings.timeout = timeout.take();
3167                    }
3168
3169                    is_first_sync = false;
3170                }
3171
3172                yield self
3173                    .sync_loop_helper(&mut sync_settings)
3174                    .instrument(parent_span.clone())
3175                    .await;
3176
3177                Client::delay_sync(&mut last_sync_time).await
3178            }
3179        })
3180    }
3181
3182    /// Get the current, if any, sync token of the client.
3183    /// This will be None if the client didn't sync at least once.
3184    pub(crate) async fn sync_token(&self) -> Option<String> {
3185        self.inner.base_client.sync_token().await
3186    }
3187
3188    /// Gets information about the owner of a given access token.
3189    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3190        let request = whoami::v3::Request::new();
3191        self.send(request).await
3192    }
3193
3194    /// Subscribes a new receiver to client SessionChange broadcasts.
3195    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3196        let broadcast = &self.auth_ctx().session_change_sender;
3197        broadcast.subscribe()
3198    }
3199
3200    /// Sets the save/restore session callbacks.
3201    ///
3202    /// This is another mechanism to get synchronous updates to session tokens,
3203    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3204    pub fn set_session_callbacks(
3205        &self,
3206        reload_session_callback: Box<ReloadSessionCallback>,
3207        save_session_callback: Box<SaveSessionCallback>,
3208    ) -> Result<()> {
3209        self.inner
3210            .auth_ctx
3211            .reload_session_callback
3212            .set(reload_session_callback)
3213            .map_err(|_| Error::MultipleSessionCallbacks)?;
3214
3215        self.inner
3216            .auth_ctx
3217            .save_session_callback
3218            .set(save_session_callback)
3219            .map_err(|_| Error::MultipleSessionCallbacks)?;
3220
3221        Ok(())
3222    }
3223
3224    /// Get the notification settings of the current owner of the client.
3225    pub async fn notification_settings(&self) -> NotificationSettings {
3226        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3227        NotificationSettings::new(self.clone(), ruleset)
3228    }
3229
3230    /// Create a new specialized `Client` that can process notifications.
3231    ///
3232    /// See [`CrossProcessLock::new`] to learn more about
3233    /// `cross_process_lock_config`.
3234    ///
3235    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3236    pub async fn notification_client(
3237        &self,
3238        cross_process_lock_config: CrossProcessLockConfig,
3239    ) -> Result<Client> {
3240        let client = Client {
3241            inner: ClientInner::new(
3242                self.inner.auth_ctx.clone(),
3243                self.server().cloned(),
3244                self.homeserver(),
3245                self.sliding_sync_version(),
3246                self.inner.http_client.clone(),
3247                self.inner
3248                    .base_client
3249                    .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3250                    .await?,
3251                self.inner.caches.supported_versions.value(),
3252                self.inner.caches.well_known.value(),
3253                self.inner.respect_login_well_known,
3254                self.inner.event_cache.clone(),
3255                self.inner.send_queue_data.clone(),
3256                self.inner.latest_events.clone(),
3257                #[cfg(feature = "e2e-encryption")]
3258                self.inner.e2ee.encryption_settings,
3259                #[cfg(feature = "e2e-encryption")]
3260                self.inner.enable_share_history_on_invite,
3261                cross_process_lock_config,
3262                #[cfg(feature = "experimental-search")]
3263                self.inner.search_index.clone(),
3264                self.inner.thread_subscription_catchup.clone(),
3265            )
3266            .await,
3267        };
3268
3269        Ok(client)
3270    }
3271
3272    /// The [`EventCache`] instance for this [`Client`].
3273    pub fn event_cache(&self) -> &EventCache {
3274        // SAFETY: always initialized in the `Client` ctor.
3275        self.inner.event_cache.get().unwrap()
3276    }
3277
3278    /// The [`LatestEvents`] instance for this [`Client`].
3279    pub async fn latest_events(&self) -> &LatestEvents {
3280        self.inner
3281            .latest_events
3282            .get_or_init(|| async {
3283                LatestEvents::new(
3284                    WeakClient::from_client(self),
3285                    self.event_cache().clone(),
3286                    SendQueue::new(self.clone()),
3287                    self.room_info_notable_update_receiver(),
3288                )
3289            })
3290            .await
3291    }
3292
3293    /// Waits until an at least partially synced room is received, and returns
3294    /// it.
3295    ///
3296    /// **Note: this function will loop endlessly until either it finds the room
3297    /// or an externally set timeout happens.**
3298    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3299        loop {
3300            if let Some(room) = self.get_room(room_id) {
3301                if room.is_state_partially_or_fully_synced() {
3302                    debug!("Found just created room!");
3303                    return room;
3304                }
3305                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3306            } else {
3307                debug!("Room wasn't found, waiting for sync beat to try again");
3308            }
3309            self.inner.sync_beat.listen().await;
3310        }
3311    }
3312
3313    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3314    /// join it.
3315    pub async fn knock(
3316        &self,
3317        room_id_or_alias: OwnedRoomOrAliasId,
3318        reason: Option<String>,
3319        server_names: Vec<OwnedServerName>,
3320    ) -> Result<Room> {
3321        let request =
3322            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3323        let response = self.send(request).await?;
3324        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3325        Ok(Room::new(self.clone(), base_room))
3326    }
3327
3328    /// Checks whether the provided `user_id` belongs to an ignored user.
3329    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3330        self.base_client().is_user_ignored(user_id).await
3331    }
3332
3333    /// Gets the `max_upload_size` value from the homeserver, getting either a
3334    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3335    /// missing.
3336    ///
3337    /// Check the spec for more info:
3338    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3339    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3340        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3341        if let Some(data) = max_upload_size_lock.get() {
3342            return Ok(data.to_owned());
3343        }
3344
3345        // Use the authenticated endpoint when the server supports it.
3346        let supported_versions = self.supported_versions().await?;
3347        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3348            .is_supported(&supported_versions);
3349
3350        let upload_size = if use_auth {
3351            self.send(authenticated_media::get_media_config::v1::Request::default())
3352                .await?
3353                .upload_size
3354        } else {
3355            #[allow(deprecated)]
3356            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3357        };
3358
3359        match max_upload_size_lock.set(upload_size) {
3360            Ok(_) => Ok(upload_size),
3361            Err(error) => {
3362                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3363            }
3364        }
3365    }
3366
3367    /// The settings to use for decrypting events.
3368    #[cfg(feature = "e2e-encryption")]
3369    pub fn decryption_settings(&self) -> &DecryptionSettings {
3370        &self.base_client().decryption_settings
3371    }
3372
3373    /// Returns the [`SearchIndex`] for this [`Client`].
3374    #[cfg(feature = "experimental-search")]
3375    pub fn search_index(&self) -> &SearchIndex {
3376        &self.inner.search_index
3377    }
3378
3379    /// Whether the client is configured to take thread subscriptions (MSC4306
3380    /// and MSC4308) into account, and the server enabled the experimental
3381    /// feature flag for it.
3382    ///
3383    /// This may cause filtering out of thread subscriptions, and loading the
3384    /// thread subscriptions via the sliding sync extension, when the room
3385    /// list service is being used.
3386    ///
3387    /// This is async and fallible as it may use the network to retrieve the
3388    /// server supported features, if they aren't cached already.
3389    pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3390        // Check if the client is configured to support thread subscriptions first.
3391        match self.base_client().threading_support {
3392            ThreadingSupport::Enabled { with_subscriptions: false }
3393            | ThreadingSupport::Disabled => return Ok(false),
3394            ThreadingSupport::Enabled { with_subscriptions: true } => {}
3395        }
3396
3397        // Now, let's check that the server supports it.
3398        let server_enabled = self
3399            .supported_versions()
3400            .await?
3401            .features
3402            .contains(&FeatureFlag::from("org.matrix.msc4306"));
3403
3404        Ok(server_enabled)
3405    }
3406
3407    /// Fetch thread subscriptions changes between `from` and up to `to`.
3408    ///
3409    /// The `limit` optional parameter can be used to limit the number of
3410    /// entries in a response. It can also be overridden by the server, if
3411    /// it's deemed too large.
3412    pub async fn fetch_thread_subscriptions(
3413        &self,
3414        from: Option<String>,
3415        to: Option<String>,
3416        limit: Option<UInt>,
3417    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3418        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3419            from,
3420            to,
3421            limit,
3422        });
3423        Ok(self.send(request).await?)
3424    }
3425
3426    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3427        self.inner.thread_subscription_catchup.get().unwrap()
3428    }
3429
3430    /// Pause the client for background suspension.
3431    ///
3432    /// This method:
3433    /// 1. Disables all send queues (prevents new message sends).
3434    /// 2. Pauses all database stores, waiting for in-flight operations and
3435    ///    releasing all connections and file locks.
3436    ///
3437    /// Call [`Client::resume()`] when the app returns to the foreground.
3438    ///
3439    /// # iOS
3440    ///
3441    /// Call this before the app is suspended to avoid `0xdead10cc` kills.
3442    /// Typically called from
3443    /// [`applicationDidEnterBackground`](https://developer.apple.com/documentation/uikit/uiapplicationdelegate/applicationdidenterbackground(_:))
3444    /// or an equivalent SwiftUI lifecycle event, *after* stopping the
3445    /// `matrix_sdk_ui::sync_service::SyncService`.
3446    pub async fn pause(&self) -> Result<()> {
3447        info!("Client::pause — releasing database resources");
3448
3449        // Disable send queues so no new sends hit the stores.
3450        self.send_queue().set_enabled(false).await;
3451
3452        // Close all stores (waits for in-flight ops, closes connections).
3453        self.base_client().close_stores().await?;
3454
3455        info!("Client::pause — complete, all database connections released");
3456        Ok(())
3457    }
3458
3459    /// Resume the client after a [`Client::pause()`].
3460    ///
3461    /// Re-acquires store resources and re-enables send queues.
3462    ///
3463    /// If your app stopped the `matrix_sdk_ui::sync_service::SyncService`
3464    /// before pausing, restart it separately as appropriate for your app
3465    /// lifecycle.
3466    pub async fn resume(&self) -> Result<()> {
3467        info!("Client::resume — re-acquiring database resources");
3468
3469        // Reopen stores (creates new connection pools).
3470        self.base_client().reopen_stores().await?;
3471
3472        // Re-enable send queues.
3473        self.send_queue().set_enabled(true).await;
3474
3475        info!("Client::resume — complete");
3476        Ok(())
3477    }
3478
3479    /// Perform database optimizations if any are available, i.e. vacuuming in
3480    /// SQLite.
3481    ///
3482    /// **Warning:** this was added to check if SQLite fragmentation was the
3483    /// source of performance issues, **DO NOT use in production**.
3484    #[doc(hidden)]
3485    pub async fn optimize_stores(&self) -> Result<()> {
3486        trace!("Optimizing state store...");
3487        self.state_store().optimize().await?;
3488
3489        trace!("Optimizing event cache store...");
3490        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3491            clean_lock.optimize().await?;
3492        }
3493
3494        trace!("Optimizing media store...");
3495        self.media_store().lock().await?.optimize().await?;
3496
3497        Ok(())
3498    }
3499
3500    /// Returns the sizes of the existing stores, if known.
3501    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3502        #[cfg(feature = "e2e-encryption")]
3503        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3504            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3505        {
3506            Some(store_size)
3507        } else {
3508            None
3509        };
3510        #[cfg(not(feature = "e2e-encryption"))]
3511        let crypto_store_size = None;
3512
3513        let state_store_size = self.state_store().get_size().await.ok().flatten();
3514
3515        let event_cache_store_size = if let Some(clean_lock) =
3516            self.event_cache_store().lock().await?.as_clean()
3517            && let Ok(Some(store_size)) = clean_lock.get_size().await
3518        {
3519            Some(store_size)
3520        } else {
3521            None
3522        };
3523
3524        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3525
3526        Ok(StoreSizes {
3527            crypto_store: crypto_store_size,
3528            state_store: state_store_size,
3529            event_cache_store: event_cache_store_size,
3530            media_store: media_store_size,
3531        })
3532    }
3533
3534    /// Get a reference to the client's task monitor, for spawning background
3535    /// tasks.
3536    pub fn task_monitor(&self) -> &TaskMonitor {
3537        &self.inner.task_monitor
3538    }
3539
3540    /// Add a subscriber for duplicate key upload error notifications triggered
3541    /// by requests to /keys/upload.
3542    #[cfg(feature = "e2e-encryption")]
3543    pub fn subscribe_to_duplicate_key_upload_errors(
3544        &self,
3545    ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3546        self.inner.duplicate_key_upload_error_sender.subscribe()
3547    }
3548
3549    /// Check the record of whether we are waiting for an [MSC4268] key bundle
3550    /// for the given room.
3551    ///
3552    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3553    #[cfg(feature = "e2e-encryption")]
3554    pub async fn get_pending_key_bundle_details_for_room(
3555        &self,
3556        room_id: &RoomId,
3557    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3558        Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3559    }
3560
3561    /// Returns the [`DmRoomDefinition`] this client uses to check if a room is
3562    /// a DM.
3563    pub fn dm_room_definition(&self) -> &DmRoomDefinition {
3564        &self.inner.base_client.dm_room_definition
3565    }
3566}
3567
/// The disk size of each of the client's stores, if known.
///
/// A size is `None` when it isn't available, which is notably the case for
/// in-memory stores.
#[derive(Clone, Debug)]
pub struct StoreSizes {
    /// Size of the CryptoStore, if known.
    pub crypto_store: Option<usize>,
    /// Size of the StateStore, if known.
    pub state_store: Option<usize>,
    /// Size of the EventCacheStore, if known.
    pub event_cache_store: Option<usize>,
    /// Size of the MediaStore, if known.
    pub media_store: Option<usize>,
}
3581
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper making the crypto layer track the given users.
    ///
    /// # Panics
    ///
    /// Panics if there is no Olm machine, or if updating the tracked users
    /// fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let olm_machine_guard = self.olm_machine().await;
        let olm_machine = olm_machine_guard.as_ref().unwrap();

        olm_machine.update_tracked_users(user_ids).await.unwrap();
    }
}
3595
3596/// A weak reference to the inner client, useful when trying to get a handle
3597/// on the owning client.
3598#[derive(Clone, Debug)]
3599pub(crate) struct WeakClient {
3600    client: Weak<ClientInner>,
3601}
3602
3603impl WeakClient {
3604    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3605    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3606        Self { client: Arc::downgrade(client) }
3607    }
3608
3609    /// Construct a [`WeakClient`] from a [`Client`].
3610    pub fn from_client(client: &Client) -> Self {
3611        Self::from_inner(&client.inner)
3612    }
3613
3614    /// Attempts to get a [`Client`] from this [`WeakClient`].
3615    pub fn get(&self) -> Option<Client> {
3616        self.client.upgrade().map(|inner| Client { inner })
3617    }
3618
3619    /// Gets the number of strong (`Arc`) pointers still pointing to this
3620    /// client.
3621    #[allow(dead_code)]
3622    pub fn strong_count(&self) -> usize {
3623        self.client.strong_count()
3624    }
3625}
3626
3627/// Information about the state of a room before we joined it.
3628#[derive(Debug, Clone, Default)]
3629struct PreJoinRoomInfo {
3630    /// The user who invited us to the room, if any.
3631    pub inviter: Option<RoomMember>,
3632}
3633
3634// The http mocking library is not supported for wasm32
3635#[cfg(all(test, not(target_family = "wasm")))]
3636pub(crate) mod tests {
3637    use std::{sync::Arc, time::Duration};
3638
3639    use assert_matches::assert_matches;
3640    use assert_matches2::assert_let;
3641    use eyeball::SharedObservable;
3642    use futures_util::{FutureExt, StreamExt, pin_mut};
3643    use js_int::{UInt, uint};
3644    use matrix_sdk_base::{
3645        RoomState,
3646        store::{MemoryStore, StoreConfig},
3647        ttl::TtlValue,
3648    };
3649    use matrix_sdk_test::{
3650        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3651        event_factory::EventFactory,
3652    };
3653    #[cfg(target_family = "wasm")]
3654    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3655
3656    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3657    use ruma::{
3658        RoomId, ServerName, UserId,
3659        api::{
3660            FeatureFlag, MatrixVersion,
3661            client::{room::create_room::v3::Request as CreateRoomRequest, rtc::RtcTransport},
3662        },
3663        assign,
3664        events::{
3665            ignored_user_list::IgnoredUserListEventContent,
3666            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3667        },
3668        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3669    };
3670    use serde_json::json;
3671    use stream_assert::{assert_next_matches, assert_pending};
3672    use tokio::{
3673        spawn,
3674        time::{sleep, timeout},
3675    };
3676    use url::Url;
3677
3678    use super::Client;
3679    use crate::{
3680        Error, TransmissionProgress,
3681        client::{WeakClient, caches::CachedValue, futures::SendMediaUploadRequest},
3682        config::{RequestConfig, SyncSettings},
3683        futures::SendRequest,
3684        media::MediaError,
3685        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3686    };
3687
3688    #[async_test]
3689    async fn test_account_data() {
3690        let server = MatrixMockServer::new().await;
3691        let client = server.client_builder().build().await;
3692
3693        let f = EventFactory::new();
3694        server
3695            .mock_sync()
3696            .ok_and_run(&client, |builder| {
3697                builder.add_global_account_data(
3698                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3699                );
3700            })
3701            .await;
3702
3703        let content = client
3704            .account()
3705            .account_data::<IgnoredUserListEventContent>()
3706            .await
3707            .unwrap()
3708            .unwrap()
3709            .deserialize()
3710            .unwrap();
3711
3712        assert_eq!(content.ignored_users.len(), 1);
3713    }
3714
3715    #[async_test]
3716    async fn test_successful_discovery() {
3717        // Imagine this is `matrix.org`.
3718        let server = MatrixMockServer::new().await;
3719        let server_url = server.uri();
3720
3721        // Imagine this is `matrix-client.matrix.org`.
3722        let homeserver = MatrixMockServer::new().await;
3723        let homeserver_url = homeserver.uri();
3724
3725        // Imagine Alice has the user ID `@alice:matrix.org`.
3726        let domain = server_url.strip_prefix("http://").unwrap();
3727        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3728
3729        // The `.well-known` is on the server (e.g. `matrix.org`).
3730        server
3731            .mock_well_known()
3732            .ok_with_homeserver_url(&homeserver_url)
3733            .mock_once()
3734            .named("well-known")
3735            .mount()
3736            .await;
3737
3738        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3739        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3740
3741        let client = Client::builder()
3742            .insecure_server_name_no_tls(alice.server_name())
3743            .build()
3744            .await
3745            .unwrap();
3746
3747        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3748        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3749        client.server_versions().await.unwrap();
3750    }
3751
3752    #[async_test]
3753    async fn test_discovery_broken_server() {
3754        let server = MatrixMockServer::new().await;
3755        let server_url = server.uri();
3756        let domain = server_url.strip_prefix("http://").unwrap();
3757        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3758
3759        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3760
3761        assert!(
3762            Client::builder()
3763                .insecure_server_name_no_tls(alice.server_name())
3764                .build()
3765                .await
3766                .is_err(),
3767            "Creating a client from a user ID should fail when the .well-known request fails."
3768        );
3769    }
3770
3771    #[async_test]
3772    async fn test_room_creation() {
3773        let server = MatrixMockServer::new().await;
3774        let client = server.client_builder().build().await;
3775
3776        let f = EventFactory::new().sender(user_id!("@example:localhost"));
3777        server
3778            .mock_sync()
3779            .ok_and_run(&client, |builder| {
3780                builder.add_joined_room(
3781                    JoinedRoomBuilder::default()
3782                        .add_state_event(
3783                            f.member(user_id!("@example:localhost")).display_name("example"),
3784                        )
3785                        .add_state_event(f.default_power_levels()),
3786                );
3787            })
3788            .await;
3789
3790        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3791        assert_eq!(room.state(), RoomState::Joined);
3792    }
3793
3794    #[async_test]
3795    async fn test_retry_limit_http_requests() {
3796        let server = MatrixMockServer::new().await;
3797        let client = server
3798            .client_builder()
3799            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3800            .build()
3801            .await;
3802
3803        assert!(client.request_config().retry_limit.unwrap() == 4);
3804
3805        server.mock_who_am_i().error500().expect(4).mount().await;
3806
3807        client.whoami().await.unwrap_err();
3808    }
3809
3810    #[async_test]
3811    async fn test_retry_timeout_http_requests() {
3812        // Keep this timeout small so that the test doesn't take long
3813        let retry_timeout = Duration::from_secs(5);
3814        let server = MatrixMockServer::new().await;
3815        let client = server
3816            .client_builder()
3817            .on_builder(|builder| {
3818                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3819            })
3820            .build()
3821            .await;
3822
3823        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3824
3825        server.mock_login().error500().expect(2..).mount().await;
3826
3827        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3828    }
3829
3830    #[async_test]
3831    async fn test_short_retry_initial_http_requests() {
3832        let server = MatrixMockServer::new().await;
3833        let client = server
3834            .client_builder()
3835            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3836            .build()
3837            .await;
3838
3839        server.mock_login().error500().expect(3..).mount().await;
3840
3841        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3842    }
3843
3844    #[async_test]
3845    async fn test_no_retry_http_requests() {
3846        let server = MatrixMockServer::new().await;
3847        let client = server.client_builder().build().await;
3848
3849        server.mock_devices().error500().mock_once().mount().await;
3850
3851        client.devices().await.unwrap_err();
3852    }
3853
3854    #[async_test]
3855    async fn test_set_homeserver() {
3856        let client = MockClientBuilder::new(None).build().await;
3857        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3858
3859        let homeserver = Url::parse("http://example.com/").unwrap();
3860        client.set_homeserver(homeserver.clone());
3861        assert_eq!(client.homeserver(), homeserver);
3862    }
3863
3864    #[async_test]
3865    async fn test_search_user_request() {
3866        let server = MatrixMockServer::new().await;
3867        let client = server.client_builder().build().await;
3868
3869        server.mock_user_directory().ok().mock_once().mount().await;
3870
3871        let response = client.search_users("test", 50).await.unwrap();
3872        assert_eq!(response.results.len(), 1);
3873        let result = &response.results[0];
3874        assert_eq!(result.user_id.to_string(), "@test:example.me");
3875        assert_eq!(result.display_name.clone().unwrap(), "Test");
3876        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3877        assert!(!response.limited);
3878    }
3879
3880    #[async_test]
3881    async fn test_request_unstable_features() {
3882        let server = MatrixMockServer::new().await;
3883        let client = server.client_builder().no_server_versions().build().await;
3884
3885        server
3886            .mock_versions()
3887            .with_feature("org.matrix.e2e_cross_signing", true)
3888            .ok()
3889            .mock_once()
3890            .mount()
3891            .await;
3892
3893        let unstable_features = client.unstable_features().await.unwrap();
3894        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3895        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3896    }
3897
3898    #[async_test]
3899    async fn test_can_homeserver_push_encrypted_event_to_device() {
3900        let server = MatrixMockServer::new().await;
3901        let client = server.client_builder().no_server_versions().build().await;
3902
3903        server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3904
3905        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3906        assert!(msc4028_enabled);
3907    }
3908
3909    #[async_test]
3910    async fn test_recently_visited_rooms() {
3911        // Tracking recently visited rooms requires authentication
3912        let client = MockClientBuilder::new(None).unlogged().build().await;
3913        assert_matches!(
3914            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3915            Err(Error::AuthenticationRequired)
3916        );
3917
3918        let client = MockClientBuilder::new(None).build().await;
3919        let account = client.account();
3920
3921        // We should start off with an empty list
3922        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3923
3924        // Tracking a valid room id should add it to the list
3925        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3926        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3927        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3928
3929        // And the existing list shouldn't be changed
3930        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3931        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3932
3933        // Tracking the same room again shouldn't change the list
3934        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3935        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3936        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3937
3938        // Tracking a second room should add it to the front of the list
3939        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3940        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3941        assert_eq!(
3942            account.get_recently_visited_rooms().await.unwrap(),
3943            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3944        );
3945
3946        // Tracking the first room yet again should move it to the front of the list
3947        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3948        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3949        assert_eq!(
3950            account.get_recently_visited_rooms().await.unwrap(),
3951            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3952        );
3953
3954        // Tracking should be capped at 20
3955        for n in 0..20 {
3956            account
3957                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3958                .await
3959                .unwrap();
3960        }
3961
3962        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3963
3964        // And the initial rooms should've been pushed out
3965        let rooms = account.get_recently_visited_rooms().await.unwrap();
3966        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3967        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3968
3969        // And the last tracked room should be the first
3970        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3971    }
3972
3973    #[async_test]
3974    async fn test_client_no_cycle_with_event_cache() {
3975        let client = MockClientBuilder::new(None).build().await;
3976
3977        // Wait for the init tasks to die.
3978        sleep(Duration::from_secs(1)).await;
3979
3980        let weak_client = WeakClient::from_client(&client);
3981        assert_eq!(weak_client.strong_count(), 1);
3982
3983        {
3984            let room_id = room_id!("!room:example.org");
3985
3986            // Have the client know the room.
3987            let response = SyncResponseBuilder::default()
3988                .add_joined_room(JoinedRoomBuilder::new(room_id))
3989                .build_sync_response();
3990            client.inner.base_client.receive_sync_response(response).await.unwrap();
3991
3992            client.event_cache().subscribe().unwrap();
3993
3994            let (_room_event_cache, _drop_handles) =
3995                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3996        }
3997
3998        drop(client);
3999
4000        // Give a bit of time for background tasks to die.
4001        sleep(Duration::from_secs(1)).await;
4002
4003        // The weak client must be the last reference to the client now.
4004        assert_eq!(weak_client.strong_count(), 0);
4005        let client = weak_client.get();
4006        assert!(
4007            client.is_none(),
4008            "too many strong references to the client: {}",
4009            Arc::strong_count(&client.unwrap().inner)
4010        );
4011    }
4012
4013    #[async_test]
4014    async fn test_supported_versions_caching() {
4015        let server = MatrixMockServer::new().await;
4016
4017        let versions_mock = server
4018            .mock_versions()
4019            .expect_default_access_token()
4020            .with_feature("org.matrix.e2e_cross_signing", true)
4021            .ok()
4022            .named("first versions mock")
4023            .expect(1)
4024            .mount_as_scoped()
4025            .await;
4026
4027        let memory_store = Arc::new(MemoryStore::new());
4028        let client = server
4029            .client_builder()
4030            .no_server_versions()
4031            .on_builder(|builder| {
4032                builder.store_config(
4033                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4034                        .state_store(memory_store.clone()),
4035                )
4036            })
4037            .build()
4038            .await;
4039
4040        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4041
4042        // The result was cached.
4043        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
4044        // This subsequent call hits the in-memory cache.
4045        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4046
4047        drop(client);
4048
4049        let client = server
4050            .client_builder()
4051            .no_server_versions()
4052            .on_builder(|builder| {
4053                builder.store_config(
4054                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4055                        .state_store(memory_store.clone()),
4056                )
4057            })
4058            .build()
4059            .await;
4060
4061        // These calls to the new client hit the on-disk cache.
4062        assert!(
4063            client
4064                .unstable_features()
4065                .await
4066                .unwrap()
4067                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
4068        );
4069
4070        let supported = client.supported_versions().await.unwrap();
4071        assert!(supported.versions.contains(&MatrixVersion::V1_0));
4072        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4073
4074        // Then this call hits the in-memory cache.
4075        let supported = client.supported_versions().await.unwrap();
4076        assert!(supported.versions.contains(&MatrixVersion::V1_0));
4077        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4078
4079        drop(versions_mock);
4080
4081        // Now, reset the cache, and observe the endpoint being called again once.
4082        client.reset_supported_versions().await.unwrap();
4083
4084        server.mock_versions().ok().expect(2).named("second versions mock").mount().await;
4085
4086        // Hits network again.
4087        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4088        // Hits in-memory cache again.
4089        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4090        assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4091
4092        // Force an expiry of the data.
4093        let supported_versions = client.supported_versions_cached().await.unwrap().unwrap();
4094        let mut ttl_value = TtlValue::new(supported_versions);
4095        ttl_value.expire();
4096        client.inner.caches.supported_versions.set_value(ttl_value);
4097
4098        // Call the method to trigger a cache refresh background task.
4099        client.supported_versions_cached().await.unwrap().unwrap();
4100
4101        // We wait for the task to finish, the endpoint should have been called again.
4102        sleep(Duration::from_secs(1)).await;
4103        assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4104    }
4105
4106    #[async_test]
4107    async fn test_well_known_caching() {
4108        let server = MatrixMockServer::new().await;
4109        let server_url = server.uri();
4110        let domain = server_url.strip_prefix("http://").unwrap();
4111        let server_name = <&ServerName>::try_from(domain).unwrap();
4112        let rtc_foci = vec![RtcTransport::livekit("https://livekit.example.com".to_owned())];
4113
4114        let well_known_mock = server
4115            .mock_well_known()
4116            .ok()
4117            .named("well known mock")
4118            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
4119            .mount_as_scoped()
4120            .await;
4121
4122        let memory_store = Arc::new(MemoryStore::new());
4123        let client = Client::builder()
4124            .insecure_server_name_no_tls(server_name)
4125            .store_config(
4126                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4127                    .state_store(memory_store.clone()),
4128            )
4129            .build()
4130            .await
4131            .unwrap();
4132
4133        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4134
4135        // This subsequent call hits the in-memory cache.
4136        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4137
4138        drop(client);
4139
4140        let client = server
4141            .client_builder()
4142            .no_server_versions()
4143            .on_builder(|builder| {
4144                builder.store_config(
4145                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4146                        .state_store(memory_store.clone()),
4147                )
4148            })
4149            .build()
4150            .await;
4151
4152        // This call to the new client hits the on-disk cache.
4153        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4154
4155        // Then this call hits the in-memory cache.
4156        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4157
4158        drop(well_known_mock);
4159
4160        // Now, reset the cache, and observe the endpoints being called again once.
4161        client.reset_well_known().await.unwrap();
4162
4163        server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;
4164
4165        // Hits network again.
4166        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4167        // Hits in-memory cache again.
4168        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4169
4170        // Force an expiry of the data.
4171        let well_known = client.well_known().await;
4172        let mut ttl_value = TtlValue::new(well_known);
4173        ttl_value.expire();
4174        client.inner.caches.well_known.set_value(ttl_value);
4175
4176        // Call the method again to trigger a cache refresh background task.
4177        client.well_known().await;
4178
4179        // We wait for the task to finish, the endpoint should have been called again.
4180        // We need to wait a bit because the first requests using the server name of the
4181        // user will fail, only the requests using the homeserver URL will succeed.
4182        sleep(Duration::from_secs(5)).await;
4183        assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
4184    }
4185
4186    #[async_test]
4187    async fn test_missing_well_known_caching() {
4188        let server = MatrixMockServer::new().await;
4189        let rtc_foci: Vec<RtcTransport> = vec![];
4190
4191        let well_known_mock = server
4192            .mock_well_known()
4193            .error_unrecognized()
4194            .named("first well-known mock")
4195            .expect(1)
4196            .mount_as_scoped()
4197            .await;
4198
4199        let memory_store = Arc::new(MemoryStore::new());
4200        let client = server
4201            .client_builder()
4202            .on_builder(|builder| {
4203                builder.store_config(
4204                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4205                        .state_store(memory_store.clone()),
4206                )
4207            })
4208            .build()
4209            .await;
4210
4211        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4212
4213        // This subsequent call hits the in-memory cache.
4214        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4215
4216        drop(client);
4217
4218        let client = server
4219            .client_builder()
4220            .on_builder(|builder| {
4221                builder.store_config(
4222                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4223                        .state_store(memory_store.clone()),
4224                )
4225            })
4226            .build()
4227            .await;
4228
4229        // This call to the new client hits the on-disk cache.
4230        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4231
4232        // Then this call hits the in-memory cache.
4233        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4234
4235        drop(well_known_mock);
4236
4237        // Now, reset the cache, and observe the endpoints being called again once.
4238        client.reset_well_known().await.unwrap();
4239
4240        server
4241            .mock_well_known()
4242            .error_unrecognized()
4243            .expect(1)
4244            .named("second well-known mock")
4245            .mount()
4246            .await;
4247
4248        // Hits network again.
4249        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4250        // Hits in-memory cache again.
4251        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4252    }
4253
4254    #[async_test]
4255    async fn test_no_network_doesnt_cause_infinite_retries() {
4256        // We want infinite retries for transient errors.
4257        let client = MockClientBuilder::new(None)
4258            .on_builder(|builder| builder.request_config(RequestConfig::new()))
4259            .build()
4260            .await;
4261
4262        // We don't define a mock server on purpose here, so that the error is really a
4263        // network error.
4264        client.whoami().await.unwrap_err();
4265    }
4266
4267    #[async_test]
4268    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4269        let server = MatrixMockServer::new().await;
4270        let client = server.client_builder().build().await;
4271
4272        let room_id = room_id!("!room:example.org");
4273
4274        server
4275            .mock_sync()
4276            .ok_and_run(&client, |builder| {
4277                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4278            })
4279            .await;
4280
4281        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4282        assert_eq!(room.room_id(), room_id);
4283    }
4284
4285    #[async_test]
4286    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4287        let server = MatrixMockServer::new().await;
4288        let client = server.client_builder().build().await;
4289
4290        let room_id = room_id!("!room:example.org");
4291
4292        let client = Arc::new(client);
4293
4294        // Perform the /sync request with a delay so it starts after the
4295        // `await_room_remote_echo` call has happened
4296        spawn({
4297            let client = client.clone();
4298            async move {
4299                sleep(Duration::from_millis(100)).await;
4300
4301                server
4302                    .mock_sync()
4303                    .ok_and_run(&client, |builder| {
4304                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4305                    })
4306                    .await;
4307            }
4308        });
4309
4310        let room =
4311            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4312        assert_eq!(room.room_id(), room_id);
4313    }
4314
4315    #[async_test]
4316    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4317        let client = MockClientBuilder::new(None).build().await;
4318
4319        let room_id = room_id!("!room:example.org");
4320        // Room is not present so the client won't be able to find it. The call will
4321        // timeout.
4322        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4323    }
4324
4325    #[async_test]
4326    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4327        let server = MatrixMockServer::new().await;
4328        let client = server.client_builder().build().await;
4329
4330        server.mock_create_room().ok().mount().await;
4331
4332        // Create a room in the internal store
4333        let room = client
4334            .create_room(assign!(CreateRoomRequest::new(), {
4335                invite: vec![],
4336                is_direct: false,
4337            }))
4338            .await
4339            .unwrap();
4340
4341        // Room is locally present, but not synced, the call will timeout
4342        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4343            .await
4344            .unwrap_err();
4345    }
4346
4347    #[async_test]
4348    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4349        let server = MatrixMockServer::new().await;
4350        let client = server.client_builder().build().await;
4351
4352        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4353
4354        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4355        assert_matches!(ret, Ok(true));
4356    }
4357
4358    #[async_test]
4359    async fn test_is_room_alias_available_if_alias_is_resolved() {
4360        let server = MatrixMockServer::new().await;
4361        let client = server.client_builder().build().await;
4362
4363        server
4364            .mock_room_directory_resolve_alias()
4365            .ok("!some_room_id:matrix.org", Vec::new())
4366            .expect(1)
4367            .mount()
4368            .await;
4369
4370        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4371        assert_matches!(ret, Ok(false));
4372    }
4373
4374    #[async_test]
4375    async fn test_is_room_alias_available_if_error_found() {
4376        let server = MatrixMockServer::new().await;
4377        let client = server.client_builder().build().await;
4378
4379        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4380
4381        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4382        assert_matches!(ret, Err(_));
4383    }
4384
4385    #[async_test]
4386    async fn test_create_room_alias() {
4387        let server = MatrixMockServer::new().await;
4388        let client = server.client_builder().build().await;
4389
4390        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4391
4392        let ret = client
4393            .create_room_alias(
4394                room_alias_id!("#some_alias:matrix.org"),
4395                room_id!("!some_room:matrix.org"),
4396            )
4397            .await;
4398        assert_matches!(ret, Ok(()));
4399    }
4400
4401    #[async_test]
4402    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4403        let server = MatrixMockServer::new().await;
4404        let client = server.client_builder().build().await;
4405
4406        let room_id = room_id!("!a-room:matrix.org");
4407
4408        // Make sure the summary endpoint is called once
4409        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4410
4411        // We create a locally cached invited room
4412        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4413
4414        // And we get a preview, the server endpoint was reached
4415        let preview = client
4416            .get_room_preview(room_id.into(), Vec::new())
4417            .await
4418            .expect("Room preview should be retrieved");
4419
4420        assert_eq!(invited_room.room_id(), preview.room_id);
4421    }
4422
4423    #[async_test]
4424    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4425        let server = MatrixMockServer::new().await;
4426        let client = server.client_builder().build().await;
4427
4428        let room_id = room_id!("!a-room:matrix.org");
4429
4430        // Make sure the summary endpoint is called once
4431        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4432
4433        // We create a locally cached left room
4434        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4435
4436        // And we get a preview, the server endpoint was reached
4437        let preview = client
4438            .get_room_preview(room_id.into(), Vec::new())
4439            .await
4440            .expect("Room preview should be retrieved");
4441
4442        assert_eq!(left_room.room_id(), preview.room_id);
4443    }
4444
4445    #[async_test]
4446    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4447        let server = MatrixMockServer::new().await;
4448        let client = server.client_builder().build().await;
4449
4450        let room_id = room_id!("!a-room:matrix.org");
4451
4452        // Make sure the summary endpoint is called once
4453        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4454
4455        // We create a locally cached knocked room
4456        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4457
4458        // And we get a preview, the server endpoint was reached
4459        let preview = client
4460            .get_room_preview(room_id.into(), Vec::new())
4461            .await
4462            .expect("Room preview should be retrieved");
4463
4464        assert_eq!(knocked_room.room_id(), preview.room_id);
4465    }
4466
4467    #[async_test]
4468    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4469        let server = MatrixMockServer::new().await;
4470        let client = server.client_builder().build().await;
4471
4472        let room_id = room_id!("!a-room:matrix.org");
4473
4474        // Make sure the summary endpoint is not called
4475        server.mock_room_summary().ok(room_id).never().mount().await;
4476
4477        // We create a locally cached joined room
4478        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4479
4480        // And we get a preview, no server endpoint was reached
4481        let preview = client
4482            .get_room_preview(room_id.into(), Vec::new())
4483            .await
4484            .expect("Room preview should be retrieved");
4485
4486        assert_eq!(joined_room.room_id(), preview.room_id);
4487    }
4488
4489    #[async_test]
4490    async fn test_media_preview_config() {
4491        let server = MatrixMockServer::new().await;
4492        let client = server.client_builder().build().await;
4493
4494        server
4495            .mock_sync()
4496            .ok_and_run(&client, |builder| {
4497                builder.add_custom_global_account_data(json!({
4498                    "content": {
4499                        "media_previews": "private",
4500                        "invite_avatars": "off"
4501                    },
4502                    "type": "m.media_preview_config"
4503                }));
4504            })
4505            .await;
4506
4507        let (initial_value, stream) =
4508            client.account().observe_media_preview_config().await.unwrap();
4509
4510        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4511        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4512        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4513        pin_mut!(stream);
4514        assert_pending!(stream);
4515
4516        server
4517            .mock_sync()
4518            .ok_and_run(&client, |builder| {
4519                builder.add_custom_global_account_data(json!({
4520                    "content": {
4521                        "media_previews": "off",
4522                        "invite_avatars": "on"
4523                    },
4524                    "type": "m.media_preview_config"
4525                }));
4526            })
4527            .await;
4528
4529        assert_next_matches!(
4530            stream,
4531            MediaPreviewConfigEventContent {
4532                media_previews: Some(MediaPreviews::Off),
4533                invite_avatars: Some(InviteAvatars::On),
4534                ..
4535            }
4536        );
4537        assert_pending!(stream);
4538    }
4539
4540    #[async_test]
4541    async fn test_unstable_media_preview_config() {
4542        let server = MatrixMockServer::new().await;
4543        let client = server.client_builder().build().await;
4544
4545        server
4546            .mock_sync()
4547            .ok_and_run(&client, |builder| {
4548                builder.add_custom_global_account_data(json!({
4549                    "content": {
4550                        "media_previews": "private",
4551                        "invite_avatars": "off"
4552                    },
4553                    "type": "io.element.msc4278.media_preview_config"
4554                }));
4555            })
4556            .await;
4557
4558        let (initial_value, stream) =
4559            client.account().observe_media_preview_config().await.unwrap();
4560
4561        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4562        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4563        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4564        pin_mut!(stream);
4565        assert_pending!(stream);
4566
4567        server
4568            .mock_sync()
4569            .ok_and_run(&client, |builder| {
4570                builder.add_custom_global_account_data(json!({
4571                    "content": {
4572                        "media_previews": "off",
4573                        "invite_avatars": "on"
4574                    },
4575                    "type": "io.element.msc4278.media_preview_config"
4576                }));
4577            })
4578            .await;
4579
4580        assert_next_matches!(
4581            stream,
4582            MediaPreviewConfigEventContent {
4583                media_previews: Some(MediaPreviews::Off),
4584                invite_avatars: Some(InviteAvatars::On),
4585                ..
4586            }
4587        );
4588        assert_pending!(stream);
4589    }
4590
4591    #[async_test]
4592    async fn test_media_preview_config_not_found() {
4593        let server = MatrixMockServer::new().await;
4594        let client = server.client_builder().build().await;
4595
4596        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4597
4598        assert!(initial_value.is_none());
4599    }
4600
4601    #[async_test]
4602    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4603        // The default Matrix version we use is 1.11 or higher, so authenticated media
4604        // is supported.
4605        let server = MatrixMockServer::new().await;
4606        let client = server.client_builder().build().await;
4607
4608        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4609
4610        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4611        client.load_or_fetch_max_upload_size().await.unwrap();
4612
4613        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4614    }
4615
4616    #[async_test]
4617    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4618        // The server must advertise support for the stable feature for authenticated
4619        // media support, so we mock the `GET /versions` response.
4620        let server = MatrixMockServer::new().await;
4621        let client = server.client_builder().no_server_versions().build().await;
4622
4623        server
4624            .mock_versions()
4625            .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4626            .with_feature("org.matrix.msc3916.stable", true)
4627            .ok()
4628            .named("versions")
4629            .expect(1)
4630            .mount()
4631            .await;
4632
4633        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4634
4635        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4636        client.load_or_fetch_max_upload_size().await.unwrap();
4637
4638        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4639    }
4640
4641    #[async_test]
4642    async fn test_load_or_fetch_max_upload_size_no_auth() {
4643        // The server must not support Matrix 1.11 or higher for unauthenticated
4644        // media requests, so we mock the `GET /versions` response.
4645        let server = MatrixMockServer::new().await;
4646        let client = server.client_builder().no_server_versions().build().await;
4647
4648        server
4649            .mock_versions()
4650            .with_versions(vec!["v1.1"])
4651            .ok()
4652            .named("versions")
4653            .expect(1)
4654            .mount()
4655            .await;
4656
4657        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4658
4659        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4660        client.load_or_fetch_max_upload_size().await.unwrap();
4661
4662        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4663    }
4664
4665    #[async_test]
4666    async fn test_uploading_a_too_large_media_file() {
4667        let server = MatrixMockServer::new().await;
4668        let client = server.client_builder().build().await;
4669
4670        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4671        client.load_or_fetch_max_upload_size().await.unwrap();
4672        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4673
4674        let data = vec![1, 2];
4675        let upload_request =
4676            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4677        let request = SendRequest {
4678            client: client.clone(),
4679            request: upload_request,
4680            config: None,
4681            send_progress: SharedObservable::new(TransmissionProgress::default()),
4682        };
4683        let media_request = SendMediaUploadRequest::new(request);
4684
4685        let error = media_request.await.err();
4686        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4687        assert_eq!(max, uint!(1));
4688        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4689    }
4690
4691    #[async_test]
4692    async fn test_dont_ignore_timeout_on_first_sync() {
4693        let server = MatrixMockServer::new().await;
4694        let client = server.client_builder().build().await;
4695
4696        server
4697            .mock_sync()
4698            .timeout(Some(Duration::from_secs(30)))
4699            .ok(|_| {})
4700            .mock_once()
4701            .named("sync_with_timeout")
4702            .mount()
4703            .await;
4704
4705        // Call the endpoint once to check the timeout.
4706        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4707
4708        timeout(Duration::from_secs(1), async {
4709            stream.next().await.unwrap().unwrap();
4710        })
4711        .await
4712        .unwrap();
4713    }
4714
4715    #[async_test]
4716    async fn test_ignore_timeout_on_first_sync() {
4717        let server = MatrixMockServer::new().await;
4718        let client = server.client_builder().build().await;
4719
4720        server
4721            .mock_sync()
4722            .timeout(None)
4723            .ok(|_| {})
4724            .mock_once()
4725            .named("sync_no_timeout")
4726            .mount()
4727            .await;
4728        server
4729            .mock_sync()
4730            .timeout(Some(Duration::from_secs(30)))
4731            .ok(|_| {})
4732            .mock_once()
4733            .named("sync_with_timeout")
4734            .mount()
4735            .await;
4736
4737        // Call each version of the endpoint once to check the timeouts.
4738        let mut stream = Box::pin(
4739            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4740        );
4741
4742        timeout(Duration::from_secs(1), async {
4743            stream.next().await.unwrap().unwrap();
4744            stream.next().await.unwrap().unwrap();
4745        })
4746        .await
4747        .unwrap();
4748    }
4749
4750    #[async_test]
4751    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4752        let server = MatrixMockServer::new().await;
4753        let client = server.client_builder().build().await;
4754        // This is the user ID that is inside MemberAdditional.
4755        // Note the confusing username, so we can share
4756        // GlobalAccountDataTestEvent::Direct with the invited test.
4757        let user_id = user_id!("@invited:localhost");
4758
4759        // When we receive a sync response saying "invited" is invited to a DM
4760        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4761        let response = SyncResponseBuilder::default()
4762            .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4763            .add_global_account_data(
4764                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4765            )
4766            .build_sync_response();
4767        client.base_client().receive_sync_response(response).await.unwrap();
4768
4769        // Then get_dm_room finds this room
4770        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4771        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4772    }
4773
4774    #[async_test]
4775    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4776        let server = MatrixMockServer::new().await;
4777        let client = server.client_builder().build().await;
4778        // This is the user ID that is inside MemberInvite
4779        let user_id = user_id!("@invited:localhost");
4780
4781        // When we receive a sync response saying "invited" is invited to a DM
4782        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4783        let response = SyncResponseBuilder::default()
4784            .add_joined_room(
4785                JoinedRoomBuilder::default()
4786                    .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4787            )
4788            .add_global_account_data(
4789                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4790            )
4791            .build_sync_response();
4792        client.base_client().receive_sync_response(response).await.unwrap();
4793
4794        // Then get_dm_room finds this room
4795        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4796        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4797    }
4798
4799    #[async_test]
4800    async fn test_get_dm_room_still_finds_left_room() {
4801        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4802        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4803
4804        let server = MatrixMockServer::new().await;
4805        let client = server.client_builder().build().await;
4806        // This is the user ID that is inside MemberAdditional.
4807        // Note the confusing username, so we can share
4808        // GlobalAccountDataTestEvent::Direct with the invited test.
4809        let user_id = user_id!("@invited:localhost");
4810
4811        // When we receive a sync response saying "invited" has left a DM
4812        let f = EventFactory::new().sender(user_id);
4813        let response = SyncResponseBuilder::default()
4814            .add_joined_room(
4815                JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4816            )
4817            .add_global_account_data(
4818                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4819            )
4820            .build_sync_response();
4821        client.base_client().receive_sync_response(response).await.unwrap();
4822
4823        // Then get_dm_room finds this room
4824        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4825        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4826    }
4827
4828    #[async_test]
4829    async fn test_device_exists() {
4830        let server = MatrixMockServer::new().await;
4831        let client = server.client_builder().build().await;
4832
4833        server.mock_get_device().ok().expect(1).mount().await;
4834
4835        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4836    }
4837
4838    #[async_test]
4839    async fn test_device_exists_404() {
4840        let server = MatrixMockServer::new().await;
4841        let client = server.client_builder().build().await;
4842
4843        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4844    }
4845
4846    #[async_test]
4847    async fn test_device_exists_500() {
4848        let server = MatrixMockServer::new().await;
4849        let client = server.client_builder().build().await;
4850
4851        server.mock_get_device().error500().expect(1).mount().await;
4852
4853        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4854    }
4855
4856    #[async_test]
4857    async fn test_fetching_well_known_with_homeserver_url() {
4858        let server = MatrixMockServer::new().await;
4859        let client = server.client_builder().build().await;
4860        server.mock_well_known().ok().mount().await;
4861
4862        assert_matches!(client.fetch_client_well_known().await, Some(_));
4863    }
4864
4865    #[async_test]
4866    async fn test_fetching_well_known_with_server_name() {
4867        let server = MatrixMockServer::new().await;
4868        let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4869
4870        server.mock_well_known().ok().mount().await;
4871
4872        let client = MockClientBuilder::new(None)
4873            .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4874            .build()
4875            .await;
4876
4877        assert_matches!(client.fetch_client_well_known().await, Some(_));
4878    }
4879
4880    #[async_test]
4881    async fn test_fetching_well_known_with_domain_part_of_user_id() {
4882        let server = MatrixMockServer::new().await;
4883        server.mock_well_known().ok().mount().await;
4884
4885        let user_id =
4886            UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4887        let client = MockClientBuilder::new(None)
4888            .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4889            .build()
4890            .await;
4891
4892        assert_matches!(client.fetch_client_well_known().await, Some(_));
4893    }
4894}