Skip to main content

matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::{StreamExt, join};
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32    DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35    BaseClient, DmRoomDefinition, RoomInfoNotableUpdate, RoomState, RoomStateFilter,
36    SendOutsideWasm, SessionMeta, StateStoreDataKey, StateStoreDataValue, StoreError,
37    SyncOutsideWasm, ThreadingSupport,
38    event_cache::store::EventCacheStoreLock,
39    media::store::MediaStoreLock,
40    store::{DynStateStore, RoomLoadSettings, SupportedVersionsResponse, WellKnownResponse},
41    sync::{Notification, RoomUpdates},
42    task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl::TtlValue};
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50    api::{
51        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52        client::{
53            account::whoami,
54            alias::{create_alias, delete_alias, get_alias},
55            authenticated_media,
56            device::{self, delete_devices, get_devices, update_device},
57            directory::{get_public_rooms, get_public_rooms_filtered},
58            discovery::{
59                discover_homeserver::{self, RtcFocusInfo},
60                get_supported_versions,
61            },
62            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
63            knock::knock_room,
64            media,
65            membership::{join_room_by_id, join_room_by_id_or_alias},
66            room::create_room,
67            session::login::v3::DiscoveryInfo,
68            sync::sync_events,
69            threads::get_thread_subscriptions_changes,
70            uiaa,
71            user_directory::search_users,
72        },
73        error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
74        path_builder::PathBuilder,
75    },
76    assign,
77    events::{beacon_info::OriginalSyncBeaconInfoEvent, direct::DirectUserIdentifier},
78    push::Ruleset,
79    time::Instant,
80};
81use serde::de::DeserializeOwned;
82use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
83use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
84use url::Url;
85
86use self::{
87    caches::{Cache, CachedValue, ClientCaches},
88    futures::SendRequest,
89};
90use crate::{
91    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
92    Room, SessionTokens, TransmissionProgress,
93    authentication::{
94        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
95        oauth::OAuth,
96    },
97    client::{
98        homeserver_capabilities::HomeserverCapabilities,
99        thread_subscriptions::ThreadSubscriptionCatchup,
100    },
101    config::{RequestConfig, SyncToken},
102    deduplicating_handler::DeduplicatingHandler,
103    error::HttpResult,
104    event_cache::EventCache,
105    event_handler::{
106        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
107        EventHandlerStore, ObservableEventHandler, SyncEvent,
108    },
109    http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
110    latest_events::LatestEvents,
111    live_locations_observer::BeaconInfoUpdate,
112    media::MediaError,
113    notification_settings::NotificationSettings,
114    room::RoomMember,
115    room_preview::RoomPreview,
116    send_queue::{SendQueue, SendQueueData},
117    sliding_sync::Version as SlidingSyncVersion,
118    sync::{RoomUpdate, SyncResponse},
119};
120#[cfg(feature = "e2e-encryption")]
121use crate::{
122    cross_process_lock::CrossProcessLock,
123    encryption::{
124        DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
125        VerificationState,
126    },
127};
128
129mod builder;
130pub(crate) mod caches;
131pub(crate) mod futures;
132pub(crate) mod homeserver_capabilities;
133pub(crate) mod thread_subscriptions;
134
135pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
136#[cfg(feature = "experimental-search")]
137use crate::search_index::SearchIndex;
138
139#[cfg(not(target_family = "wasm"))]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
141#[cfg(target_family = "wasm")]
142type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
143
144#[cfg(not(target_family = "wasm"))]
145type NotificationHandlerFn =
146    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
147#[cfg(target_family = "wasm")]
148type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
149
150/// Enum controlling if a loop running callbacks should continue or abort.
151///
152/// This is mainly used in the [`sync_with_callback`] method, the return value
153/// of the provided callback controls if the sync loop should be exited.
154///
155/// [`sync_with_callback`]: #method.sync_with_callback
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum LoopCtrl {
158    /// Continue running the loop.
159    Continue,
160    /// Break out of the loop.
161    Break,
162}
163
164/// Represents changes that can occur to a `Client`s `Session`.
165#[derive(Debug, Clone, PartialEq)]
166pub enum SessionChange {
167    /// The session's token is no longer valid.
168    UnknownToken(UnknownTokenErrorData),
169    /// The session's tokens have been refreshed.
170    TokensRefreshed,
171}
172
173/// Information about the server vendor obtained from the federation API.
174#[derive(Debug, Clone, PartialEq, Eq)]
175#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
176pub struct ServerVendorInfo {
177    /// The server name.
178    pub server_name: String,
179    /// The server version.
180    pub version: String,
181}
182
183/// An async/await enabled Matrix client.
184///
185/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
186#[derive(Clone)]
187pub struct Client {
188    pub(crate) inner: Arc<ClientInner>,
189}
190
191#[derive(Default)]
192pub(crate) struct ClientLocks {
193    /// Lock ensuring that only a single room may be marked as a DM at once.
194    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
195    /// explanation.
196    pub(crate) mark_as_dm_lock: Mutex<()>,
197
198    /// Lock ensuring that only a single secret store is getting opened at the
199    /// same time.
200    ///
201    /// This is important so we don't accidentally create multiple different new
202    /// default secret storage keys.
203    #[cfg(feature = "e2e-encryption")]
204    pub(crate) open_secret_store_lock: Mutex<()>,
205
206    /// Lock ensuring that we're only storing a single secret at a time.
207    ///
208    /// Take a look at the [`SecretStore::put_secret`] method for a more
209    /// detailed explanation.
210    ///
211    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
212    #[cfg(feature = "e2e-encryption")]
213    pub(crate) store_secret_lock: Mutex<()>,
214
215    /// Lock ensuring that only one method at a time might modify our backup.
216    #[cfg(feature = "e2e-encryption")]
217    pub(crate) backup_modify_lock: Mutex<()>,
218
219    /// Lock ensuring that we're going to attempt to upload backups for a single
220    /// requester.
221    #[cfg(feature = "e2e-encryption")]
222    pub(crate) backup_upload_lock: Mutex<()>,
223
224    /// Handler making sure we only have one group session sharing request in
225    /// flight per room.
226    #[cfg(feature = "e2e-encryption")]
227    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
228
229    /// Lock making sure we're only doing one key claim request at a time.
230    #[cfg(feature = "e2e-encryption")]
231    pub(crate) key_claim_lock: Mutex<()>,
232
233    /// Handler to ensure that only one members request is running at a time,
234    /// given a room.
235    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
236
237    /// Handler to ensure that only one encryption state request is running at a
238    /// time, given a room.
239    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
240
241    /// Deduplicating handler for sending read receipts. The string is an
242    /// internal implementation detail, see [`Self::send_single_receipt`].
243    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
244
245    #[cfg(feature = "e2e-encryption")]
246    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
247
248    /// Latest "generation" of data known by the crypto store.
249    ///
250    /// This is a counter that only increments, set in the database (and can
251    /// wrap). It's incremented whenever some process acquires a lock for the
252    /// first time. *This assumes the crypto store lock is being held, to
253    /// avoid data races on writing to this value in the store*.
254    ///
255    /// The current process will maintain this value in local memory and in the
256    /// DB over time. Observing a different value than the one read in
257    /// memory, when reading from the store indicates that somebody else has
258    /// written into the database under our feet.
259    ///
260    /// TODO: this should live in the `OlmMachine`, since it's information
261    /// related to the lock. As of today (2023-07-28), we blow up the entire
262    /// olm machine when there's a generation mismatch. So storing the
263    /// generation in the olm machine would make the client think there's
264    /// *always* a mismatch, and that's why we need to store the generation
265    /// outside the `OlmMachine`.
266    #[cfg(feature = "e2e-encryption")]
267    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
268}
269
270pub(crate) struct ClientInner {
271    /// All the data related to authentication and authorization.
272    pub(crate) auth_ctx: Arc<AuthCtx>,
273
274    /// The URL of the server.
275    ///
276    /// Not to be confused with the `Self::homeserver`. `server` is usually
277    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
278    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
279    /// homeserver (at the time of writing — 2024-08-28).
280    ///
281    /// This value is optional depending on how the `Client` has been built.
282    /// If it's been built from a homeserver URL directly, we don't know the
283    /// server. However, if the `Client` has been built from a server URL or
284    /// name, then the homeserver has been discovered, and we know both.
285    server: Option<Url>,
286
287    /// The URL of the homeserver to connect to.
288    ///
289    /// This is the URL for the client-server Matrix API.
290    homeserver: StdRwLock<Url>,
291
292    /// The sliding sync version.
293    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
294
295    /// The underlying HTTP client.
296    pub(crate) http_client: HttpClient,
297
298    /// User session data.
299    pub(super) base_client: BaseClient,
300
301    /// Collection of in-memory caches for the [`Client`].
302    pub(crate) caches: ClientCaches,
303
304    /// Collection of locks individual client methods might want to use, either
305    /// to ensure that only a single call to a method happens at once or to
306    /// deduplicate multiple calls to a method.
307    pub(crate) locks: ClientLocks,
308
309    /// The cross-process lock configuration.
310    ///
311    /// The SDK provides cross-process store locks (see
312    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
313    /// [`CrossProcessLockConfig::MultiProcess`] is used.
314    ///
315    /// If multiple `Client`s are running in different processes, this
316    /// value MUST be different for each `Client`.
317    cross_process_lock_config: CrossProcessLockConfig,
318
319    /// A mapping of the times at which the current user sent typing notices,
320    /// keyed by room.
321    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
322
323    /// Event handlers. See `add_event_handler`.
324    pub(crate) event_handlers: EventHandlerStore,
325
326    /// Notification handlers. See `register_notification_handler`.
327    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
328
329    /// The sender-side of channels used to receive room updates.
330    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
331
332    /// The sender-side of a channel used to observe all the room updates of a
333    /// sync response.
334    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
335
336    /// Whether the client should update its homeserver URL with the discovery
337    /// information present in the login response.
338    respect_login_well_known: bool,
339
340    /// An event that can be listened on to wait for a successful sync. The
341    /// event will only be fired if a sync loop is running. Can be used for
342    /// synchronization, e.g. if we send out a request to create a room, we can
343    /// wait for the sync to get the data to fetch a room object from the state
344    /// store.
345    pub(crate) sync_beat: event_listener::Event,
346
347    /// A central cache for events, inactive first.
348    ///
349    /// It becomes active when [`EventCache::subscribe`] is called.
350    pub(crate) event_cache: OnceCell<EventCache>,
351
352    /// End-to-end encryption related state.
353    #[cfg(feature = "e2e-encryption")]
354    pub(crate) e2ee: EncryptionData,
355
356    /// The verification state of our own device.
357    #[cfg(feature = "e2e-encryption")]
358    pub(crate) verification_state: SharedObservable<VerificationState>,
359
360    /// Whether to enable the experimental support for sending and receiving
361    /// encrypted room history on invite, per [MSC4268].
362    ///
363    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
364    #[cfg(feature = "e2e-encryption")]
365    pub(crate) enable_share_history_on_invite: bool,
366
367    /// Data related to the [`SendQueue`].
368    ///
369    /// [`SendQueue`]: crate::send_queue::SendQueue
370    pub(crate) send_queue_data: Arc<SendQueueData>,
371
372    /// The `max_upload_size` value of the homeserver, it contains the max
373    /// request size you can send.
374    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
375
376    /// The entry point to get the [`LatestEvent`] of rooms and threads.
377    ///
378    /// [`LatestEvent`]: crate::latest_event::LatestEvent
379    latest_events: OnceCell<LatestEvents>,
380
381    /// Service handling the catching up of thread subscriptions in the
382    /// background.
383    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
384
385    #[cfg(feature = "experimental-search")]
386    /// Handler for [`RoomIndex`]'s of each room
387    search_index: SearchIndex,
388
389    /// A monitor for background tasks spawned by the client.
390    pub(crate) task_monitor: TaskMonitor,
391
392    /// A sender to notify subscribers about duplicate key upload errors
393    /// triggered by requests to /keys/upload.
394    #[cfg(feature = "e2e-encryption")]
395    pub(crate) duplicate_key_upload_error_sender:
396        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
397}
398
399impl ClientInner {
400    /// Create a new `ClientInner`.
401    ///
402    /// All the fields passed as parameters here are those that must be cloned
403    /// upon instantiation of a sub-client, e.g. a client specialized for
404    /// notifications.
405    #[allow(clippy::too_many_arguments)]
406    async fn new(
407        auth_ctx: Arc<AuthCtx>,
408        server: Option<Url>,
409        homeserver: Url,
410        sliding_sync_version: SlidingSyncVersion,
411        http_client: HttpClient,
412        base_client: BaseClient,
413        supported_versions: CachedValue<TtlValue<SupportedVersions>>,
414        well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
415        respect_login_well_known: bool,
416        event_cache: OnceCell<EventCache>,
417        send_queue: Arc<SendQueueData>,
418        latest_events: OnceCell<LatestEvents>,
419        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
420        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
421        cross_process_lock_config: CrossProcessLockConfig,
422        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
423        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
424    ) -> Arc<Self> {
425        let caches = ClientCaches {
426            supported_versions: Cache::with_value(supported_versions),
427            well_known: Cache::with_value(well_known),
428            server_metadata: Cache::new(),
429            homeserver_capabilities: Cache::new(),
430        };
431
432        let client = Self {
433            server,
434            homeserver: StdRwLock::new(homeserver),
435            auth_ctx,
436            sliding_sync_version: StdRwLock::new(sliding_sync_version),
437            http_client,
438            base_client,
439            caches,
440            locks: Default::default(),
441            cross_process_lock_config,
442            typing_notice_times: Default::default(),
443            event_handlers: Default::default(),
444            notification_handlers: Default::default(),
445            room_update_channels: Default::default(),
446            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
447            // ballast for all observers to catch up.
448            room_updates_sender: broadcast::Sender::new(32),
449            respect_login_well_known,
450            sync_beat: event_listener::Event::new(),
451            event_cache,
452            send_queue_data: send_queue,
453            latest_events,
454            #[cfg(feature = "e2e-encryption")]
455            e2ee: EncryptionData::new(encryption_settings),
456            #[cfg(feature = "e2e-encryption")]
457            verification_state: SharedObservable::new(VerificationState::Unknown),
458            #[cfg(feature = "e2e-encryption")]
459            enable_share_history_on_invite,
460            server_max_upload_size: Mutex::new(OnceCell::new()),
461            #[cfg(feature = "experimental-search")]
462            search_index: search_index_handler,
463            thread_subscription_catchup,
464            task_monitor: TaskMonitor::new(),
465            #[cfg(feature = "e2e-encryption")]
466            duplicate_key_upload_error_sender: broadcast::channel(1).0,
467        };
468
469        #[allow(clippy::let_and_return)]
470        let client = Arc::new(client);
471
472        #[cfg(feature = "e2e-encryption")]
473        client.e2ee.initialize_tasks(&client);
474
475        let init_event_cache = client.event_cache.get_or_init(|| async {
476            EventCache::new(&client, client.base_client.event_cache_store().clone())
477        });
478
479        let init_thread_subscription_catchup =
480            client.thread_subscription_catchup.get_or_init(|| async {
481                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
482            });
483
484        let _ = join!(init_event_cache, init_thread_subscription_catchup);
485
486        client
487    }
488}
489
490#[cfg(not(tarpaulin_include))]
491impl Debug for Client {
492    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
493        write!(fmt, "Client")
494    }
495}
496
497impl Client {
498    /// Create a new [`Client`] that will use the given homeserver.
499    ///
500    /// # Arguments
501    ///
502    /// * `homeserver_url` - The homeserver that the client should connect to.
503    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
504        Self::builder().homeserver_url(homeserver_url).build().await
505    }
506
507    /// Returns a subscriber that publishes an event every time the ignore user
508    /// list changes.
509    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
510        self.inner.base_client.subscribe_to_ignore_user_list_changes()
511    }
512
513    /// Create a new [`ClientBuilder`].
514    pub fn builder() -> ClientBuilder {
515        ClientBuilder::new()
516    }
517
518    pub(crate) fn base_client(&self) -> &BaseClient {
519        &self.inner.base_client
520    }
521
522    /// The underlying HTTP client.
523    pub fn http_client(&self) -> &reqwest::Client {
524        &self.inner.http_client.inner
525    }
526
527    pub(crate) fn locks(&self) -> &ClientLocks {
528        &self.inner.locks
529    }
530
531    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
532        &self.inner.auth_ctx
533    }
534
535    /// The cross-process store lock configuration used by this [`Client`].
536    ///
537    /// The SDK provides cross-process store locks (see
538    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
539    /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
540    /// the value used for all cross-process store locks used by this
541    /// `Client`.
542    pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
543        &self.inner.cross_process_lock_config
544    }
545
546    /// Change the homeserver URL used by this client.
547    ///
548    /// # Arguments
549    ///
550    /// * `homeserver_url` - The new URL to use.
551    fn set_homeserver(&self, homeserver_url: Url) {
552        *self.inner.homeserver.write().unwrap() = homeserver_url;
553    }
554
555    /// Change to a different homeserver and re-resolve well-known.
556    #[cfg(feature = "e2e-encryption")]
557    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
558        &self,
559        homeserver_url: Url,
560    ) -> Result<()> {
561        self.set_homeserver(homeserver_url);
562        self.reset_well_known().await?;
563        if let Some(well_known) = self.well_known().await {
564            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
565        }
566        Ok(())
567    }
568
569    /// Retrieves a helper component to access the [`HomeserverCapabilities`]
570    /// supported or disabled by the homeserver.
571    pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
572        HomeserverCapabilities::new(self.clone())
573    }
574
575    /// Get the server vendor information from the federation API.
576    ///
577    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
578    /// both the server's software name and version.
579    ///
580    /// # Examples
581    ///
582    /// ```no_run
583    /// # use matrix_sdk::Client;
584    /// # use url::Url;
585    /// # async {
586    /// # let homeserver = Url::parse("http://example.com")?;
587    /// let client = Client::new(homeserver).await?;
588    ///
589    /// let server_info = client.server_vendor_info(None).await?;
590    /// println!(
591    ///     "Server: {}, Version: {}",
592    ///     server_info.server_name, server_info.version
593    /// );
594    /// # anyhow::Ok(()) };
595    /// ```
596    #[cfg(feature = "federation-api")]
597    pub async fn server_vendor_info(
598        &self,
599        request_config: Option<RequestConfig>,
600    ) -> HttpResult<ServerVendorInfo> {
601        use ruma::api::federation::discovery::get_server_version;
602
603        let res = self
604            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
605            .await?;
606
607        // Extract server info, using defaults if fields are missing.
608        let server = res.server.unwrap_or_default();
609        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
610        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
611
612        Ok(ServerVendorInfo { server_name: server_name_str, version })
613    }
614
615    /// Get a copy of the default request config.
616    ///
617    /// The default request config is what's used when sending requests if no
618    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
619    /// function with such a parameter.
620    ///
621    /// If the default request config was not customized through
622    /// [`ClientBuilder`] when creating this `Client`, the returned value will
623    /// be equivalent to [`RequestConfig::default()`].
624    pub fn request_config(&self) -> RequestConfig {
625        self.inner.http_client.request_config
626    }
627
628    /// Check whether the client has been activated.
629    ///
630    /// A client is considered active when:
631    ///
632    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
633    ///    is logged in,
634    /// 2. Has loaded cached data from storage,
635    /// 3. If encryption is enabled, it also initialized or restored its
636    ///    `OlmMachine`.
637    pub fn is_active(&self) -> bool {
638        self.inner.base_client.is_active()
639    }
640
641    /// The server used by the client.
642    ///
643    /// See `Self::server` to learn more.
644    pub fn server(&self) -> Option<&Url> {
645        self.inner.server.as_ref()
646    }
647
648    /// The homeserver of the client.
649    pub fn homeserver(&self) -> Url {
650        self.inner.homeserver.read().unwrap().clone()
651    }
652
653    /// Get the sliding sync version.
654    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
655        self.inner.sliding_sync_version.read().unwrap().clone()
656    }
657
658    /// Override the sliding sync version.
659    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
660        let mut lock = self.inner.sliding_sync_version.write().unwrap();
661        *lock = version;
662    }
663
664    /// Get the Matrix user session meta information.
665    ///
666    /// If the client is currently logged in, this will return a
667    /// [`SessionMeta`] object which contains the user ID and device ID.
668    /// Otherwise it returns `None`.
669    pub fn session_meta(&self) -> Option<&SessionMeta> {
670        self.base_client().session_meta()
671    }
672
673    /// Returns a receiver that gets events for each room info update. To watch
674    /// for new events, use `receiver.resubscribe()`.
675    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
676        self.base_client().room_info_notable_update_receiver()
677    }
678
679    /// Performs a search for users.
680    /// The search is performed case-insensitively on user IDs and display names
681    ///
682    /// # Arguments
683    ///
684    /// * `search_term` - The search term for the search
685    /// * `limit` - The maximum number of results to return. Defaults to 10.
686    ///
687    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
688    pub async fn search_users(
689        &self,
690        search_term: &str,
691        limit: u64,
692    ) -> HttpResult<search_users::v3::Response> {
693        let mut request = search_users::v3::Request::new(search_term.to_owned());
694
695        if let Some(limit) = UInt::new(limit) {
696            request.limit = limit;
697        }
698
699        self.send(request).await
700    }
701
702    /// Get the user id of the current owner of the client.
703    pub fn user_id(&self) -> Option<&UserId> {
704        self.session_meta().map(|s| s.user_id.as_ref())
705    }
706
707    /// Get the device ID that identifies the current session.
708    pub fn device_id(&self) -> Option<&DeviceId> {
709        self.session_meta().map(|s| s.device_id.as_ref())
710    }
711
712    /// Get the current access token for this session.
713    ///
714    /// Will be `None` if the client has not been logged in.
715    pub fn access_token(&self) -> Option<String> {
716        self.auth_ctx().access_token()
717    }
718
719    /// Get the current tokens for this session.
720    ///
721    /// To be notified of changes in the session tokens, use
722    /// [`Client::subscribe_to_session_changes()`] or
723    /// [`Client::set_session_callbacks()`].
724    ///
725    /// Returns `None` if the client has not been logged in.
726    pub fn session_tokens(&self) -> Option<SessionTokens> {
727        self.auth_ctx().session_tokens()
728    }
729
730    /// Access the authentication API used to log in this client.
731    ///
732    /// Will be `None` if the client has not been logged in.
733    pub fn auth_api(&self) -> Option<AuthApi> {
734        match self.auth_ctx().auth_data.get()? {
735            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
736            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
737        }
738    }
739
740    /// Get the whole session info of this client.
741    ///
742    /// Will be `None` if the client has not been logged in.
743    ///
744    /// Can be used with [`Client::restore_session`] to restore a previously
745    /// logged-in session.
746    pub fn session(&self) -> Option<AuthSession> {
747        match self.auth_api()? {
748            AuthApi::Matrix(api) => api.session().map(Into::into),
749            AuthApi::OAuth(api) => api.full_session().map(Into::into),
750        }
751    }
752
753    /// Get a reference to the state store.
754    pub fn state_store(&self) -> &DynStateStore {
755        self.base_client().state_store()
756    }
757
758    /// Get a reference to the event cache store.
759    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
760        self.base_client().event_cache_store()
761    }
762
763    /// Get a reference to the media store.
764    pub fn media_store(&self) -> &MediaStoreLock {
765        self.base_client().media_store()
766    }
767
768    /// Access the native Matrix authentication API with this client.
769    pub fn matrix_auth(&self) -> MatrixAuth {
770        MatrixAuth::new(self.clone())
771    }
772
773    /// Get the account of the current owner of the client.
774    pub fn account(&self) -> Account {
775        Account::new(self.clone())
776    }
777
778    /// Get the encryption manager of the client.
779    #[cfg(feature = "e2e-encryption")]
780    pub fn encryption(&self) -> Encryption {
781        Encryption::new(self.clone())
782    }
783
784    /// Get the media manager of the client.
785    pub fn media(&self) -> Media {
786        Media::new(self.clone())
787    }
788
789    /// Get the pusher manager of the client.
790    pub fn pusher(&self) -> Pusher {
791        Pusher::new(self.clone())
792    }
793
794    /// Access the OAuth 2.0 API of the client.
795    pub fn oauth(&self) -> OAuth {
796        OAuth::new(self.clone())
797    }
798
799    /// Register a handler for a specific event type.
800    ///
801    /// The handler is a function or closure with one or more arguments. The
802    /// first argument is the event itself. All additional arguments are
803    /// "context" arguments: They have to implement [`EventHandlerContext`].
804    /// This trait is named that way because most of the types implementing it
805    /// give additional context about an event: The room it was in, its raw form
806    /// and other similar things. As two exceptions to this,
807    /// [`Client`] and [`EventHandlerHandle`] also implement the
808    /// `EventHandlerContext` trait so you don't have to clone your client
809    /// into the event handler manually and a handler can decide to remove
810    /// itself.
811    ///
812    /// Some context arguments are not universally applicable. A context
813    /// argument that isn't available for the given event type will result in
814    /// the event handler being skipped and an error being logged. The following
815    /// context argument types are only available for a subset of event types:
816    ///
817    /// * [`Room`] is only available for room-specific events, i.e. not for
818    ///   events like global account data events or presence events.
819    ///
820    /// You can provide custom context via
821    /// [`add_event_handler_context`](Client::add_event_handler_context) and
822    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
823    /// into the event handler.
824    ///
825    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
826    ///
827    /// # Examples
828    ///
829    /// ```no_run
830    /// use matrix_sdk::{
831    ///     deserialized_responses::EncryptionInfo,
832    ///     event_handler::Ctx,
833    ///     ruma::{
834    ///         events::{
835    ///             macros::EventContent,
836    ///             push_rules::PushRulesEvent,
837    ///             room::{
838    ///                 message::SyncRoomMessageEvent,
839    ///                 topic::SyncRoomTopicEvent,
840    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
841    ///             },
842    ///         },
843    ///         push::Action,
844    ///         Int, MilliSecondsSinceUnixEpoch,
845    ///     },
846    ///     Client, Room,
847    /// };
848    /// use serde::{Deserialize, Serialize};
849    ///
850    /// # async fn example(client: Client) {
851    /// client.add_event_handler(
852    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
853    ///         // Common usage: Room event plus room and client.
854    ///     },
855    /// );
856    /// client.add_event_handler(
857    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
858    ///         async move {
859    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
860    ///             // unencrypted events and events that were decrypted by the SDK.
861    ///         }
862    ///     },
863    /// );
864    /// client.add_event_handler(
865    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
866    ///         async move {
867    ///             // A `Vec<Action>` parameter allows you to know which push actions
868    ///             // are applicable for an event. For example, an event with
869    ///             // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
870    ///             // should be highlighted in the timeline.
871    ///         }
872    ///     },
873    /// );
874    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
875    ///     // You can omit any or all arguments after the first.
876    /// });
877    ///
878    /// // Registering a temporary event handler:
879    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
880    ///     /* Event handler */
881    /// });
882    /// client.remove_event_handler(handle);
883    ///
884    /// // Registering custom event handler context:
885    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
886    /// struct MyContext {
887    ///     number: usize,
888    /// }
889    /// client.add_event_handler_context(MyContext { number: 5 });
890    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
891    ///     // Use the context
892    /// });
893    ///
894    /// // This will handle membership events in joined rooms. Invites are special, see below.
895    /// client.add_event_handler(
896    ///     |ev: SyncRoomMemberEvent| async move {},
897    /// );
898    ///
899    /// // To handle state events in invited rooms (including invite membership events),
900    /// // `StrippedRoomMemberEvent` should be used.
901    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
902    /// client.add_event_handler(
903    ///     |ev: StrippedRoomMemberEvent| async move {},
904    /// );
905    ///
906    /// // Custom events work exactly the same way, you just need to declare
907    /// // the content struct and use the EventContent derive macro on it.
908    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
909    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
910    /// struct TokenEventContent {
911    ///     token: String,
912    ///     #[serde(rename = "exp")]
913    ///     expires_at: MilliSecondsSinceUnixEpoch,
914    /// }
915    ///
916    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
917    ///     todo!("Display the token");
918    /// });
919    ///
920    /// // Event handler closures can also capture local variables.
921    /// // Make sure they are cheap to clone though, because they will be cloned
922    /// // every time the closure is called.
923    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
924    ///
925    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
926    ///     println!("Calling the handler with identifier {data}");
927    /// });
928    /// # }
929    /// ```
930    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
931    where
932        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
933        H: EventHandler<Ev, Ctx>,
934    {
935        self.add_event_handler_impl(handler, None)
936    }
937
938    /// Register a handler for a specific room, and event type.
939    ///
940    /// This method works the same way as
941    /// [`add_event_handler`][Self::add_event_handler], except that the handler
942    /// will only be called for events in the room with the specified ID. See
943    /// that method for more details on event handler functions.
944    ///
945    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
946    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
947    /// your use case.
948    pub fn add_room_event_handler<Ev, Ctx, H>(
949        &self,
950        room_id: &RoomId,
951        handler: H,
952    ) -> EventHandlerHandle
953    where
954        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
955        H: EventHandler<Ev, Ctx>,
956    {
957        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
958    }
959
960    /// Observe a specific event type.
961    ///
962    /// `Ev` represents the kind of event that will be observed. `Ctx`
963    /// represents the context that will come with the event. It relies on the
964    /// same mechanism as [`Client::add_event_handler`]. The main difference is
965    /// that it returns an [`ObservableEventHandler`] and doesn't require a
966    /// user-defined closure. It is possible to subscribe to the
967    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
968    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
969    /// Ctx)`.
970    ///
971    /// Be careful that only the most recent value can be observed. Subscribers
972    /// are notified when a new value is sent, but there is no guarantee
973    /// that they will see all values.
974    ///
975    /// # Example
976    ///
977    /// Let's see a classical usage:
978    ///
979    /// ```
980    /// use futures_util::StreamExt as _;
981    /// use matrix_sdk::{
982    ///     Client, Room,
983    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
984    /// };
985    ///
986    /// # async fn example(client: Client) -> Option<()> {
987    /// let observer =
988    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
989    ///
990    /// let mut subscriber = observer.subscribe();
991    ///
992    /// let (event, (room, push_actions)) = subscriber.next().await?;
993    /// # Some(())
994    /// # }
995    /// ```
996    ///
997    /// Now let's see how to get several contexts that can be useful for you:
998    ///
999    /// ```
1000    /// use matrix_sdk::{
1001    ///     Client, Room,
1002    ///     deserialized_responses::EncryptionInfo,
1003    ///     ruma::{
1004    ///         events::room::{
1005    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1006    ///         },
1007    ///         push::Action,
1008    ///     },
1009    /// };
1010    ///
1011    /// # async fn example(client: Client) {
1012    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1013    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1014    ///
1015    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1016    /// // to distinguish between unencrypted events and events that were decrypted
1017    /// // by the SDK.
1018    /// let _ = client
1019    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1020    ///     );
1021    ///
1022    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1023    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1024    /// // should be highlighted in the timeline.
1025    /// let _ =
1026    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1027    ///
1028    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1029    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1030    /// # }
1031    /// ```
1032    ///
1033    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1034    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1035    where
1036        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1037        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1038    {
1039        self.observe_room_events_impl(None)
1040    }
1041
1042    /// Observe a specific room, and event type.
1043    ///
1044    /// This method works the same way as [`Client::observe_events`], except
1045    /// that the observability will only be applied for events in the room with
1046    /// the specified ID. See that method for more details.
1047    ///
1048    /// Be careful that only the most recent value can be observed. Subscribers
1049    /// are notified when a new value is sent, but there is no guarantee
1050    /// that they will see all values.
1051    pub fn observe_room_events<Ev, Ctx>(
1052        &self,
1053        room_id: &RoomId,
1054    ) -> ObservableEventHandler<(Ev, Ctx)>
1055    where
1056        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1057        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1058    {
1059        self.observe_room_events_impl(Some(room_id.to_owned()))
1060    }
1061
1062    /// Shared implementation for `Client::observe_events` and
1063    /// `Client::observe_room_events`.
1064    fn observe_room_events_impl<Ev, Ctx>(
1065        &self,
1066        room_id: Option<OwnedRoomId>,
1067    ) -> ObservableEventHandler<(Ev, Ctx)>
1068    where
1069        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1070        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1071    {
1072        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1073        // new value.
1074        let shared_observable = SharedObservable::new(None);
1075
1076        ObservableEventHandler::new(
1077            shared_observable.clone(),
1078            self.event_handler_drop_guard(self.add_event_handler_impl(
1079                move |event: Ev, context: Ctx| {
1080                    shared_observable.set(Some((event, context)));
1081
1082                    ready(())
1083                },
1084                room_id,
1085            )),
1086        )
1087    }
1088
1089    /// Subscribe to future `beacon_info` updates for the current user across
1090    /// all rooms.
1091    ///
1092    /// This stream is push-only: it emits only future updates observed during
1093    /// sync processing and does not replay existing state.
1094    pub fn observe_own_beacon_info_updates(
1095        &self,
1096    ) -> Result<impl Stream<Item = BeaconInfoUpdate> + use<>> {
1097        let observer = self.observe_events::<OriginalSyncBeaconInfoEvent, Room>();
1098        let mut stream = observer.subscribe();
1099        let own_user_id = self.user_id().ok_or(Error::AuthenticationRequired)?.to_owned();
1100        Ok(async_stream::stream! {
1101            let _observer = observer;
1102
1103            while let Some((event, room)) = stream.next().await {
1104                if event.state_key != own_user_id {
1105                    continue;
1106                }
1107                yield BeaconInfoUpdate {
1108                    room_id: room.room_id().to_owned(),
1109                    event_id: event.event_id,
1110                    content: event.content,
1111                };
1112            }
1113        })
1114    }
1115
1116    /// Remove the event handler associated with the handle.
1117    ///
1118    /// Note that you **must not** call `remove_event_handler` from the
1119    /// non-async part of an event handler, that is:
1120    ///
1121    /// ```ignore
1122    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1123    ///     // âš  this will cause a deadlock âš 
1124    ///     client.remove_event_handler(handle);
1125    ///
1126    ///     async move {
1127    ///         // removing the event handler here is fine
1128    ///         client.remove_event_handler(handle);
1129    ///     }
1130    /// })
1131    /// ```
1132    ///
1133    /// Note also that handlers that remove themselves will still execute with
1134    /// events received in the same sync cycle.
1135    ///
1136    /// # Arguments
1137    ///
1138    /// `handle` - The [`EventHandlerHandle`] that is returned when
1139    /// registering the event handler with [`Client::add_event_handler`].
1140    ///
1141    /// # Examples
1142    ///
1143    /// ```no_run
1144    /// # use url::Url;
1145    /// # use tokio::sync::mpsc;
1146    /// #
1147    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1148    /// #
1149    /// use matrix_sdk::{
1150    ///     Client, event_handler::EventHandlerHandle,
1151    ///     ruma::events::room::member::SyncRoomMemberEvent,
1152    /// };
1153    /// #
1154    /// # futures_executor::block_on(async {
1155    /// # let client = matrix_sdk::Client::builder()
1156    /// #     .homeserver_url(homeserver)
1157    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1158    /// #     .build()
1159    /// #     .await
1160    /// #     .unwrap();
1161    ///
1162    /// client.add_event_handler(
1163    ///     |ev: SyncRoomMemberEvent,
1164    ///      client: Client,
1165    ///      handle: EventHandlerHandle| async move {
1166    ///         // Common usage: Check arriving Event is the expected one
1167    ///         println!("Expected RoomMemberEvent received!");
1168    ///         client.remove_event_handler(handle);
1169    ///     },
1170    /// );
1171    /// # });
1172    /// ```
1173    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1174        self.inner.event_handlers.remove(handle);
1175    }
1176
1177    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1178    /// the given handle.
1179    ///
1180    /// When the returned value is dropped, the event handler will be removed.
1181    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1182        EventHandlerDropGuard::new(handle, self.clone())
1183    }
1184
1185    /// Add an arbitrary value for use as event handler context.
1186    ///
1187    /// The value can be obtained in an event handler by adding an argument of
1188    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1189    ///
1190    /// If a value of the same type has been added before, it will be
1191    /// overwritten.
1192    ///
1193    /// # Examples
1194    ///
1195    /// ```no_run
1196    /// use matrix_sdk::{
1197    ///     Room, event_handler::Ctx,
1198    ///     ruma::events::room::message::SyncRoomMessageEvent,
1199    /// };
1200    /// # #[derive(Clone)]
1201    /// # struct SomeType;
1202    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1203    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1204    /// # futures_executor::block_on(async {
1205    /// # let client = matrix_sdk::Client::builder()
1206    /// #     .homeserver_url(homeserver)
1207    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1208    /// #     .build()
1209    /// #     .await
1210    /// #     .unwrap();
1211    ///
1212    /// // Handle used to send messages to the UI part of the app
1213    /// let my_gui_handle: SomeType = obtain_gui_handle();
1214    ///
1215    /// client.add_event_handler_context(my_gui_handle.clone());
1216    /// client.add_event_handler(
1217    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1218    ///         async move {
1219    ///             // gui_handle.send(DisplayMessage { message: ev });
1220    ///         }
1221    ///     },
1222    /// );
1223    /// # });
1224    /// ```
1225    pub fn add_event_handler_context<T>(&self, ctx: T)
1226    where
1227        T: Clone + Send + Sync + 'static,
1228    {
1229        self.inner.event_handlers.add_context(ctx);
1230    }
1231
1232    /// Register a handler for a notification.
1233    ///
1234    /// Similar to [`Client::add_event_handler`], but only allows functions
1235    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1236    /// [`Client`] for now.
1237    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1238    where
1239        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1240        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1241    {
1242        self.inner.notification_handlers.write().await.push(Box::new(
1243            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1244        ));
1245
1246        self
1247    }
1248
1249    /// Subscribe to all updates for the room with the given ID.
1250    ///
1251    /// The returned receiver will receive a new message for each sync response
1252    /// that contains updates for that room.
1253    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1254        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1255            btree_map::Entry::Vacant(entry) => {
1256                let (tx, rx) = broadcast::channel(8);
1257                entry.insert(tx);
1258                rx
1259            }
1260            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1261        }
1262    }
1263
1264    /// Subscribe to all updates to all rooms, whenever any has been received in
1265    /// a sync response.
1266    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1267        self.inner.room_updates_sender.subscribe()
1268    }
1269
1270    pub(crate) async fn notification_handlers(
1271        &self,
1272    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1273        self.inner.notification_handlers.read().await
1274    }
1275
1276    /// Get all the rooms the client knows about.
1277    ///
1278    /// This will return the list of joined, invited, and left rooms.
1279    pub fn rooms(&self) -> Vec<Room> {
1280        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1281    }
1282
1283    /// Get all the rooms the client knows about, filtered by room state.
1284    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1285        self.base_client()
1286            .rooms_filtered(filter)
1287            .into_iter()
1288            .map(|room| Room::new(self.clone(), room))
1289            .collect()
1290    }
1291
1292    /// Get a stream of all the rooms, in addition to the existing rooms.
1293    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1294        let (rooms, stream) = self.base_client().rooms_stream();
1295
1296        let map_room = |room| Room::new(self.clone(), room);
1297
1298        (
1299            rooms.into_iter().map(map_room).collect(),
1300            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1301        )
1302    }
1303
1304    /// Returns the joined rooms this client knows about.
1305    pub fn joined_rooms(&self) -> Vec<Room> {
1306        self.rooms_filtered(RoomStateFilter::JOINED)
1307    }
1308
1309    /// Returns the invited rooms this client knows about.
1310    pub fn invited_rooms(&self) -> Vec<Room> {
1311        self.rooms_filtered(RoomStateFilter::INVITED)
1312    }
1313
1314    /// Returns the left rooms this client knows about.
1315    pub fn left_rooms(&self) -> Vec<Room> {
1316        self.rooms_filtered(RoomStateFilter::LEFT)
1317    }
1318
1319    /// Returns the joined space rooms this client knows about.
1320    pub fn joined_space_rooms(&self) -> Vec<Room> {
1321        self.base_client()
1322            .rooms_filtered(RoomStateFilter::JOINED)
1323            .into_iter()
1324            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1325            .collect()
1326    }
1327
1328    /// Get a room with the given room id.
1329    ///
1330    /// # Arguments
1331    ///
1332    /// `room_id` - The unique id of the room that should be fetched.
1333    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1334        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1335    }
1336
1337    /// Gets the preview of a room, whether the current user has joined it or
1338    /// not.
1339    pub async fn get_room_preview(
1340        &self,
1341        room_or_alias_id: &RoomOrAliasId,
1342        via: Vec<OwnedServerName>,
1343    ) -> Result<RoomPreview> {
1344        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1345            Ok(room_id) => room_id.to_owned(),
1346            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1347        };
1348
1349        if let Some(room) = self.get_room(&room_id) {
1350            // The cached data can only be trusted if the room state is joined or
1351            // banned: for invite and knock rooms, no updates will be received
1352            // for the rooms after the invite/knock action took place so we may
1353            // have very out to date data for important fields such as
1354            // `join_rule`. For left rooms, the homeserver should return the latest info.
1355            match room.state() {
1356                RoomState::Joined | RoomState::Banned => {
1357                    return Ok(RoomPreview::from_known_room(&room).await);
1358                }
1359                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1360            }
1361        }
1362
1363        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1364    }
1365
1366    /// Resolve a room alias to a room id and a list of servers which know
1367    /// about it.
1368    ///
1369    /// # Arguments
1370    ///
1371    /// `room_alias` - The room alias to be resolved.
1372    pub async fn resolve_room_alias(
1373        &self,
1374        room_alias: &RoomAliasId,
1375    ) -> HttpResult<get_alias::v3::Response> {
1376        let request = get_alias::v3::Request::new(room_alias.to_owned());
1377        self.send(request).await
1378    }
1379
1380    /// Checks if a room alias is not in use yet.
1381    ///
1382    /// Returns:
1383    /// - `Ok(true)` if the room alias is available.
1384    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1385    ///   status code).
1386    /// - An `Err` otherwise.
1387    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1388        match self.resolve_room_alias(alias).await {
1389            // The room alias was resolved, so it's already in use.
1390            Ok(_) => Ok(false),
1391            Err(error) => {
1392                match error.client_api_error_kind() {
1393                    // The room alias wasn't found, so it's available.
1394                    Some(ErrorKind::NotFound) => Ok(true),
1395                    _ => Err(error),
1396                }
1397            }
1398        }
1399    }
1400
1401    /// Adds a new room alias associated with a room to the room directory.
1402    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1403        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1404        self.send(request).await?;
1405        Ok(())
1406    }
1407
1408    /// Removes a room alias from the room directory.
1409    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1410        let request = delete_alias::v3::Request::new(alias.to_owned());
1411        self.send(request).await?;
1412        Ok(())
1413    }
1414
1415    /// Update the homeserver from the login response well-known if needed.
1416    ///
1417    /// # Arguments
1418    ///
1419    /// * `login_well_known` - The `well_known` field from a successful login
1420    ///   response.
1421    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1422        if self.inner.respect_login_well_known
1423            && let Some(well_known) = login_well_known
1424            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1425        {
1426            self.set_homeserver(homeserver);
1427        }
1428    }
1429
1430    /// Similar to [`Client::restore_session_with`], with
1431    /// [`RoomLoadSettings::default()`].
1432    ///
1433    /// # Panics
1434    ///
1435    /// Panics if a session was already restored or logged in.
1436    #[instrument(skip_all)]
1437    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1438        self.restore_session_with(session, RoomLoadSettings::default()).await
1439    }
1440
1441    /// Restore a session previously logged-in using one of the available
1442    /// authentication APIs. The number of rooms to restore is controlled by
1443    /// [`RoomLoadSettings`].
1444    ///
1445    /// See the documentation of the corresponding authentication API's
1446    /// `restore_session` method for more information.
1447    ///
1448    /// # Panics
1449    ///
1450    /// Panics if a session was already restored or logged in.
1451    #[instrument(skip_all)]
1452    pub async fn restore_session_with(
1453        &self,
1454        session: impl Into<AuthSession>,
1455        room_load_settings: RoomLoadSettings,
1456    ) -> Result<()> {
1457        let session = session.into();
1458        match session {
1459            AuthSession::Matrix(session) => {
1460                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1461            }
1462            AuthSession::OAuth(session) => {
1463                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1464            }
1465        }
1466    }
1467
1468    /// Refresh the access token using the authentication API used to log into
1469    /// this session.
1470    ///
1471    /// See the documentation of the authentication API's `refresh_access_token`
1472    /// method for more information.
1473    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1474        let Some(auth_api) = self.auth_api() else {
1475            return Err(RefreshTokenError::RefreshTokenRequired);
1476        };
1477
1478        match auth_api {
1479            AuthApi::Matrix(api) => {
1480                trace!("Token refresh: Using the homeserver.");
1481                Box::pin(api.refresh_access_token()).await?;
1482            }
1483            AuthApi::OAuth(api) => {
1484                trace!("Token refresh: Using OAuth 2.0.");
1485                Box::pin(api.refresh_access_token()).await?;
1486            }
1487        }
1488
1489        Ok(())
1490    }
1491
1492    /// Log out the current session using the proper authentication API.
1493    ///
1494    /// # Errors
1495    ///
1496    /// Returns an error if the session is not authenticated or if an error
1497    /// occurred while making the request to the server.
1498    pub async fn logout(&self) -> Result<(), Error> {
1499        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1500        match auth_api {
1501            AuthApi::Matrix(matrix_auth) => {
1502                matrix_auth.logout().await?;
1503                Ok(())
1504            }
1505            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1506        }
1507    }
1508
1509    /// Get or upload a sync filter.
1510    ///
1511    /// This method will either get a filter ID from the store or upload the
1512    /// filter definition to the homeserver and return the new filter ID.
1513    ///
1514    /// # Arguments
1515    ///
1516    /// * `filter_name` - The unique name of the filter, this name will be used
1517    /// locally to store and identify the filter ID returned by the server.
1518    ///
1519    /// * `definition` - The filter definition that should be uploaded to the
1520    /// server if no filter ID can be found in the store.
1521    ///
1522    /// # Examples
1523    ///
1524    /// ```no_run
1525    /// # use matrix_sdk::{
1526    /// #    Client, config::SyncSettings,
1527    /// #    ruma::api::client::{
1528    /// #        filter::{
1529    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1530    /// #        },
1531    /// #        sync::sync_events::v3::Filter,
1532    /// #    }
1533    /// # };
1534    /// # use url::Url;
1535    /// # async {
1536    /// # let homeserver = Url::parse("http://example.com").unwrap();
1537    /// # let client = Client::new(homeserver).await.unwrap();
1538    /// let mut filter = FilterDefinition::default();
1539    ///
1540    /// // Let's enable member lazy loading.
1541    /// filter.room.state.lazy_load_options =
1542    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1543    ///
1544    /// let filter_id = client
1545    ///     .get_or_upload_filter("sync", filter)
1546    ///     .await
1547    ///     .unwrap();
1548    ///
1549    /// let sync_settings = SyncSettings::new()
1550    ///     .filter(Filter::FilterId(filter_id));
1551    ///
1552    /// let response = client.sync_once(sync_settings).await.unwrap();
1553    /// # };
1554    #[instrument(skip(self, definition))]
1555    pub async fn get_or_upload_filter(
1556        &self,
1557        filter_name: &str,
1558        definition: FilterDefinition,
1559    ) -> Result<String> {
1560        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1561            debug!("Found filter locally");
1562            Ok(filter)
1563        } else {
1564            debug!("Didn't find filter locally");
1565            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1566            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1567            let response = self.send(request).await?;
1568
1569            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1570
1571            Ok(response.filter_id)
1572        }
1573    }
1574
1575    /// Prepare to join a room by ID, by getting the current details about it
1576    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1577        let room = self.get_room(room_id)?;
1578
1579        let inviter = match room.invite_details().await {
1580            Ok(details) => details.inviter,
1581            Err(Error::WrongRoomState(_)) => None,
1582            Err(e) => {
1583                warn!("Error fetching invite details for room: {e:?}");
1584                None
1585            }
1586        };
1587
1588        Some(PreJoinRoomInfo { inviter })
1589    }
1590
1591    /// Finish joining a room.
1592    ///
1593    /// If the room was an invite that should be marked as a DM, will include it
1594    /// in the DM event after creating the joined room.
1595    ///
1596    /// If encrypted history sharing is enabled, will check to see if we have a
1597    /// key bundle, and import it if so.
1598    ///
1599    /// # Arguments
1600    ///
1601    /// * `room_id` - The `RoomId` of the room that was joined.
1602    /// * `pre_join_room_info` - Information about the room before we joined.
1603    async fn finish_join_room(
1604        &self,
1605        room_id: &RoomId,
1606        pre_join_room_info: Option<PreJoinRoomInfo>,
1607    ) -> Result<Room> {
1608        info!(?room_id, ?pre_join_room_info, "Completing room join");
1609        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1610            room.state() == RoomState::Invited
1611                && room.is_direct().await.unwrap_or_else(|e| {
1612                    warn!(%room_id, "is_direct() failed: {e}");
1613                    false
1614                })
1615        } else {
1616            false
1617        };
1618
1619        let base_room = self
1620            .base_client()
1621            .room_joined(
1622                room_id,
1623                pre_join_room_info
1624                    .as_ref()
1625                    .and_then(|info| info.inviter.as_ref())
1626                    .map(|i| i.user_id().to_owned()),
1627            )
1628            .await?;
1629        let room = Room::new(self.clone(), base_room);
1630
1631        if mark_as_dm {
1632            room.set_is_direct(true).await?;
1633        }
1634
1635        // If we joined following an invite, check if we had previously received a key
1636        // bundle from the inviter, and import it if so.
1637        //
1638        // It's important that we only do this once `BaseClient::room_joined` has
1639        // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1640        // race.
1641        #[cfg(feature = "e2e-encryption")]
1642        if self.inner.enable_share_history_on_invite
1643            && let Some(inviter) =
1644                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1645        {
1646            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1647                .await?;
1648        }
1649
1650        // Suppress "unused variable" and "unused field" lints
1651        #[cfg(not(feature = "e2e-encryption"))]
1652        let _ = pre_join_room_info.map(|i| i.inviter);
1653
1654        Ok(room)
1655    }
1656
1657    /// Join a room by `RoomId`.
1658    ///
1659    /// Returns the `Room` in the joined state.
1660    ///
1661    /// # Arguments
1662    ///
1663    /// * `room_id` - The `RoomId` of the room to be joined.
1664    #[instrument(skip(self))]
1665    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1666        // See who invited us to this room, if anyone. Note we have to do this before
1667        // making the `/join` request, otherwise we could race against the sync.
1668        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1669
1670        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1671        let response = self.send(request).await?;
1672        self.finish_join_room(&response.room_id, pre_join_info).await
1673    }
1674
1675    /// Join a room by `RoomOrAliasId`.
1676    ///
1677    /// Returns the `Room` in the joined state.
1678    ///
1679    /// # Arguments
1680    ///
1681    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1682    ///   alias looks like `#name:example.com`.
1683    /// * `server_names` - The server names to be used for resolving the alias,
1684    ///   if needs be.
1685    #[instrument(skip(self))]
1686    pub async fn join_room_by_id_or_alias(
1687        &self,
1688        alias: &RoomOrAliasId,
1689        server_names: &[OwnedServerName],
1690    ) -> Result<Room> {
1691        let pre_join_info = {
1692            match alias.try_into() {
1693                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1694                Err(_) => {
1695                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1696                    // responding to an invitation to the room, and therefore don't need to handle
1697                    // things that happen as a result of invites.
1698                    None
1699                }
1700            }
1701        };
1702        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1703            via: server_names.to_owned(),
1704        });
1705        let response = self.send(request).await?;
1706        self.finish_join_room(&response.room_id, pre_join_info).await
1707    }
1708
1709    /// Search the homeserver's directory of public rooms.
1710    ///
1711    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1712    /// a `get_public_rooms::Response`.
1713    ///
1714    /// # Arguments
1715    ///
1716    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1717    ///
1718    /// * `since` - Pagination token from a previous request.
1719    ///
1720    /// * `server` - The name of the server, if `None` the requested server is
1721    ///   used.
1722    ///
1723    /// # Examples
1724    /// ```no_run
1725    /// use matrix_sdk::Client;
1726    /// # use url::Url;
1727    /// # let homeserver = Url::parse("http://example.com").unwrap();
1728    /// # let limit = Some(10);
1729    /// # let since = Some("since token");
1730    /// # let server = Some("servername.com".try_into().unwrap());
1731    /// # async {
1732    /// let mut client = Client::new(homeserver).await.unwrap();
1733    ///
1734    /// client.public_rooms(limit, since, server).await;
1735    /// # };
1736    /// ```
1737    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1738    pub async fn public_rooms(
1739        &self,
1740        limit: Option<u32>,
1741        since: Option<&str>,
1742        server: Option<&ServerName>,
1743    ) -> HttpResult<get_public_rooms::v3::Response> {
1744        let limit = limit.map(UInt::from);
1745
1746        let request = assign!(get_public_rooms::v3::Request::new(), {
1747            limit,
1748            since: since.map(ToOwned::to_owned),
1749            server: server.map(ToOwned::to_owned),
1750        });
1751        self.send(request).await
1752    }
1753
1754    /// Create a room with the given parameters.
1755    ///
1756    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1757    /// created room.
1758    ///
1759    /// If you want to create a direct message with one specific user, you can
1760    /// use [`create_dm`][Self::create_dm], which is more convenient than
1761    /// assembling the [`create_room::v3::Request`] yourself.
1762    ///
1763    /// If the `is_direct` field of the request is set to `true` and at least
1764    /// one user is invited, the room will be automatically added to the direct
1765    /// rooms in the account data.
1766    ///
1767    /// # Examples
1768    ///
1769    /// ```no_run
1770    /// use matrix_sdk::{
1771    ///     Client,
1772    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1773    /// };
1774    /// # use url::Url;
1775    /// #
1776    /// # async {
1777    /// # let homeserver = Url::parse("http://example.com").unwrap();
1778    /// let request = CreateRoomRequest::new();
1779    /// let client = Client::new(homeserver).await.unwrap();
1780    /// assert!(client.create_room(request).await.is_ok());
1781    /// # };
1782    /// ```
1783    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1784        let invite = request.invite.clone();
1785        let is_direct_room = request.is_direct;
1786        let response = self.send(request).await?;
1787
1788        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1789
1790        let joined_room = Room::new(self.clone(), base_room);
1791
1792        if is_direct_room
1793            && !invite.is_empty()
1794            && let Err(error) =
1795                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1796        {
1797            // FIXME: Retry in the background
1798            error!("Failed to mark room as DM: {error}");
1799        }
1800
1801        Ok(joined_room)
1802    }
1803
1804    /// Create a DM room.
1805    ///
1806    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1807    /// given user being invited, the room marked `is_direct` and both the
1808    /// creator and invitee getting the default maximum power level.
1809    ///
1810    /// If the `e2e-encryption` feature is enabled, the room will also be
1811    /// encrypted.
1812    ///
1813    /// # Arguments
1814    ///
1815    /// * `user_id` - The ID of the user to create a DM for.
1816    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1817        #[cfg(feature = "e2e-encryption")]
1818        let initial_state = vec![
1819            InitialStateEvent::with_empty_state_key(
1820                RoomEncryptionEventContent::with_recommended_defaults(),
1821            )
1822            .to_raw_any(),
1823        ];
1824
1825        #[cfg(not(feature = "e2e-encryption"))]
1826        let initial_state = vec![];
1827
1828        let request = assign!(create_room::v3::Request::new(), {
1829            invite: vec![user_id.to_owned()],
1830            is_direct: true,
1831            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1832            initial_state,
1833        });
1834
1835        self.create_room(request).await
1836    }
1837
1838    /// Get the first existing DM room with the given user, if any.
1839    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1840        self.get_dm_rooms(user_id).next()
1841    }
1842
1843    /// Get an iterator with the existing DM rooms for the given user.
1844    pub fn get_dm_rooms(&self, user_id: &UserId) -> impl Iterator<Item = Room> {
1845        let rooms = self.joined_rooms();
1846
1847        let dm_definition = &self.base_client().dm_room_definition;
1848
1849        // Find the room we share with the `user_id` and only with `user_id`
1850        let rooms = rooms.into_iter().filter(move |r| {
1851            let targets = r.direct_targets();
1852            let targets_match =
1853                targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id));
1854            match dm_definition {
1855                DmRoomDefinition::MatrixSpec => targets_match,
1856                DmRoomDefinition::TwoMembers => {
1857                    let service_members_count =
1858                        r.service_members().map(|s| s.len()).unwrap_or_default() as u64;
1859                    let active_non_service_members =
1860                        r.active_members_count().saturating_sub(service_members_count);
1861                    targets_match && active_non_service_members <= 2
1862                }
1863            }
1864        });
1865
1866        trace!(?user_id, ?rooms, "Found DM rooms with user");
1867        rooms
1868    }
1869
1870    /// Search the homeserver's directory for public rooms with a filter.
1871    ///
1872    /// # Arguments
1873    ///
1874    /// * `room_search` - The easiest way to create this request is using the
1875    ///   `get_public_rooms_filtered::Request` itself.
1876    ///
1877    /// # Examples
1878    ///
1879    /// ```no_run
1880    /// # use url::Url;
1881    /// # use matrix_sdk::Client;
1882    /// # async {
1883    /// # let homeserver = Url::parse("http://example.com")?;
1884    /// use matrix_sdk::ruma::{
1885    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1886    /// };
1887    /// # let mut client = Client::new(homeserver).await?;
1888    ///
1889    /// let mut filter = Filter::new();
1890    /// filter.generic_search_term = Some("rust".to_owned());
1891    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1892    /// request.filter = filter;
1893    ///
1894    /// let response = client.public_rooms_filtered(request).await?;
1895    ///
1896    /// for room in response.chunk {
1897    ///     println!("Found room {room:?}");
1898    /// }
1899    /// # anyhow::Ok(()) };
1900    /// ```
1901    pub async fn public_rooms_filtered(
1902        &self,
1903        request: get_public_rooms_filtered::v3::Request,
1904    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1905        self.send(request).await
1906    }
1907
1908    /// Send an arbitrary request to the server, without updating client state.
1909    ///
1910    /// **Warning:** Because this method *does not* update the client state, it
1911    /// is important to make sure that you account for this yourself, and
1912    /// use wrapper methods where available.  This method should *only* be
1913    /// used if a wrapper method for the endpoint you'd like to use is not
1914    /// available.
1915    ///
1916    /// # Arguments
1917    ///
1918    /// * `request` - A filled out and valid request for the endpoint to be hit
1919    ///
1920    /// * `timeout` - An optional request timeout setting, this overrides the
1921    ///   default request setting if one was set.
1922    ///
1923    /// # Examples
1924    ///
1925    /// ```no_run
1926    /// # use matrix_sdk::{Client, config::SyncSettings};
1927    /// # use url::Url;
1928    /// # async {
1929    /// # let homeserver = Url::parse("http://localhost:8080")?;
1930    /// # let mut client = Client::new(homeserver).await?;
1931    /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1932    ///
1933    /// // First construct the request you want to make
1934    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1935    /// // for all available Endpoints
1936    /// let user_id = owned_user_id!("@example:localhost");
1937    /// let request = profile::get_profile::v3::Request::new(user_id);
1938    ///
1939    /// // Start the request using Client::send()
1940    /// let response = client.send(request).await?;
1941    ///
1942    /// // Check the corresponding Response struct to find out what types are
1943    /// // returned
1944    /// # anyhow::Ok(()) };
1945    /// ```
1946    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1947    where
1948        Request: OutgoingRequest + Clone + Debug,
1949        Request::Authentication: SupportedAuthScheme,
1950        Request::PathBuilder: SupportedPathBuilder,
1951        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1952        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1953    {
1954        SendRequest {
1955            client: self.clone(),
1956            request,
1957            config: None,
1958            send_progress: Default::default(),
1959        }
1960    }
1961
1962    pub(crate) async fn send_inner<Request>(
1963        &self,
1964        request: Request,
1965        config: Option<RequestConfig>,
1966        send_progress: SharedObservable<TransmissionProgress>,
1967    ) -> HttpResult<Request::IncomingResponse>
1968    where
1969        Request: OutgoingRequest + Debug,
1970        Request::Authentication: SupportedAuthScheme,
1971        Request::PathBuilder: SupportedPathBuilder,
1972        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1973        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1974    {
1975        let homeserver = self.homeserver().to_string();
1976        let access_token = self.access_token();
1977        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1978
1979        let path_builder_input =
1980            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1981
1982        let result = self
1983            .inner
1984            .http_client
1985            .send(
1986                request,
1987                config,
1988                homeserver,
1989                access_token.as_deref(),
1990                path_builder_input,
1991                send_progress,
1992            )
1993            .await;
1994
1995        if let Err(Some(ErrorKind::UnknownToken { .. })) =
1996            result.as_ref().map_err(HttpError::client_api_error_kind)
1997            && let Some(access_token) = &access_token
1998        {
1999            // Mark the access token as expired.
2000            self.auth_ctx().set_access_token_expired(access_token);
2001        }
2002
2003        result
2004    }
2005
2006    fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
2007        _ = self
2008            .inner
2009            .auth_ctx
2010            .session_change_sender
2011            .send(SessionChange::UnknownToken(unknown_token_data.clone()));
2012    }
2013
2014    /// Fetches server versions from network; no caching.
2015    pub async fn fetch_server_versions(
2016        &self,
2017        request_config: Option<RequestConfig>,
2018    ) -> HttpResult<get_supported_versions::Response> {
2019        // Since this was called by the user, try to refresh the access token if
2020        // necessary.
2021        self.fetch_server_versions_inner(false, request_config).await
2022    }
2023
2024    /// Fetches server versions from network; no caching.
2025    ///
2026    /// If the access token is expired and `failsafe` is `false`, this will
2027    /// attempt to refresh the access token, otherwise this will try to make an
2028    /// unauthenticated request instead.
2029    pub(crate) async fn fetch_server_versions_inner(
2030        &self,
2031        failsafe: bool,
2032        request_config: Option<RequestConfig>,
2033    ) -> HttpResult<get_supported_versions::Response> {
2034        if !failsafe {
2035            // `Client::send()` handles refreshing access tokens.
2036            return self
2037                .send(get_supported_versions::Request::new())
2038                .with_request_config(request_config)
2039                .await;
2040        }
2041
2042        let homeserver = self.homeserver().to_string();
2043
2044        // If we have a fresh access token, try with it first.
2045        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2046            && self.auth_ctx().has_valid_access_token()
2047            && let Some(access_token) = self.access_token()
2048        {
2049            let result = self
2050                .inner
2051                .http_client
2052                .send(
2053                    get_supported_versions::Request::new(),
2054                    request_config,
2055                    homeserver.clone(),
2056                    Some(&access_token),
2057                    (),
2058                    Default::default(),
2059                )
2060                .await;
2061
2062            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2063                result.as_ref().map_err(HttpError::client_api_error_kind)
2064            {
2065                // If the access token is actually expired, mark it as expired and fallback to
2066                // the unauthenticated request below.
2067                self.auth_ctx().set_access_token_expired(&access_token);
2068            } else {
2069                // If the request succeeded or it's an other error, just stop now.
2070                return result;
2071            }
2072        }
2073
2074        // Try without authentication.
2075        self.inner
2076            .http_client
2077            .send(
2078                get_supported_versions::Request::new(),
2079                request_config,
2080                homeserver.clone(),
2081                None,
2082                (),
2083                Default::default(),
2084            )
2085            .await
2086    }
2087
2088    /// Fetches client well_known from network; no caching.
2089    ///
2090    /// 1. If the [`Client::server`] value is available, we use it to fetch the
2091    ///    well-known contents.
2092    /// 2. If it's not, we try extracting the server name from the
2093    ///    [`Client::user_id`] and building the server URL from it.
2094    /// 3. If we couldn't get the well-known contents with either the explicit
2095    ///    server name or the implicit extracted one, we try the homeserver URL
2096    ///    as a last resort.
2097    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2098        let homeserver = self.homeserver();
2099        let scheme = homeserver.scheme();
2100
2101        // Use the server name, either an explicit one or an implicit one taken from
2102        // the user id: sometimes we'll have only the homeserver url available and no
2103        // server name, but the server name can be extracted from the current user id.
2104        let server_url = self
2105            .server()
2106            .map(|server| server.to_string())
2107            // If the server name wasn't available, extract it from the user id and build a URL:
2108            // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2109            // will be the same for the public server url, lacking a better candidate.
2110            .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2111
2112        // If the server name is available, first try using it
2113        let response = if let Some(server_url) = server_url {
2114            // First try using the server name
2115            self.fetch_client_well_known_with_url(server_url).await
2116        } else {
2117            None
2118        };
2119
2120        // If we didn't get a well-known value yet, try with the homeserver url instead:
2121        if response.is_none() {
2122            // Sometimes people configure their well-known directly on the homeserver so use
2123            // this as a fallback when the server name is unknown.
2124            warn!(
2125                "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2126            );
2127            self.fetch_client_well_known_with_url(homeserver.to_string()).await
2128        } else {
2129            response
2130        }
2131    }
2132
2133    async fn fetch_client_well_known_with_url(
2134        &self,
2135        url: String,
2136    ) -> Option<discover_homeserver::Response> {
2137        let well_known = self
2138            .inner
2139            .http_client
2140            .send(
2141                discover_homeserver::Request::new(),
2142                Some(RequestConfig::short_retry()),
2143                url,
2144                None,
2145                (),
2146                Default::default(),
2147            )
2148            .await;
2149
2150        match well_known {
2151            Ok(well_known) => Some(well_known),
2152            Err(http_error) => {
2153                // It is perfectly valid to not have a well-known file.
2154                // Maybe we should check for a specific error code to be sure?
2155                warn!("Failed to fetch client well-known: {http_error}");
2156                None
2157            }
2158        }
2159    }
2160
2161    /// Load supported versions from storage, or fetch them from network and
2162    /// cache them.
2163    ///
2164    /// If `failsafe` is true, this will try to minimize side effects to avoid
2165    /// possible deadlocks.
2166    async fn fetch_supported_versions(
2167        &self,
2168        failsafe: bool,
2169    ) -> HttpResult<SupportedVersionsResponse> {
2170        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2171        let supported_versions = SupportedVersionsResponse {
2172            versions: server_versions.versions,
2173            unstable_features: server_versions.unstable_features,
2174        };
2175
2176        Ok(supported_versions)
2177    }
2178
2179    /// Get the Matrix versions and features supported by the homeserver by
2180    /// fetching them from the server or the cache.
2181    ///
2182    /// This is equivalent to calling both [`Client::server_versions()`] and
2183    /// [`Client::unstable_features()`]. To always fetch the result from the
2184    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2185    /// and then `.as_supported_versions()` on the response.
2186    ///
2187    /// # Examples
2188    ///
2189    /// ```no_run
2190    /// use ruma::api::{FeatureFlag, MatrixVersion};
2191    /// # use matrix_sdk::{Client, config::SyncSettings};
2192    /// # use url::Url;
2193    /// # async {
2194    /// # let homeserver = Url::parse("http://localhost:8080")?;
2195    /// # let mut client = Client::new(homeserver).await?;
2196    ///
2197    /// let supported = client.supported_versions().await?;
2198    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2199    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2200    ///
2201    /// let msc_x_feature = FeatureFlag::from("msc_x");
2202    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2203    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2204    /// # anyhow::Ok(()) };
2205    /// ```
2206    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2207        self.supported_versions_inner(false).await
2208    }
2209
2210    /// Get the Matrix versions and features supported by the homeserver by
2211    /// fetching them from the server or the cache.
2212    ///
2213    /// If `failsafe` is true, this will try to minimize side effects to avoid
2214    /// possible deadlocks.
2215    pub(crate) async fn supported_versions_inner(
2216        &self,
2217        failsafe: bool,
2218    ) -> HttpResult<SupportedVersions> {
2219        match self.supported_versions_cached_inner(failsafe).await {
2220            Ok(Some(value)) => {
2221                return Ok(value);
2222            }
2223            Ok(None) => {
2224                // The cache is empty, make a request.
2225            }
2226            Err(error) => {
2227                warn!("error when loading cached supported versions: {error}");
2228                // Fallthrough to make a request.
2229            }
2230        }
2231
2232        self.refresh_supported_versions_cache(failsafe).await
2233    }
2234
2235    /// Refresh the Matrix versions and features supported by the homeserver in
2236    /// the cache.
2237    ///
2238    /// If `failsafe` is true, this will try to minimize side effects to avoid
2239    /// possible deadlocks.
2240    async fn refresh_supported_versions_cache(
2241        &self,
2242        failsafe: bool,
2243    ) -> HttpResult<SupportedVersions> {
2244        let cached_supported_versions = &self.inner.caches.supported_versions;
2245
2246        let mut supported_versions_guard = match cached_supported_versions.refresh_lock.try_lock() {
2247            Ok(guard) => guard,
2248            Err(_) => {
2249                // There is already a refresh in progress, wait for it to finish.
2250                let guard = cached_supported_versions.refresh_lock.lock().await;
2251
2252                if let Err(error) = guard.as_ref() {
2253                    // There was an error in the previous refresh, return it.
2254                    return Err(HttpError::Cached(error.clone()));
2255                }
2256
2257                // Reuse the data if it was cached and it hasn't expired.
2258                if let CachedValue::Cached(value) = cached_supported_versions.value()
2259                    && !value.has_expired()
2260                {
2261                    return Ok(value.into_data());
2262                }
2263
2264                // The data wasn't cached or has expired, we need to make another request.
2265                guard
2266            }
2267        };
2268
2269        let response = match self.fetch_supported_versions(failsafe).await {
2270            Ok(response) => {
2271                *supported_versions_guard = Ok(());
2272                TtlValue::new(response)
2273            }
2274            Err(error) => {
2275                let error = Arc::new(error);
2276                *supported_versions_guard = Err(error.clone());
2277                return Err(HttpError::Cached(error));
2278            }
2279        };
2280
2281        let supported_versions = response.as_ref().map(|response| response.supported_versions());
2282
2283        // Only cache the result if the request was authenticated.
2284        if self.auth_ctx().has_valid_access_token() {
2285            if let Err(err) = self
2286                .state_store()
2287                .set_kv_data(
2288                    StateStoreDataKey::SupportedVersions,
2289                    StateStoreDataValue::SupportedVersions(response),
2290                )
2291                .await
2292            {
2293                warn!("error when caching supported versions: {err}");
2294            }
2295
2296            cached_supported_versions.set_value(supported_versions.clone());
2297        }
2298
2299        Ok(supported_versions.into_data())
2300    }
2301
2302    /// Get the Matrix versions and features supported by the homeserver by
2303    /// fetching them from the cache.
2304    ///
2305    /// For a version of this function that fetches the supported versions and
2306    /// features from the homeserver if the [`SupportedVersions`] aren't
2307    /// found in the cache, take a look at the [`Client::supported_versions()`]
2308    /// method.
2309    ///
2310    /// If the data in the cache has expired, this will trigger a background
2311    /// task to refresh it.
2312    ///
2313    /// # Examples
2314    ///
2315    /// ```no_run
2316    /// use ruma::api::{FeatureFlag, MatrixVersion};
2317    /// # use matrix_sdk::{Client, config::SyncSettings};
2318    /// # use url::Url;
2319    /// # async {
2320    /// # let homeserver = Url::parse("http://localhost:8080")?;
2321    /// # let mut client = Client::new(homeserver).await?;
2322    ///
2323    /// let supported =
2324    ///     if let Some(supported) = client.supported_versions_cached().await? {
2325    ///         supported
2326    ///     } else {
2327    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2328    ///     };
2329    ///
2330    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2331    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2332    ///
2333    /// let msc_x_feature = FeatureFlag::from("msc_x");
2334    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2335    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2336    /// # anyhow::Ok(()) };
2337    /// ```
2338    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2339        self.supported_versions_cached_inner(false).await
2340    }
2341
2342    async fn supported_versions_cached_inner(
2343        &self,
2344        failsafe: bool,
2345    ) -> Result<Option<SupportedVersions>, StoreError> {
2346        let supported_versions_cache = &self.inner.caches.supported_versions;
2347
2348        let value = if let CachedValue::Cached(cached) = supported_versions_cache.value() {
2349            cached
2350        } else if let Some(stored) = self
2351            .state_store()
2352            .get_kv_data(StateStoreDataKey::SupportedVersions)
2353            .await?
2354            .and_then(|value| value.into_supported_versions())
2355        {
2356            let stored = stored.map(|response| response.supported_versions());
2357
2358            // Copy the data from the store in the in-memory cache.
2359            supported_versions_cache.set_value(stored.clone());
2360
2361            stored
2362        } else {
2363            return Ok(None);
2364        };
2365
2366        // Spawn a task to refresh the cache if it has expired and we have a valid
2367        // access token.
2368        if value.has_expired() && self.auth_ctx().has_valid_access_token() {
2369            debug!("spawning task to refresh supported versions cache");
2370
2371            let client = self.clone();
2372            self.task_monitor().spawn_finite_task("refresh supported versions cache", async move {
2373                if let Err(error) = client.refresh_supported_versions_cache(failsafe).await {
2374                    warn!("failed to refresh supported versions cache: {error}");
2375                }
2376            });
2377        }
2378
2379        Ok(Some(value.into_data()))
2380    }
2381
2382    /// Get the Matrix versions supported by the homeserver by fetching them
2383    /// from the server or the cache.
2384    ///
2385    /// # Examples
2386    ///
2387    /// ```no_run
2388    /// use ruma::api::MatrixVersion;
2389    /// # use matrix_sdk::{Client, config::SyncSettings};
2390    /// # use url::Url;
2391    /// # async {
2392    /// # let homeserver = Url::parse("http://localhost:8080")?;
2393    /// # let mut client = Client::new(homeserver).await?;
2394    ///
2395    /// let server_versions = client.server_versions().await?;
2396    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2397    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2398    /// # anyhow::Ok(()) };
2399    /// ```
2400    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2401        Ok(self.supported_versions().await?.versions)
2402    }
2403
2404    /// Get the unstable features supported by the homeserver by fetching them
2405    /// from the server or the cache.
2406    ///
2407    /// # Examples
2408    ///
2409    /// ```no_run
2410    /// use matrix_sdk::ruma::api::FeatureFlag;
2411    /// # use matrix_sdk::{Client, config::SyncSettings};
2412    /// # use url::Url;
2413    /// # async {
2414    /// # let homeserver = Url::parse("http://localhost:8080")?;
2415    /// # let mut client = Client::new(homeserver).await?;
2416    ///
2417    /// let msc_x_feature = FeatureFlag::from("msc_x");
2418    /// let unstable_features = client.unstable_features().await?;
2419    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2420    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2421    /// # anyhow::Ok(()) };
2422    /// ```
2423    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2424        Ok(self.supported_versions().await?.features)
2425    }
2426
2427    /// Empty the supported versions and unstable features cache.
2428    ///
2429    /// Since the SDK caches the supported versions, it's possible to have a
2430    /// stale entry in the cache. This functions makes it possible to force
2431    /// reset it.
2432    pub async fn reset_supported_versions(&self) -> Result<()> {
2433        // Empty the in-memory cache.
2434        self.inner.caches.supported_versions.reset();
2435
2436        // Empty the store cache.
2437        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2438    }
2439
2440    /// Get the well-known file of the homeserver from the cache.
2441    ///
2442    /// If the data in the cache has expired, this will trigger a background
2443    /// task to refresh it.
2444    async fn well_known_cached(
2445        &self,
2446    ) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
2447        let well_known_cache = &self.inner.caches.well_known;
2448
2449        let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
2450            cached
2451        } else if let Some(stored) = self
2452            .state_store()
2453            .get_kv_data(StateStoreDataKey::WellKnown)
2454            .await?
2455            .and_then(|value| value.into_well_known())
2456        {
2457            // Copy the data from the store into the in-memory cache.
2458            well_known_cache.set_value(stored.clone());
2459
2460            stored
2461        } else {
2462            return Ok(CachedValue::NotSet);
2463        };
2464
2465        // Spawn a task to refresh the cache if it has expired.
2466        if value.has_expired() {
2467            debug!("spawning task to refresh well-known cache");
2468
2469            let client = self.clone();
2470            self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
2471                client.refresh_well_known_cache().await;
2472            });
2473        }
2474
2475        Ok(CachedValue::Cached(value.into_data()))
2476    }
2477
2478    /// Refresh the well-known file of the homeserver in the cache.
2479    async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
2480        let well_known_cache = &self.inner.caches.well_known;
2481
2482        let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
2483            Ok(guard) => guard,
2484            Err(_) => {
2485                // There is already a refresh in progress, wait for it to finish.
2486                let guard = well_known_cache.refresh_lock.lock().await;
2487
2488                // A refresh can't fail because we ignore failures, so there shouldn't be an
2489                // error in the refresh lock.
2490
2491                // Reuse the data if it was cached and it hasn't expired.
2492                if let CachedValue::Cached(value) = well_known_cache.value()
2493                    && !value.has_expired()
2494                {
2495                    return value.into_data();
2496                }
2497
2498                // The data wasn't cached or has expired, we need to make another request.
2499                guard
2500            }
2501        };
2502
2503        let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
2504
2505        if let Err(err) = self
2506            .state_store()
2507            .set_kv_data(
2508                StateStoreDataKey::WellKnown,
2509                StateStoreDataValue::WellKnown(well_known.clone()),
2510            )
2511            .await
2512        {
2513            warn!("error when caching well-known: {err}");
2514        }
2515
2516        well_known_cache.set_value(well_known.clone());
2517
2518        well_known.into_data()
2519    }
2520
2521    /// Get the well-known file of the homeserver by fetching it from the server
2522    /// or the cache.
2523    async fn well_known(&self) -> Option<WellKnownResponse> {
2524        match self.well_known_cached().await {
2525            Ok(CachedValue::Cached(value)) => {
2526                return value;
2527            }
2528            Ok(CachedValue::NotSet) => {
2529                // The cache is empty, make a request.
2530            }
2531            Err(error) => {
2532                warn!("error when loading cached well-known: {error}");
2533                // Fallthrough to make a request.
2534            }
2535        }
2536
2537        self.refresh_well_known_cache().await
2538    }
2539
2540    /// Get information about the homeserver's advertised RTC foci by fetching
2541    /// the well-known file from the server or the cache.
2542    ///
2543    /// # Examples
2544    /// ```no_run
2545    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2546    /// # use url::Url;
2547    /// # async {
2548    /// # let homeserver = Url::parse("http://localhost:8080")?;
2549    /// # let mut client = Client::new(homeserver).await?;
2550    /// let rtc_foci = client.rtc_foci().await?;
2551    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2552    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2553    ///     _ => None,
2554    /// });
2555    /// if let Some(info) = default_livekit_focus_info {
2556    ///     println!("Default LiveKit service URL: {}", info.service_url);
2557    /// }
2558    /// # anyhow::Ok(()) };
2559    /// ```
2560    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2561        let well_known = self.well_known().await;
2562
2563        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2564    }
2565
2566    /// Empty the well-known cache.
2567    ///
2568    /// Since the SDK caches the well-known, it's possible to have a stale entry
2569    /// in the cache. This functions makes it possible to force reset it.
2570    pub async fn reset_well_known(&self) -> Result<()> {
2571        // Empty the in-memory caches.
2572        self.inner.caches.well_known.reset();
2573
2574        // Empty the store cache.
2575        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2576    }
2577
2578    /// Check whether MSC 4028 is enabled on the homeserver.
2579    ///
2580    /// # Examples
2581    ///
2582    /// ```no_run
2583    /// # use matrix_sdk::{Client, config::SyncSettings};
2584    /// # use url::Url;
2585    /// # async {
2586    /// # let homeserver = Url::parse("http://localhost:8080")?;
2587    /// # let mut client = Client::new(homeserver).await?;
2588    /// let msc4028_enabled =
2589    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2590    /// # anyhow::Ok(()) };
2591    /// ```
2592    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2593        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2594    }
2595
2596    /// Get information of all our own devices.
2597    ///
2598    /// # Examples
2599    ///
2600    /// ```no_run
2601    /// # use matrix_sdk::{Client, config::SyncSettings};
2602    /// # use url::Url;
2603    /// # async {
2604    /// # let homeserver = Url::parse("http://localhost:8080")?;
2605    /// # let mut client = Client::new(homeserver).await?;
2606    /// let response = client.devices().await?;
2607    ///
2608    /// for device in response.devices {
2609    ///     println!(
2610    ///         "Device: {} {}",
2611    ///         device.device_id,
2612    ///         device.display_name.as_deref().unwrap_or("")
2613    ///     );
2614    /// }
2615    /// # anyhow::Ok(()) };
2616    /// ```
2617    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2618        let request = get_devices::v3::Request::new();
2619
2620        self.send(request).await
2621    }
2622
2623    /// Delete the given devices from the server.
2624    ///
2625    /// # Arguments
2626    ///
2627    /// * `devices` - The list of devices that should be deleted from the
2628    ///   server.
2629    ///
2630    /// * `auth_data` - This request requires user interactive auth, the first
2631    ///   request needs to set this to `None` and will always fail with an
2632    ///   `UiaaResponse`. The response will contain information for the
2633    ///   interactive auth and the same request needs to be made but this time
2634    ///   with some `auth_data` provided.
2635    ///
2636    /// ```no_run
2637    /// # use matrix_sdk::{
2638    /// #    ruma::{api::client::uiaa, owned_device_id},
2639    /// #    Client, Error, config::SyncSettings,
2640    /// # };
2641    /// # use serde_json::json;
2642    /// # use url::Url;
2643    /// # use std::collections::BTreeMap;
2644    /// # async {
2645    /// # let homeserver = Url::parse("http://localhost:8080")?;
2646    /// # let mut client = Client::new(homeserver).await?;
2647    /// let devices = &[owned_device_id!("DEVICEID")];
2648    ///
2649    /// if let Err(e) = client.delete_devices(devices, None).await {
2650    ///     if let Some(info) = e.as_uiaa_response() {
2651    ///         let mut password = uiaa::Password::new(
2652    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2653    ///             "wordpass".to_owned(),
2654    ///         );
2655    ///         password.session = info.session.clone();
2656    ///
2657    ///         client
2658    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2659    ///             .await?;
2660    ///     }
2661    /// }
2662    /// # anyhow::Ok(()) };
2663    pub async fn delete_devices(
2664        &self,
2665        devices: &[OwnedDeviceId],
2666        auth_data: Option<uiaa::AuthData>,
2667    ) -> HttpResult<delete_devices::v3::Response> {
2668        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2669        request.auth = auth_data;
2670
2671        self.send(request).await
2672    }
2673
2674    /// Change the display name of a device owned by the current user.
2675    ///
2676    /// Returns a `update_device::Response` which specifies the result
2677    /// of the operation.
2678    ///
2679    /// # Arguments
2680    ///
2681    /// * `device_id` - The ID of the device to change the display name of.
2682    /// * `display_name` - The new display name to set.
2683    pub async fn rename_device(
2684        &self,
2685        device_id: &DeviceId,
2686        display_name: &str,
2687    ) -> HttpResult<update_device::v3::Response> {
2688        let mut request = update_device::v3::Request::new(device_id.to_owned());
2689        request.display_name = Some(display_name.to_owned());
2690
2691        self.send(request).await
2692    }
2693
2694    /// Check whether a device with a specific ID exists on the server.
2695    ///
2696    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2697    /// with 404 and the underlying error otherwise.
2698    ///
2699    /// # Arguments
2700    ///
2701    /// * `device_id` - The ID of the device to query.
2702    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2703        let request = device::get_device::v3::Request::new(device_id);
2704        match self.send(request).await {
2705            Ok(_) => Ok(true),
2706            Err(err) => {
2707                if let Some(error) = err.as_client_api_error()
2708                    && error.status_code == 404
2709                {
2710                    Ok(false)
2711                } else {
2712                    Err(err.into())
2713                }
2714            }
2715        }
2716    }
2717
2718    /// Synchronize the client's state with the latest state on the server.
2719    ///
2720    /// ## Syncing Events
2721    ///
2722    /// Messages or any other type of event need to be periodically fetched from
2723    /// the server, this is achieved by sending a `/sync` request to the server.
2724    ///
2725    /// The first sync is sent out without a [`token`]. The response of the
2726    /// first sync will contain a [`next_batch`] field which should then be
2727    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2728    /// don't receive the same events multiple times.
2729    ///
2730    /// ## Long Polling
2731    ///
2732    /// A sync should in the usual case always be in flight. The
2733    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2734    /// long the server will wait for new events before it will respond.
2735    /// The server will respond immediately if some new events arrive before the
2736    /// timeout has expired. If no changes arrive and the timeout expires an
2737    /// empty sync response will be sent to the client.
2738    ///
2739    /// This method of sending a request that may not receive a response
2740    /// immediately is called long polling.
2741    ///
2742    /// ## Filtering Events
2743    ///
2744    /// The number or type of messages and events that the client should receive
2745    /// from the server can be altered using a [`Filter`].
2746    ///
2747    /// Filters can be non-trivial and, since they will be sent with every sync
2748    /// request, they may take up a bunch of unnecessary bandwidth.
2749    ///
2750    /// Luckily filters can be uploaded to the server and reused using an unique
2751    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2752    /// method.
2753    ///
2754    /// # Arguments
2755    ///
2756    /// * `sync_settings` - Settings for the sync call, this allows us to set
2757    /// various options to configure the sync:
2758    ///     * [`filter`] - To configure which events we receive and which get
2759    ///       [filtered] by the server
2760    ///     * [`timeout`] - To configure our [long polling] setup.
2761    ///     * [`token`] - To tell the server which events we already received
2762    ///       and where we wish to continue syncing.
2763    ///     * [`full_state`] - To tell the server that we wish to receive all
2764    ///       state events, regardless of our configured [`token`].
2765    ///     * [`set_presence`] - To tell the server to set the presence and to
2766    ///       which state.
2767    ///
2768    /// # Examples
2769    ///
2770    /// ```no_run
2771    /// # use url::Url;
2772    /// # async {
2773    /// # let homeserver = Url::parse("http://localhost:8080")?;
2774    /// # let username = "";
2775    /// # let password = "";
2776    /// use matrix_sdk::{
2777    ///     Client, config::SyncSettings,
2778    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2779    /// };
2780    ///
2781    /// let client = Client::new(homeserver).await?;
2782    /// client.matrix_auth().login_username(username, password).send().await?;
2783    ///
2784    /// // Sync once so we receive the client state and old messages.
2785    /// client.sync_once(SyncSettings::default()).await?;
2786    ///
2787    /// // Register our handler so we start responding once we receive a new
2788    /// // event.
2789    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2790    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2791    /// });
2792    ///
2793    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2794    /// // from our `sync_once()` call automatically.
2795    /// client.sync(SyncSettings::default()).await;
2796    /// # anyhow::Ok(()) };
2797    /// ```
2798    ///
2799    /// [`sync`]: #method.sync
2800    /// [`SyncSettings`]: crate::config::SyncSettings
2801    /// [`token`]: crate::config::SyncSettings#method.token
2802    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2803    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2804    /// [`set_presence`]: ruma::presence::PresenceState
2805    /// [`filter`]: crate::config::SyncSettings#method.filter
2806    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2807    /// [`next_batch`]: SyncResponse#structfield.next_batch
2808    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2809    /// [long polling]: #long-polling
2810    /// [filtered]: #filtering-events
2811    #[instrument(skip(self))]
2812    pub async fn sync_once(
2813        &self,
2814        sync_settings: crate::config::SyncSettings,
2815    ) -> Result<SyncResponse> {
2816        // The sync might not return for quite a while due to the timeout.
2817        // We'll see if there's anything crypto related to send out before we
2818        // sync, i.e. if we closed our client after a sync but before the
2819        // crypto requests were sent out.
2820        //
2821        // This will mostly be a no-op.
2822        #[cfg(feature = "e2e-encryption")]
2823        if let Err(e) = self.send_outgoing_requests().await {
2824            error!(error = ?e, "Error while sending outgoing E2EE requests");
2825        }
2826
2827        let token = match sync_settings.token {
2828            SyncToken::Specific(token) => Some(token),
2829            SyncToken::NoToken => None,
2830            SyncToken::ReusePrevious => self.sync_token().await,
2831        };
2832
2833        let request = assign!(sync_events::v3::Request::new(), {
2834            filter: sync_settings.filter.map(|f| *f),
2835            since: token,
2836            full_state: sync_settings.full_state,
2837            set_presence: sync_settings.set_presence,
2838            timeout: sync_settings.timeout,
2839            use_state_after: true,
2840        });
2841        let mut request_config = self.request_config();
2842        if let Some(timeout) = sync_settings.timeout {
2843            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2844            request_config.timeout = Some(base_timeout + timeout);
2845        }
2846
2847        let response = self.send(request).with_request_config(request_config).await?;
2848        let next_batch = response.next_batch.clone();
2849        let response = self.process_sync(response).await?;
2850
2851        #[cfg(feature = "e2e-encryption")]
2852        if let Err(e) = self.send_outgoing_requests().await {
2853            error!(error = ?e, "Error while sending outgoing E2EE requests");
2854        }
2855
2856        self.inner.sync_beat.notify(usize::MAX);
2857
2858        Ok(SyncResponse::new(next_batch, response))
2859    }
2860
2861    /// Repeatedly synchronize the client state with the server.
2862    ///
2863    /// This method will only return on error, if cancellation is needed
2864    /// the method should be wrapped in a cancelable task or the
2865    /// [`Client::sync_with_callback`] method can be used or
2866    /// [`Client::sync_with_result_callback`] if you want to handle error
2867    /// cases in the loop, too.
2868    ///
2869    /// This method will internally call [`Client::sync_once`] in a loop.
2870    ///
2871    /// This method can be used with the [`Client::add_event_handler`]
2872    /// method to react to individual events. If you instead wish to handle
2873    /// events in a bulk manner the [`Client::sync_with_callback`],
2874    /// [`Client::sync_with_result_callback`] and
2875    /// [`Client::sync_stream`] methods can be used instead. Those methods
2876    /// repeatedly return the whole sync response.
2877    ///
2878    /// # Arguments
2879    ///
2880    /// * `sync_settings` - Settings for the sync call. *Note* that those
2881    ///   settings will be only used for the first sync call. See the argument
2882    ///   docs for [`Client::sync_once`] for more info.
2883    ///
2884    /// # Return
2885    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2886    /// up to the user of the API to check the error and decide whether the sync
2887    /// should continue or not.
2888    ///
2889    /// # Examples
2890    ///
2891    /// ```no_run
2892    /// # use url::Url;
2893    /// # async {
2894    /// # let homeserver = Url::parse("http://localhost:8080")?;
2895    /// # let username = "";
2896    /// # let password = "";
2897    /// use matrix_sdk::{
2898    ///     Client, config::SyncSettings,
2899    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2900    /// };
2901    ///
2902    /// let client = Client::new(homeserver).await?;
2903    /// client.matrix_auth().login_username(&username, &password).send().await?;
2904    ///
2905    /// // Register our handler so we start responding once we receive a new
2906    /// // event.
2907    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2908    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2909    /// });
2910    ///
2911    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2912    /// // automatically.
2913    /// client.sync(SyncSettings::default()).await?;
2914    /// # anyhow::Ok(()) };
2915    /// ```
2916    ///
2917    /// [argument docs]: #method.sync_once
2918    /// [`sync_with_callback`]: #method.sync_with_callback
2919    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2920        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2921    }
2922
2923    /// Repeatedly call sync to synchronize the client state with the server.
2924    ///
2925    /// # Arguments
2926    ///
2927    /// * `sync_settings` - Settings for the sync call. *Note* that those
2928    ///   settings will be only used for the first sync call. See the argument
2929    ///   docs for [`Client::sync_once`] for more info.
2930    ///
2931    /// * `callback` - A callback that will be called every time a successful
2932    ///   response has been fetched from the server. The callback must return a
2933    ///   boolean which signalizes if the method should stop syncing. If the
2934    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2935    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2936    ///
2937    /// # Return
2938    /// The sync runs until an error occurs or the
2939    /// callback indicates that the Loop should stop. If the callback asked for
2940    /// a regular stop, the result will be `Ok(())` otherwise the
2941    /// `Err(Error)` is returned.
2942    ///
2943    /// # Examples
2944    ///
2945    /// The following example demonstrates how to sync forever while sending all
2946    /// the interesting events through a mpsc channel to another thread e.g. a
2947    /// UI thread.
2948    ///
2949    /// ```no_run
2950    /// # use std::time::Duration;
2951    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2952    /// # use url::Url;
2953    /// # async {
2954    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2955    /// # let mut client = Client::new(homeserver).await.unwrap();
2956    ///
2957    /// use tokio::sync::mpsc::channel;
2958    ///
2959    /// let (tx, rx) = channel(100);
2960    ///
2961    /// let sync_channel = &tx;
2962    /// let sync_settings = SyncSettings::new()
2963    ///     .timeout(Duration::from_secs(30));
2964    ///
2965    /// client
2966    ///     .sync_with_callback(sync_settings, |response| async move {
2967    ///         let channel = sync_channel;
2968    ///         for (room_id, room) in response.rooms.joined {
2969    ///             for event in room.timeline.events {
2970    ///                 channel.send(event).await.unwrap();
2971    ///             }
2972    ///         }
2973    ///
2974    ///         LoopCtrl::Continue
2975    ///     })
2976    ///     .await;
2977    /// };
2978    /// ```
2979    #[instrument(skip_all)]
2980    pub async fn sync_with_callback<C>(
2981        &self,
2982        sync_settings: crate::config::SyncSettings,
2983        callback: impl Fn(SyncResponse) -> C,
2984    ) -> Result<(), Error>
2985    where
2986        C: Future<Output = LoopCtrl>,
2987    {
2988        self.sync_with_result_callback(sync_settings, |result| async {
2989            Ok(callback(result?).await)
2990        })
2991        .await
2992    }
2993
2994    /// Repeatedly call sync to synchronize the client state with the server.
2995    ///
2996    /// # Arguments
2997    ///
2998    /// * `sync_settings` - Settings for the sync call. *Note* that those
2999    ///   settings will be only used for the first sync call. See the argument
3000    ///   docs for [`Client::sync_once`] for more info.
3001    ///
3002    /// * `callback` - A callback that will be called every time after a
3003    ///   response has been received, failure or not. The callback returns a
3004    ///   `Result<LoopCtrl, Error>`, too. When returning
3005    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
3006    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
3007    ///   function returns `Ok(())`. In case the callback can't handle the
3008    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
3009    ///   which results in the sync ending and the `Err(Error)` being returned.
3010    ///
3011    /// # Return
3012    /// The sync runs until an error occurs that the callback can't handle or
3013    /// the callback indicates that the Loop should stop. If the callback
3014    /// asked for a regular stop, the result will be `Ok(())` otherwise the
3015    /// `Err(Error)` is returned.
3016    ///
3017    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
3018    /// this, and are handled first without sending the result to the
3019    /// callback. Only after they have exceeded is the `Result` handed to
3020    /// the callback.
3021    ///
3022    /// # Examples
3023    ///
3024    /// The following example demonstrates how to sync forever while sending all
3025    /// the interesting events through a mpsc channel to another thread e.g. a
3026    /// UI thread.
3027    ///
3028    /// ```no_run
3029    /// # use std::time::Duration;
3030    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
3031    /// # use url::Url;
3032    /// # async {
3033    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
3034    /// # let mut client = Client::new(homeserver).await.unwrap();
3035    /// #
3036    /// use tokio::sync::mpsc::channel;
3037    ///
3038    /// let (tx, rx) = channel(100);
3039    ///
3040    /// let sync_channel = &tx;
3041    /// let sync_settings = SyncSettings::new()
3042    ///     .timeout(Duration::from_secs(30));
3043    ///
3044    /// client
3045    ///     .sync_with_result_callback(sync_settings, |response| async move {
3046    ///         let channel = sync_channel;
3047    ///         let sync_response = response?;
3048    ///         for (room_id, room) in sync_response.rooms.joined {
3049    ///              for event in room.timeline.events {
3050    ///                  channel.send(event).await.unwrap();
3051    ///               }
3052    ///         }
3053    ///
3054    ///         Ok(LoopCtrl::Continue)
3055    ///     })
3056    ///     .await;
3057    /// };
3058    /// ```
3059    #[instrument(skip(self, callback))]
3060    pub async fn sync_with_result_callback<C>(
3061        &self,
3062        sync_settings: crate::config::SyncSettings,
3063        callback: impl Fn(Result<SyncResponse, Error>) -> C,
3064    ) -> Result<(), Error>
3065    where
3066        C: Future<Output = Result<LoopCtrl, Error>>,
3067    {
3068        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
3069
3070        while let Some(result) = sync_stream.next().await {
3071            trace!("Running callback");
3072            if callback(result).await? == LoopCtrl::Break {
3073                trace!("Callback told us to stop");
3074                break;
3075            }
3076            trace!("Done running callback");
3077        }
3078
3079        Ok(())
3080    }
3081
3082    //// Repeatedly synchronize the client state with the server.
3083    ///
3084    /// This method will internally call [`Client::sync_once`] in a loop and is
3085    /// equivalent to the [`Client::sync`] method but the responses are provided
3086    /// as an async stream.
3087    ///
3088    /// # Arguments
3089    ///
3090    /// * `sync_settings` - Settings for the sync call. *Note* that those
3091    ///   settings will be only used for the first sync call. See the argument
3092    ///   docs for [`Client::sync_once`] for more info.
3093    ///
3094    /// # Examples
3095    ///
3096    /// ```no_run
3097    /// # use url::Url;
3098    /// # async {
3099    /// # let homeserver = Url::parse("http://localhost:8080")?;
3100    /// # let username = "";
3101    /// # let password = "";
3102    /// use futures_util::StreamExt;
3103    /// use matrix_sdk::{Client, config::SyncSettings};
3104    ///
3105    /// let client = Client::new(homeserver).await?;
3106    /// client.matrix_auth().login_username(&username, &password).send().await?;
3107    ///
3108    /// let mut sync_stream =
3109    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
3110    ///
3111    /// while let Some(Ok(response)) = sync_stream.next().await {
3112    ///     for room in response.rooms.joined.values() {
3113    ///         for e in &room.timeline.events {
3114    ///             if let Ok(event) = e.raw().deserialize() {
3115    ///                 println!("Received event {:?}", event);
3116    ///             }
3117    ///         }
3118    ///     }
3119    /// }
3120    ///
3121    /// # anyhow::Ok(()) };
3122    /// ```
3123    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3124    #[instrument(skip(self))]
3125    pub async fn sync_stream(
3126        &self,
3127        mut sync_settings: crate::config::SyncSettings,
3128    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3129        let mut is_first_sync = true;
3130        let mut timeout = None;
3131        let mut last_sync_time: Option<Instant> = None;
3132
3133        let parent_span = Span::current();
3134
3135        async_stream::stream!({
3136            loop {
3137                trace!("Syncing");
3138
3139                if sync_settings.ignore_timeout_on_first_sync {
3140                    if is_first_sync {
3141                        timeout = sync_settings.timeout.take();
3142                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
3143                        sync_settings.timeout = timeout.take();
3144                    }
3145
3146                    is_first_sync = false;
3147                }
3148
3149                yield self
3150                    .sync_loop_helper(&mut sync_settings)
3151                    .instrument(parent_span.clone())
3152                    .await;
3153
3154                Client::delay_sync(&mut last_sync_time).await
3155            }
3156        })
3157    }
3158
3159    /// Get the current, if any, sync token of the client.
3160    /// This will be None if the client didn't sync at least once.
3161    pub(crate) async fn sync_token(&self) -> Option<String> {
3162        self.inner.base_client.sync_token().await
3163    }
3164
3165    /// Gets information about the owner of a given access token.
3166    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3167        let request = whoami::v3::Request::new();
3168        self.send(request).await
3169    }
3170
3171    /// Subscribes a new receiver to client SessionChange broadcasts.
3172    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3173        let broadcast = &self.auth_ctx().session_change_sender;
3174        broadcast.subscribe()
3175    }
3176
3177    /// Sets the save/restore session callbacks.
3178    ///
3179    /// This is another mechanism to get synchronous updates to session tokens,
3180    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3181    pub fn set_session_callbacks(
3182        &self,
3183        reload_session_callback: Box<ReloadSessionCallback>,
3184        save_session_callback: Box<SaveSessionCallback>,
3185    ) -> Result<()> {
3186        self.inner
3187            .auth_ctx
3188            .reload_session_callback
3189            .set(reload_session_callback)
3190            .map_err(|_| Error::MultipleSessionCallbacks)?;
3191
3192        self.inner
3193            .auth_ctx
3194            .save_session_callback
3195            .set(save_session_callback)
3196            .map_err(|_| Error::MultipleSessionCallbacks)?;
3197
3198        Ok(())
3199    }
3200
3201    /// Get the notification settings of the current owner of the client.
3202    pub async fn notification_settings(&self) -> NotificationSettings {
3203        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3204        NotificationSettings::new(self.clone(), ruleset)
3205    }
3206
3207    /// Create a new specialized `Client` that can process notifications.
3208    ///
3209    /// See [`CrossProcessLock::new`] to learn more about
3210    /// `cross_process_lock_config`.
3211    ///
3212    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3213    pub async fn notification_client(
3214        &self,
3215        cross_process_lock_config: CrossProcessLockConfig,
3216    ) -> Result<Client> {
3217        let client = Client {
3218            inner: ClientInner::new(
3219                self.inner.auth_ctx.clone(),
3220                self.server().cloned(),
3221                self.homeserver(),
3222                self.sliding_sync_version(),
3223                self.inner.http_client.clone(),
3224                self.inner
3225                    .base_client
3226                    .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3227                    .await?,
3228                self.inner.caches.supported_versions.value(),
3229                self.inner.caches.well_known.value(),
3230                self.inner.respect_login_well_known,
3231                self.inner.event_cache.clone(),
3232                self.inner.send_queue_data.clone(),
3233                self.inner.latest_events.clone(),
3234                #[cfg(feature = "e2e-encryption")]
3235                self.inner.e2ee.encryption_settings,
3236                #[cfg(feature = "e2e-encryption")]
3237                self.inner.enable_share_history_on_invite,
3238                cross_process_lock_config,
3239                #[cfg(feature = "experimental-search")]
3240                self.inner.search_index.clone(),
3241                self.inner.thread_subscription_catchup.clone(),
3242            )
3243            .await,
3244        };
3245
3246        Ok(client)
3247    }
3248
3249    /// The [`EventCache`] instance for this [`Client`].
3250    pub fn event_cache(&self) -> &EventCache {
3251        // SAFETY: always initialized in the `Client` ctor.
3252        self.inner.event_cache.get().unwrap()
3253    }
3254
3255    /// The [`LatestEvents`] instance for this [`Client`].
3256    pub async fn latest_events(&self) -> &LatestEvents {
3257        self.inner
3258            .latest_events
3259            .get_or_init(|| async {
3260                LatestEvents::new(
3261                    WeakClient::from_client(self),
3262                    self.event_cache().clone(),
3263                    SendQueue::new(self.clone()),
3264                    self.room_info_notable_update_receiver(),
3265                )
3266            })
3267            .await
3268    }
3269
3270    /// Waits until an at least partially synced room is received, and returns
3271    /// it.
3272    ///
3273    /// **Note: this function will loop endlessly until either it finds the room
3274    /// or an externally set timeout happens.**
3275    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3276        loop {
3277            if let Some(room) = self.get_room(room_id) {
3278                if room.is_state_partially_or_fully_synced() {
3279                    debug!("Found just created room!");
3280                    return room;
3281                }
3282                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3283            } else {
3284                debug!("Room wasn't found, waiting for sync beat to try again");
3285            }
3286            self.inner.sync_beat.listen().await;
3287        }
3288    }
3289
3290    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3291    /// join it.
3292    pub async fn knock(
3293        &self,
3294        room_id_or_alias: OwnedRoomOrAliasId,
3295        reason: Option<String>,
3296        server_names: Vec<OwnedServerName>,
3297    ) -> Result<Room> {
3298        let request =
3299            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3300        let response = self.send(request).await?;
3301        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3302        Ok(Room::new(self.clone(), base_room))
3303    }
3304
3305    /// Checks whether the provided `user_id` belongs to an ignored user.
3306    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3307        self.base_client().is_user_ignored(user_id).await
3308    }
3309
3310    /// Gets the `max_upload_size` value from the homeserver, getting either a
3311    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3312    /// missing.
3313    ///
3314    /// Check the spec for more info:
3315    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3316    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3317        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3318        if let Some(data) = max_upload_size_lock.get() {
3319            return Ok(data.to_owned());
3320        }
3321
3322        // Use the authenticated endpoint when the server supports it.
3323        let supported_versions = self.supported_versions().await?;
3324        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3325            .is_supported(&supported_versions);
3326
3327        let upload_size = if use_auth {
3328            self.send(authenticated_media::get_media_config::v1::Request::default())
3329                .await?
3330                .upload_size
3331        } else {
3332            #[allow(deprecated)]
3333            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3334        };
3335
3336        match max_upload_size_lock.set(upload_size) {
3337            Ok(_) => Ok(upload_size),
3338            Err(error) => {
3339                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3340            }
3341        }
3342    }
3343
3344    /// The settings to use for decrypting events.
3345    #[cfg(feature = "e2e-encryption")]
3346    pub fn decryption_settings(&self) -> &DecryptionSettings {
3347        &self.base_client().decryption_settings
3348    }
3349
3350    /// Returns the [`SearchIndex`] for this [`Client`].
3351    #[cfg(feature = "experimental-search")]
3352    pub fn search_index(&self) -> &SearchIndex {
3353        &self.inner.search_index
3354    }
3355
3356    /// Whether the client is configured to take thread subscriptions (MSC4306
3357    /// and MSC4308) into account, and the server enabled the experimental
3358    /// feature flag for it.
3359    ///
3360    /// This may cause filtering out of thread subscriptions, and loading the
3361    /// thread subscriptions via the sliding sync extension, when the room
3362    /// list service is being used.
3363    ///
3364    /// This is async and fallible as it may use the network to retrieve the
3365    /// server supported features, if they aren't cached already.
3366    pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3367        // Check if the client is configured to support thread subscriptions first.
3368        match self.base_client().threading_support {
3369            ThreadingSupport::Enabled { with_subscriptions: false }
3370            | ThreadingSupport::Disabled => return Ok(false),
3371            ThreadingSupport::Enabled { with_subscriptions: true } => {}
3372        }
3373
3374        // Now, let's check that the server supports it.
3375        let server_enabled = self
3376            .supported_versions()
3377            .await?
3378            .features
3379            .contains(&FeatureFlag::from("org.matrix.msc4306"));
3380
3381        Ok(server_enabled)
3382    }
3383
3384    /// Fetch thread subscriptions changes between `from` and up to `to`.
3385    ///
3386    /// The `limit` optional parameter can be used to limit the number of
3387    /// entries in a response. It can also be overridden by the server, if
3388    /// it's deemed too large.
3389    pub async fn fetch_thread_subscriptions(
3390        &self,
3391        from: Option<String>,
3392        to: Option<String>,
3393        limit: Option<UInt>,
3394    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3395        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3396            from,
3397            to,
3398            limit,
3399        });
3400        Ok(self.send(request).await?)
3401    }
3402
3403    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3404        self.inner.thread_subscription_catchup.get().unwrap()
3405    }
3406
3407    /// Perform database optimizations if any are available, i.e. vacuuming in
3408    /// SQLite.
3409    ///
3410    /// **Warning:** this was added to check if SQLite fragmentation was the
3411    /// source of performance issues, **DO NOT use in production**.
3412    #[doc(hidden)]
3413    pub async fn optimize_stores(&self) -> Result<()> {
3414        trace!("Optimizing state store...");
3415        self.state_store().optimize().await?;
3416
3417        trace!("Optimizing event cache store...");
3418        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3419            clean_lock.optimize().await?;
3420        }
3421
3422        trace!("Optimizing media store...");
3423        self.media_store().lock().await?.optimize().await?;
3424
3425        Ok(())
3426    }
3427
3428    /// Returns the sizes of the existing stores, if known.
3429    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3430        #[cfg(feature = "e2e-encryption")]
3431        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3432            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3433        {
3434            Some(store_size)
3435        } else {
3436            None
3437        };
3438        #[cfg(not(feature = "e2e-encryption"))]
3439        let crypto_store_size = None;
3440
3441        let state_store_size = self.state_store().get_size().await.ok().flatten();
3442
3443        let event_cache_store_size = if let Some(clean_lock) =
3444            self.event_cache_store().lock().await?.as_clean()
3445            && let Ok(Some(store_size)) = clean_lock.get_size().await
3446        {
3447            Some(store_size)
3448        } else {
3449            None
3450        };
3451
3452        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3453
3454        Ok(StoreSizes {
3455            crypto_store: crypto_store_size,
3456            state_store: state_store_size,
3457            event_cache_store: event_cache_store_size,
3458            media_store: media_store_size,
3459        })
3460    }
3461
3462    /// Get a reference to the client's task monitor, for spawning background
3463    /// tasks.
3464    pub fn task_monitor(&self) -> &TaskMonitor {
3465        &self.inner.task_monitor
3466    }
3467
3468    /// Add a subscriber for duplicate key upload error notifications triggered
3469    /// by requests to /keys/upload.
3470    #[cfg(feature = "e2e-encryption")]
3471    pub fn subscribe_to_duplicate_key_upload_errors(
3472        &self,
3473    ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3474        self.inner.duplicate_key_upload_error_sender.subscribe()
3475    }
3476
3477    /// Check the record of whether we are waiting for an [MSC4268] key bundle
3478    /// for the given room.
3479    ///
3480    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3481    #[cfg(feature = "e2e-encryption")]
3482    pub async fn get_pending_key_bundle_details_for_room(
3483        &self,
3484        room_id: &RoomId,
3485    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3486        Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3487    }
3488
3489    /// Returns the [`DmRoomDefinition`] this client uses to check if a room is
3490    /// a DM.
3491    pub fn dm_room_definition(&self) -> &DmRoomDefinition {
3492        &self.inner.base_client.dm_room_definition
3493    }
3494}
3495
3496/// Contains the disk size of the different stores, if known. It won't be
3497/// available for in-memory stores.
3498#[derive(Debug, Clone)]
3499pub struct StoreSizes {
3500    /// The size of the CryptoStore.
3501    pub crypto_store: Option<usize>,
3502    /// The size of the StateStore.
3503    pub state_store: Option<usize>,
3504    /// The size of the EventCacheStore.
3505    pub event_cache_store: Option<usize>,
3506    /// The size of the MediaStore.
3507    pub media_store: Option<usize>,
3508}
3509
3510#[cfg(any(feature = "testing", test))]
3511impl Client {
3512    /// Test helper to mark users as tracked by the crypto layer.
3513    #[cfg(feature = "e2e-encryption")]
3514    pub async fn update_tracked_users_for_testing(
3515        &self,
3516        user_ids: impl IntoIterator<Item = &UserId>,
3517    ) {
3518        let olm = self.olm_machine().await;
3519        let olm = olm.as_ref().unwrap();
3520        olm.update_tracked_users(user_ids).await.unwrap();
3521    }
3522}
3523
3524/// A weak reference to the inner client, useful when trying to get a handle
3525/// on the owning client.
3526#[derive(Clone, Debug)]
3527pub(crate) struct WeakClient {
3528    client: Weak<ClientInner>,
3529}
3530
3531impl WeakClient {
3532    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3533    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3534        Self { client: Arc::downgrade(client) }
3535    }
3536
3537    /// Construct a [`WeakClient`] from a [`Client`].
3538    pub fn from_client(client: &Client) -> Self {
3539        Self::from_inner(&client.inner)
3540    }
3541
3542    /// Attempts to get a [`Client`] from this [`WeakClient`].
3543    pub fn get(&self) -> Option<Client> {
3544        self.client.upgrade().map(|inner| Client { inner })
3545    }
3546
3547    /// Gets the number of strong (`Arc`) pointers still pointing to this
3548    /// client.
3549    #[allow(dead_code)]
3550    pub fn strong_count(&self) -> usize {
3551        self.client.strong_count()
3552    }
3553}
3554
3555/// Information about the state of a room before we joined it.
3556#[derive(Debug, Clone, Default)]
3557struct PreJoinRoomInfo {
3558    /// The user who invited us to the room, if any.
3559    pub inviter: Option<RoomMember>,
3560}
3561
3562// The http mocking library is not supported for wasm32
3563#[cfg(all(test, not(target_family = "wasm")))]
3564pub(crate) mod tests {
3565    use std::{sync::Arc, time::Duration};
3566
3567    use assert_matches::assert_matches;
3568    use assert_matches2::assert_let;
3569    use eyeball::SharedObservable;
3570    use futures_util::{FutureExt, StreamExt, pin_mut};
3571    use js_int::{UInt, uint};
3572    use matrix_sdk_base::{
3573        RoomState,
3574        store::{MemoryStore, StoreConfig},
3575        ttl::TtlValue,
3576    };
3577    use matrix_sdk_test::{
3578        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3579        event_factory::EventFactory,
3580    };
3581    #[cfg(target_family = "wasm")]
3582    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3583
3584    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3585    use ruma::{
3586        RoomId, ServerName, UserId,
3587        api::{
3588            FeatureFlag, MatrixVersion,
3589            client::{
3590                discovery::discover_homeserver::RtcFocusInfo,
3591                room::create_room::v3::Request as CreateRoomRequest,
3592            },
3593        },
3594        assign,
3595        events::{
3596            ignored_user_list::IgnoredUserListEventContent,
3597            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3598        },
3599        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3600    };
3601    use serde_json::json;
3602    use stream_assert::{assert_next_matches, assert_pending};
3603    use tokio::{
3604        spawn,
3605        time::{sleep, timeout},
3606    };
3607    use url::Url;
3608
3609    use super::Client;
3610    use crate::{
3611        Error, TransmissionProgress,
3612        client::{WeakClient, caches::CachedValue, futures::SendMediaUploadRequest},
3613        config::{RequestConfig, SyncSettings},
3614        futures::SendRequest,
3615        media::MediaError,
3616        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3617    };
3618
3619    #[async_test]
3620    async fn test_account_data() {
3621        let server = MatrixMockServer::new().await;
3622        let client = server.client_builder().build().await;
3623
3624        let f = EventFactory::new();
3625        server
3626            .mock_sync()
3627            .ok_and_run(&client, |builder| {
3628                builder.add_global_account_data(
3629                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3630                );
3631            })
3632            .await;
3633
3634        let content = client
3635            .account()
3636            .account_data::<IgnoredUserListEventContent>()
3637            .await
3638            .unwrap()
3639            .unwrap()
3640            .deserialize()
3641            .unwrap();
3642
3643        assert_eq!(content.ignored_users.len(), 1);
3644    }
3645
3646    #[async_test]
3647    async fn test_successful_discovery() {
3648        // Imagine this is `matrix.org`.
3649        let server = MatrixMockServer::new().await;
3650        let server_url = server.uri();
3651
3652        // Imagine this is `matrix-client.matrix.org`.
3653        let homeserver = MatrixMockServer::new().await;
3654        let homeserver_url = homeserver.uri();
3655
3656        // Imagine Alice has the user ID `@alice:matrix.org`.
3657        let domain = server_url.strip_prefix("http://").unwrap();
3658        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3659
3660        // The `.well-known` is on the server (e.g. `matrix.org`).
3661        server
3662            .mock_well_known()
3663            .ok_with_homeserver_url(&homeserver_url)
3664            .mock_once()
3665            .named("well-known")
3666            .mount()
3667            .await;
3668
3669        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3670        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3671
3672        let client = Client::builder()
3673            .insecure_server_name_no_tls(alice.server_name())
3674            .build()
3675            .await
3676            .unwrap();
3677
3678        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3679        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3680        client.server_versions().await.unwrap();
3681    }
3682
3683    #[async_test]
3684    async fn test_discovery_broken_server() {
3685        let server = MatrixMockServer::new().await;
3686        let server_url = server.uri();
3687        let domain = server_url.strip_prefix("http://").unwrap();
3688        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3689
3690        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3691
3692        assert!(
3693            Client::builder()
3694                .insecure_server_name_no_tls(alice.server_name())
3695                .build()
3696                .await
3697                .is_err(),
3698            "Creating a client from a user ID should fail when the .well-known request fails."
3699        );
3700    }
3701
3702    #[async_test]
3703    async fn test_room_creation() {
3704        let server = MatrixMockServer::new().await;
3705        let client = server.client_builder().build().await;
3706
3707        let f = EventFactory::new().sender(user_id!("@example:localhost"));
3708        server
3709            .mock_sync()
3710            .ok_and_run(&client, |builder| {
3711                builder.add_joined_room(
3712                    JoinedRoomBuilder::default()
3713                        .add_state_event(
3714                            f.member(user_id!("@example:localhost")).display_name("example"),
3715                        )
3716                        .add_state_event(f.default_power_levels()),
3717                );
3718            })
3719            .await;
3720
3721        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3722        assert_eq!(room.state(), RoomState::Joined);
3723    }
3724
3725    #[async_test]
3726    async fn test_retry_limit_http_requests() {
3727        let server = MatrixMockServer::new().await;
3728        let client = server
3729            .client_builder()
3730            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3731            .build()
3732            .await;
3733
3734        assert!(client.request_config().retry_limit.unwrap() == 4);
3735
3736        server.mock_who_am_i().error500().expect(4).mount().await;
3737
3738        client.whoami().await.unwrap_err();
3739    }
3740
3741    #[async_test]
3742    async fn test_retry_timeout_http_requests() {
3743        // Keep this timeout small so that the test doesn't take long
3744        let retry_timeout = Duration::from_secs(5);
3745        let server = MatrixMockServer::new().await;
3746        let client = server
3747            .client_builder()
3748            .on_builder(|builder| {
3749                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3750            })
3751            .build()
3752            .await;
3753
3754        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3755
3756        server.mock_login().error500().expect(2..).mount().await;
3757
3758        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3759    }
3760
3761    #[async_test]
3762    async fn test_short_retry_initial_http_requests() {
3763        let server = MatrixMockServer::new().await;
3764        let client = server
3765            .client_builder()
3766            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3767            .build()
3768            .await;
3769
3770        server.mock_login().error500().expect(3..).mount().await;
3771
3772        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3773    }
3774
3775    #[async_test]
3776    async fn test_no_retry_http_requests() {
3777        let server = MatrixMockServer::new().await;
3778        let client = server.client_builder().build().await;
3779
3780        server.mock_devices().error500().mock_once().mount().await;
3781
3782        client.devices().await.unwrap_err();
3783    }
3784
3785    #[async_test]
3786    async fn test_set_homeserver() {
3787        let client = MockClientBuilder::new(None).build().await;
3788        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3789
3790        let homeserver = Url::parse("http://example.com/").unwrap();
3791        client.set_homeserver(homeserver.clone());
3792        assert_eq!(client.homeserver(), homeserver);
3793    }
3794
3795    #[async_test]
3796    async fn test_search_user_request() {
3797        let server = MatrixMockServer::new().await;
3798        let client = server.client_builder().build().await;
3799
3800        server.mock_user_directory().ok().mock_once().mount().await;
3801
3802        let response = client.search_users("test", 50).await.unwrap();
3803        assert_eq!(response.results.len(), 1);
3804        let result = &response.results[0];
3805        assert_eq!(result.user_id.to_string(), "@test:example.me");
3806        assert_eq!(result.display_name.clone().unwrap(), "Test");
3807        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3808        assert!(!response.limited);
3809    }
3810
3811    #[async_test]
3812    async fn test_request_unstable_features() {
3813        let server = MatrixMockServer::new().await;
3814        let client = server.client_builder().no_server_versions().build().await;
3815
3816        server
3817            .mock_versions()
3818            .with_feature("org.matrix.e2e_cross_signing", true)
3819            .ok()
3820            .mock_once()
3821            .mount()
3822            .await;
3823
3824        let unstable_features = client.unstable_features().await.unwrap();
3825        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3826        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3827    }
3828
3829    #[async_test]
3830    async fn test_can_homeserver_push_encrypted_event_to_device() {
3831        let server = MatrixMockServer::new().await;
3832        let client = server.client_builder().no_server_versions().build().await;
3833
3834        server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3835
3836        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3837        assert!(msc4028_enabled);
3838    }
3839
3840    #[async_test]
3841    async fn test_recently_visited_rooms() {
3842        // Tracking recently visited rooms requires authentication
3843        let client = MockClientBuilder::new(None).unlogged().build().await;
3844        assert_matches!(
3845            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3846            Err(Error::AuthenticationRequired)
3847        );
3848
3849        let client = MockClientBuilder::new(None).build().await;
3850        let account = client.account();
3851
3852        // We should start off with an empty list
3853        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3854
3855        // Tracking a valid room id should add it to the list
3856        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3857        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3858        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3859
3860        // And the existing list shouldn't be changed
3861        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3862        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3863
3864        // Tracking the same room again shouldn't change the list
3865        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3866        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3867        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3868
3869        // Tracking a second room should add it to the front of the list
3870        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3871        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3872        assert_eq!(
3873            account.get_recently_visited_rooms().await.unwrap(),
3874            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3875        );
3876
3877        // Tracking the first room yet again should move it to the front of the list
3878        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3879        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3880        assert_eq!(
3881            account.get_recently_visited_rooms().await.unwrap(),
3882            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3883        );
3884
3885        // Tracking should be capped at 20
3886        for n in 0..20 {
3887            account
3888                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3889                .await
3890                .unwrap();
3891        }
3892
3893        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3894
3895        // And the initial rooms should've been pushed out
3896        let rooms = account.get_recently_visited_rooms().await.unwrap();
3897        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3898        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3899
3900        // And the last tracked room should be the first
3901        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3902    }
3903
3904    #[async_test]
3905    async fn test_client_no_cycle_with_event_cache() {
3906        let client = MockClientBuilder::new(None).build().await;
3907
3908        // Wait for the init tasks to die.
3909        sleep(Duration::from_secs(1)).await;
3910
3911        let weak_client = WeakClient::from_client(&client);
3912        assert_eq!(weak_client.strong_count(), 1);
3913
3914        {
3915            let room_id = room_id!("!room:example.org");
3916
3917            // Have the client know the room.
3918            let response = SyncResponseBuilder::default()
3919                .add_joined_room(JoinedRoomBuilder::new(room_id))
3920                .build_sync_response();
3921            client.inner.base_client.receive_sync_response(response).await.unwrap();
3922
3923            client.event_cache().subscribe().unwrap();
3924
3925            let (_room_event_cache, _drop_handles) =
3926                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3927        }
3928
3929        drop(client);
3930
3931        // Give a bit of time for background tasks to die.
3932        sleep(Duration::from_secs(1)).await;
3933
3934        // The weak client must be the last reference to the client now.
3935        assert_eq!(weak_client.strong_count(), 0);
3936        let client = weak_client.get();
3937        assert!(
3938            client.is_none(),
3939            "too many strong references to the client: {}",
3940            Arc::strong_count(&client.unwrap().inner)
3941        );
3942    }
3943
3944    #[async_test]
3945    async fn test_supported_versions_caching() {
3946        let server = MatrixMockServer::new().await;
3947
3948        let versions_mock = server
3949            .mock_versions()
3950            .expect_default_access_token()
3951            .with_feature("org.matrix.e2e_cross_signing", true)
3952            .ok()
3953            .named("first versions mock")
3954            .expect(1)
3955            .mount_as_scoped()
3956            .await;
3957
3958        let memory_store = Arc::new(MemoryStore::new());
3959        let client = server
3960            .client_builder()
3961            .no_server_versions()
3962            .on_builder(|builder| {
3963                builder.store_config(
3964                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3965                        .state_store(memory_store.clone()),
3966                )
3967            })
3968            .build()
3969            .await;
3970
3971        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3972
3973        // The result was cached.
3974        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3975        // This subsequent call hits the in-memory cache.
3976        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3977
3978        drop(client);
3979
3980        let client = server
3981            .client_builder()
3982            .no_server_versions()
3983            .on_builder(|builder| {
3984                builder.store_config(
3985                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3986                        .state_store(memory_store.clone()),
3987                )
3988            })
3989            .build()
3990            .await;
3991
3992        // These calls to the new client hit the on-disk cache.
3993        assert!(
3994            client
3995                .unstable_features()
3996                .await
3997                .unwrap()
3998                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3999        );
4000
4001        let supported = client.supported_versions().await.unwrap();
4002        assert!(supported.versions.contains(&MatrixVersion::V1_0));
4003        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4004
4005        // Then this call hits the in-memory cache.
4006        let supported = client.supported_versions().await.unwrap();
4007        assert!(supported.versions.contains(&MatrixVersion::V1_0));
4008        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4009
4010        drop(versions_mock);
4011
4012        // Now, reset the cache, and observe the endpoint being called again once.
4013        client.reset_supported_versions().await.unwrap();
4014
4015        server.mock_versions().ok().expect(2).named("second versions mock").mount().await;
4016
4017        // Hits network again.
4018        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4019        // Hits in-memory cache again.
4020        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4021        assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4022
4023        // Force an expiry of the data.
4024        let supported_versions = client.supported_versions_cached().await.unwrap().unwrap();
4025        let mut ttl_value = TtlValue::new(supported_versions);
4026        ttl_value.expire();
4027        client.inner.caches.supported_versions.set_value(ttl_value);
4028
4029        // Call the method to trigger a cache refresh background task.
4030        client.supported_versions_cached().await.unwrap().unwrap();
4031
4032        // We wait for the task to finish, the endpoint should have been called again.
4033        sleep(Duration::from_secs(1)).await;
4034        assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4035    }
4036
4037    #[async_test]
4038    async fn test_well_known_caching() {
4039        let server = MatrixMockServer::new().await;
4040        let server_url = server.uri();
4041        let domain = server_url.strip_prefix("http://").unwrap();
4042        let server_name = <&ServerName>::try_from(domain).unwrap();
4043        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
4044
4045        let well_known_mock = server
4046            .mock_well_known()
4047            .ok()
4048            .named("well known mock")
4049            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
4050            .mount_as_scoped()
4051            .await;
4052
4053        let memory_store = Arc::new(MemoryStore::new());
4054        let client = Client::builder()
4055            .insecure_server_name_no_tls(server_name)
4056            .store_config(
4057                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4058                    .state_store(memory_store.clone()),
4059            )
4060            .build()
4061            .await
4062            .unwrap();
4063
4064        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4065
4066        // This subsequent call hits the in-memory cache.
4067        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4068
4069        drop(client);
4070
4071        let client = server
4072            .client_builder()
4073            .no_server_versions()
4074            .on_builder(|builder| {
4075                builder.store_config(
4076                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4077                        .state_store(memory_store.clone()),
4078                )
4079            })
4080            .build()
4081            .await;
4082
4083        // This call to the new client hits the on-disk cache.
4084        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4085
4086        // Then this call hits the in-memory cache.
4087        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4088
4089        drop(well_known_mock);
4090
4091        // Now, reset the cache, and observe the endpoints being called again once.
4092        client.reset_well_known().await.unwrap();
4093
4094        server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;
4095
4096        // Hits network again.
4097        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4098        // Hits in-memory cache again.
4099        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4100
4101        // Force an expiry of the data.
4102        let well_known = client.well_known().await;
4103        let mut ttl_value = TtlValue::new(well_known);
4104        ttl_value.expire();
4105        client.inner.caches.well_known.set_value(ttl_value);
4106
4107        // Call the method again to trigger a cache refresh background task.
4108        client.well_known().await;
4109
4110        // We wait for the task to finish, the endpoint should have been called again.
4111        // We need to wait a bit because the first requests using the server name of the
4112        // user will fail, only the requests using the homeserver URL will succeed.
4113        sleep(Duration::from_secs(5)).await;
4114        assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
4115    }
4116
4117    #[async_test]
4118    async fn test_missing_well_known_caching() {
4119        let server = MatrixMockServer::new().await;
4120        let rtc_foci: Vec<RtcFocusInfo> = vec![];
4121
4122        let well_known_mock = server
4123            .mock_well_known()
4124            .error_unrecognized()
4125            .named("first well-known mock")
4126            .expect(1)
4127            .mount_as_scoped()
4128            .await;
4129
4130        let memory_store = Arc::new(MemoryStore::new());
4131        let client = server
4132            .client_builder()
4133            .on_builder(|builder| {
4134                builder.store_config(
4135                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4136                        .state_store(memory_store.clone()),
4137                )
4138            })
4139            .build()
4140            .await;
4141
4142        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4143
4144        // This subsequent call hits the in-memory cache.
4145        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4146
4147        drop(client);
4148
4149        let client = server
4150            .client_builder()
4151            .on_builder(|builder| {
4152                builder.store_config(
4153                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4154                        .state_store(memory_store.clone()),
4155                )
4156            })
4157            .build()
4158            .await;
4159
4160        // This call to the new client hits the on-disk cache.
4161        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4162
4163        // Then this call hits the in-memory cache.
4164        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4165
4166        drop(well_known_mock);
4167
4168        // Now, reset the cache, and observe the endpoints being called again once.
4169        client.reset_well_known().await.unwrap();
4170
4171        server
4172            .mock_well_known()
4173            .error_unrecognized()
4174            .expect(1)
4175            .named("second well-known mock")
4176            .mount()
4177            .await;
4178
4179        // Hits network again.
4180        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4181        // Hits in-memory cache again.
4182        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4183    }
4184
4185    #[async_test]
4186    async fn test_no_network_doesnt_cause_infinite_retries() {
4187        // We want infinite retries for transient errors.
4188        let client = MockClientBuilder::new(None)
4189            .on_builder(|builder| builder.request_config(RequestConfig::new()))
4190            .build()
4191            .await;
4192
4193        // We don't define a mock server on purpose here, so that the error is really a
4194        // network error.
4195        client.whoami().await.unwrap_err();
4196    }
4197
4198    #[async_test]
4199    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4200        let server = MatrixMockServer::new().await;
4201        let client = server.client_builder().build().await;
4202
4203        let room_id = room_id!("!room:example.org");
4204
4205        server
4206            .mock_sync()
4207            .ok_and_run(&client, |builder| {
4208                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4209            })
4210            .await;
4211
4212        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4213        assert_eq!(room.room_id(), room_id);
4214    }
4215
4216    #[async_test]
4217    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4218        let server = MatrixMockServer::new().await;
4219        let client = server.client_builder().build().await;
4220
4221        let room_id = room_id!("!room:example.org");
4222
4223        let client = Arc::new(client);
4224
4225        // Perform the /sync request with a delay so it starts after the
4226        // `await_room_remote_echo` call has happened
4227        spawn({
4228            let client = client.clone();
4229            async move {
4230                sleep(Duration::from_millis(100)).await;
4231
4232                server
4233                    .mock_sync()
4234                    .ok_and_run(&client, |builder| {
4235                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4236                    })
4237                    .await;
4238            }
4239        });
4240
4241        let room =
4242            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4243        assert_eq!(room.room_id(), room_id);
4244    }
4245
4246    #[async_test]
4247    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4248        let client = MockClientBuilder::new(None).build().await;
4249
4250        let room_id = room_id!("!room:example.org");
4251        // Room is not present so the client won't be able to find it. The call will
4252        // timeout.
4253        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4254    }
4255
4256    #[async_test]
4257    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4258        let server = MatrixMockServer::new().await;
4259        let client = server.client_builder().build().await;
4260
4261        server.mock_create_room().ok().mount().await;
4262
4263        // Create a room in the internal store
4264        let room = client
4265            .create_room(assign!(CreateRoomRequest::new(), {
4266                invite: vec![],
4267                is_direct: false,
4268            }))
4269            .await
4270            .unwrap();
4271
4272        // Room is locally present, but not synced, the call will timeout
4273        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4274            .await
4275            .unwrap_err();
4276    }
4277
4278    #[async_test]
4279    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4280        let server = MatrixMockServer::new().await;
4281        let client = server.client_builder().build().await;
4282
4283        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4284
4285        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4286        assert_matches!(ret, Ok(true));
4287    }
4288
4289    #[async_test]
4290    async fn test_is_room_alias_available_if_alias_is_resolved() {
4291        let server = MatrixMockServer::new().await;
4292        let client = server.client_builder().build().await;
4293
4294        server
4295            .mock_room_directory_resolve_alias()
4296            .ok("!some_room_id:matrix.org", Vec::new())
4297            .expect(1)
4298            .mount()
4299            .await;
4300
4301        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4302        assert_matches!(ret, Ok(false));
4303    }
4304
4305    #[async_test]
4306    async fn test_is_room_alias_available_if_error_found() {
4307        let server = MatrixMockServer::new().await;
4308        let client = server.client_builder().build().await;
4309
4310        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4311
4312        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4313        assert_matches!(ret, Err(_));
4314    }
4315
4316    #[async_test]
4317    async fn test_create_room_alias() {
4318        let server = MatrixMockServer::new().await;
4319        let client = server.client_builder().build().await;
4320
4321        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4322
4323        let ret = client
4324            .create_room_alias(
4325                room_alias_id!("#some_alias:matrix.org"),
4326                room_id!("!some_room:matrix.org"),
4327            )
4328            .await;
4329        assert_matches!(ret, Ok(()));
4330    }
4331
4332    #[async_test]
4333    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4334        let server = MatrixMockServer::new().await;
4335        let client = server.client_builder().build().await;
4336
4337        let room_id = room_id!("!a-room:matrix.org");
4338
4339        // Make sure the summary endpoint is called once
4340        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4341
4342        // We create a locally cached invited room
4343        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4344
4345        // And we get a preview, the server endpoint was reached
4346        let preview = client
4347            .get_room_preview(room_id.into(), Vec::new())
4348            .await
4349            .expect("Room preview should be retrieved");
4350
4351        assert_eq!(invited_room.room_id(), preview.room_id);
4352    }
4353
4354    #[async_test]
4355    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4356        let server = MatrixMockServer::new().await;
4357        let client = server.client_builder().build().await;
4358
4359        let room_id = room_id!("!a-room:matrix.org");
4360
4361        // Make sure the summary endpoint is called once
4362        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4363
4364        // We create a locally cached left room
4365        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4366
4367        // And we get a preview, the server endpoint was reached
4368        let preview = client
4369            .get_room_preview(room_id.into(), Vec::new())
4370            .await
4371            .expect("Room preview should be retrieved");
4372
4373        assert_eq!(left_room.room_id(), preview.room_id);
4374    }
4375
4376    #[async_test]
4377    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4378        let server = MatrixMockServer::new().await;
4379        let client = server.client_builder().build().await;
4380
4381        let room_id = room_id!("!a-room:matrix.org");
4382
4383        // Make sure the summary endpoint is called once
4384        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4385
4386        // We create a locally cached knocked room
4387        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4388
4389        // And we get a preview, the server endpoint was reached
4390        let preview = client
4391            .get_room_preview(room_id.into(), Vec::new())
4392            .await
4393            .expect("Room preview should be retrieved");
4394
4395        assert_eq!(knocked_room.room_id(), preview.room_id);
4396    }
4397
4398    #[async_test]
4399    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4400        let server = MatrixMockServer::new().await;
4401        let client = server.client_builder().build().await;
4402
4403        let room_id = room_id!("!a-room:matrix.org");
4404
4405        // Make sure the summary endpoint is not called
4406        server.mock_room_summary().ok(room_id).never().mount().await;
4407
4408        // We create a locally cached joined room
4409        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4410
4411        // And we get a preview, no server endpoint was reached
4412        let preview = client
4413            .get_room_preview(room_id.into(), Vec::new())
4414            .await
4415            .expect("Room preview should be retrieved");
4416
4417        assert_eq!(joined_room.room_id(), preview.room_id);
4418    }
4419
4420    #[async_test]
4421    async fn test_media_preview_config() {
4422        let server = MatrixMockServer::new().await;
4423        let client = server.client_builder().build().await;
4424
4425        server
4426            .mock_sync()
4427            .ok_and_run(&client, |builder| {
4428                builder.add_custom_global_account_data(json!({
4429                    "content": {
4430                        "media_previews": "private",
4431                        "invite_avatars": "off"
4432                    },
4433                    "type": "m.media_preview_config"
4434                }));
4435            })
4436            .await;
4437
4438        let (initial_value, stream) =
4439            client.account().observe_media_preview_config().await.unwrap();
4440
4441        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4442        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4443        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4444        pin_mut!(stream);
4445        assert_pending!(stream);
4446
4447        server
4448            .mock_sync()
4449            .ok_and_run(&client, |builder| {
4450                builder.add_custom_global_account_data(json!({
4451                    "content": {
4452                        "media_previews": "off",
4453                        "invite_avatars": "on"
4454                    },
4455                    "type": "m.media_preview_config"
4456                }));
4457            })
4458            .await;
4459
4460        assert_next_matches!(
4461            stream,
4462            MediaPreviewConfigEventContent {
4463                media_previews: Some(MediaPreviews::Off),
4464                invite_avatars: Some(InviteAvatars::On),
4465                ..
4466            }
4467        );
4468        assert_pending!(stream);
4469    }
4470
4471    #[async_test]
4472    async fn test_unstable_media_preview_config() {
4473        let server = MatrixMockServer::new().await;
4474        let client = server.client_builder().build().await;
4475
4476        server
4477            .mock_sync()
4478            .ok_and_run(&client, |builder| {
4479                builder.add_custom_global_account_data(json!({
4480                    "content": {
4481                        "media_previews": "private",
4482                        "invite_avatars": "off"
4483                    },
4484                    "type": "io.element.msc4278.media_preview_config"
4485                }));
4486            })
4487            .await;
4488
4489        let (initial_value, stream) =
4490            client.account().observe_media_preview_config().await.unwrap();
4491
4492        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4493        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4494        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4495        pin_mut!(stream);
4496        assert_pending!(stream);
4497
4498        server
4499            .mock_sync()
4500            .ok_and_run(&client, |builder| {
4501                builder.add_custom_global_account_data(json!({
4502                    "content": {
4503                        "media_previews": "off",
4504                        "invite_avatars": "on"
4505                    },
4506                    "type": "io.element.msc4278.media_preview_config"
4507                }));
4508            })
4509            .await;
4510
4511        assert_next_matches!(
4512            stream,
4513            MediaPreviewConfigEventContent {
4514                media_previews: Some(MediaPreviews::Off),
4515                invite_avatars: Some(InviteAvatars::On),
4516                ..
4517            }
4518        );
4519        assert_pending!(stream);
4520    }
4521
4522    #[async_test]
4523    async fn test_media_preview_config_not_found() {
4524        let server = MatrixMockServer::new().await;
4525        let client = server.client_builder().build().await;
4526
4527        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4528
4529        assert!(initial_value.is_none());
4530    }
4531
4532    #[async_test]
4533    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4534        // The default Matrix version we use is 1.11 or higher, so authenticated media
4535        // is supported.
4536        let server = MatrixMockServer::new().await;
4537        let client = server.client_builder().build().await;
4538
4539        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4540
4541        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4542        client.load_or_fetch_max_upload_size().await.unwrap();
4543
4544        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4545    }
4546
4547    #[async_test]
4548    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4549        // The server must advertise support for the stable feature for authenticated
4550        // media support, so we mock the `GET /versions` response.
4551        let server = MatrixMockServer::new().await;
4552        let client = server.client_builder().no_server_versions().build().await;
4553
4554        server
4555            .mock_versions()
4556            .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4557            .with_feature("org.matrix.msc3916.stable", true)
4558            .ok()
4559            .named("versions")
4560            .expect(1)
4561            .mount()
4562            .await;
4563
4564        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4565
4566        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4567        client.load_or_fetch_max_upload_size().await.unwrap();
4568
4569        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4570    }
4571
4572    #[async_test]
4573    async fn test_load_or_fetch_max_upload_size_no_auth() {
4574        // The server must not support Matrix 1.11 or higher for unauthenticated
4575        // media requests, so we mock the `GET /versions` response.
4576        let server = MatrixMockServer::new().await;
4577        let client = server.client_builder().no_server_versions().build().await;
4578
4579        server
4580            .mock_versions()
4581            .with_versions(vec!["v1.1"])
4582            .ok()
4583            .named("versions")
4584            .expect(1)
4585            .mount()
4586            .await;
4587
4588        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4589
4590        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4591        client.load_or_fetch_max_upload_size().await.unwrap();
4592
4593        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4594    }
4595
4596    #[async_test]
4597    async fn test_uploading_a_too_large_media_file() {
4598        let server = MatrixMockServer::new().await;
4599        let client = server.client_builder().build().await;
4600
4601        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4602        client.load_or_fetch_max_upload_size().await.unwrap();
4603        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4604
4605        let data = vec![1, 2];
4606        let upload_request =
4607            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4608        let request = SendRequest {
4609            client: client.clone(),
4610            request: upload_request,
4611            config: None,
4612            send_progress: SharedObservable::new(TransmissionProgress::default()),
4613        };
4614        let media_request = SendMediaUploadRequest::new(request);
4615
4616        let error = media_request.await.err();
4617        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4618        assert_eq!(max, uint!(1));
4619        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4620    }
4621
4622    #[async_test]
4623    async fn test_dont_ignore_timeout_on_first_sync() {
4624        let server = MatrixMockServer::new().await;
4625        let client = server.client_builder().build().await;
4626
4627        server
4628            .mock_sync()
4629            .timeout(Some(Duration::from_secs(30)))
4630            .ok(|_| {})
4631            .mock_once()
4632            .named("sync_with_timeout")
4633            .mount()
4634            .await;
4635
4636        // Call the endpoint once to check the timeout.
4637        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4638
4639        timeout(Duration::from_secs(1), async {
4640            stream.next().await.unwrap().unwrap();
4641        })
4642        .await
4643        .unwrap();
4644    }
4645
4646    #[async_test]
4647    async fn test_ignore_timeout_on_first_sync() {
4648        let server = MatrixMockServer::new().await;
4649        let client = server.client_builder().build().await;
4650
4651        server
4652            .mock_sync()
4653            .timeout(None)
4654            .ok(|_| {})
4655            .mock_once()
4656            .named("sync_no_timeout")
4657            .mount()
4658            .await;
4659        server
4660            .mock_sync()
4661            .timeout(Some(Duration::from_secs(30)))
4662            .ok(|_| {})
4663            .mock_once()
4664            .named("sync_with_timeout")
4665            .mount()
4666            .await;
4667
4668        // Call each version of the endpoint once to check the timeouts.
4669        let mut stream = Box::pin(
4670            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4671        );
4672
4673        timeout(Duration::from_secs(1), async {
4674            stream.next().await.unwrap().unwrap();
4675            stream.next().await.unwrap().unwrap();
4676        })
4677        .await
4678        .unwrap();
4679    }
4680
4681    #[async_test]
4682    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4683        let server = MatrixMockServer::new().await;
4684        let client = server.client_builder().build().await;
4685        // This is the user ID that is inside MemberAdditional.
4686        // Note the confusing username, so we can share
4687        // GlobalAccountDataTestEvent::Direct with the invited test.
4688        let user_id = user_id!("@invited:localhost");
4689
4690        // When we receive a sync response saying "invited" is invited to a DM
4691        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4692        let response = SyncResponseBuilder::default()
4693            .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4694            .add_global_account_data(
4695                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4696            )
4697            .build_sync_response();
4698        client.base_client().receive_sync_response(response).await.unwrap();
4699
4700        // Then get_dm_room finds this room
4701        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4702        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4703    }
4704
4705    #[async_test]
4706    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4707        let server = MatrixMockServer::new().await;
4708        let client = server.client_builder().build().await;
4709        // This is the user ID that is inside MemberInvite
4710        let user_id = user_id!("@invited:localhost");
4711
4712        // When we receive a sync response saying "invited" is invited to a DM
4713        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4714        let response = SyncResponseBuilder::default()
4715            .add_joined_room(
4716                JoinedRoomBuilder::default()
4717                    .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4718            )
4719            .add_global_account_data(
4720                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4721            )
4722            .build_sync_response();
4723        client.base_client().receive_sync_response(response).await.unwrap();
4724
4725        // Then get_dm_room finds this room
4726        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4727        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4728    }
4729
4730    #[async_test]
4731    async fn test_get_dm_room_still_finds_left_room() {
4732        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4733        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4734
4735        let server = MatrixMockServer::new().await;
4736        let client = server.client_builder().build().await;
4737        // This is the user ID that is inside MemberAdditional.
4738        // Note the confusing username, so we can share
4739        // GlobalAccountDataTestEvent::Direct with the invited test.
4740        let user_id = user_id!("@invited:localhost");
4741
4742        // When we receive a sync response saying "invited" has left a DM
4743        let f = EventFactory::new().sender(user_id);
4744        let response = SyncResponseBuilder::default()
4745            .add_joined_room(
4746                JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4747            )
4748            .add_global_account_data(
4749                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4750            )
4751            .build_sync_response();
4752        client.base_client().receive_sync_response(response).await.unwrap();
4753
4754        // Then get_dm_room finds this room
4755        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4756        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4757    }
4758
4759    #[async_test]
4760    async fn test_device_exists() {
4761        let server = MatrixMockServer::new().await;
4762        let client = server.client_builder().build().await;
4763
4764        server.mock_get_device().ok().expect(1).mount().await;
4765
4766        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4767    }
4768
4769    #[async_test]
4770    async fn test_device_exists_404() {
4771        let server = MatrixMockServer::new().await;
4772        let client = server.client_builder().build().await;
4773
4774        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4775    }
4776
4777    #[async_test]
4778    async fn test_device_exists_500() {
4779        let server = MatrixMockServer::new().await;
4780        let client = server.client_builder().build().await;
4781
4782        server.mock_get_device().error500().expect(1).mount().await;
4783
4784        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4785    }
4786
4787    #[async_test]
4788    async fn test_fetching_well_known_with_homeserver_url() {
4789        let server = MatrixMockServer::new().await;
4790        let client = server.client_builder().build().await;
4791        server.mock_well_known().ok().mount().await;
4792
4793        assert_matches!(client.fetch_client_well_known().await, Some(_));
4794    }
4795
4796    #[async_test]
4797    async fn test_fetching_well_known_with_server_name() {
4798        let server = MatrixMockServer::new().await;
4799        let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4800
4801        server.mock_well_known().ok().mount().await;
4802
4803        let client = MockClientBuilder::new(None)
4804            .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4805            .build()
4806            .await;
4807
4808        assert_matches!(client.fetch_client_well_known().await, Some(_));
4809    }
4810
4811    #[async_test]
4812    async fn test_fetching_well_known_with_domain_part_of_user_id() {
4813        let server = MatrixMockServer::new().await;
4814        server.mock_well_known().ok().mount().await;
4815
4816        let user_id =
4817            UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4818        let client = MockClientBuilder::new(None)
4819            .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4820            .build()
4821            .await;
4822
4823        assert_matches!(client.fetch_client_well_known().await, Some(_));
4824    }
4825}