matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::{StreamExt, join};
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32 DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35 BaseClient, DmRoomDefinition, RoomInfoNotableUpdate, RoomState, RoomStateFilter,
36 SendOutsideWasm, SessionMeta, StateStoreDataKey, StateStoreDataValue, StoreError,
37 SyncOutsideWasm, ThreadingSupport,
38 event_cache::store::EventCacheStoreLock,
39 media::store::MediaStoreLock,
40 store::{DynStateStore, RoomLoadSettings, SupportedVersionsResponse, WellKnownResponse},
41 sync::{Notification, RoomUpdates},
42 task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl::TtlValue};
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50 api::{
51 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52 client::{
53 account::whoami,
54 alias::{create_alias, delete_alias, get_alias},
55 authenticated_media,
56 device::{self, delete_devices, get_devices, update_device},
57 directory::{get_public_rooms, get_public_rooms_filtered},
58 discovery::{discover_homeserver, get_supported_versions},
59 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
60 knock::knock_room,
61 media,
62 membership::{join_room_by_id, join_room_by_id_or_alias},
63 room::create_room,
64 rtc::RtcTransport,
65 session::login::v3::DiscoveryInfo,
66 sync::sync_events,
67 threads::get_thread_subscriptions_changes,
68 uiaa,
69 user_directory::search_users,
70 },
71 error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
72 path_builder::PathBuilder,
73 },
74 assign,
75 events::{beacon_info::OriginalSyncBeaconInfoEvent, direct::DirectUserIdentifier},
76 push::Ruleset,
77 time::Instant,
78};
79use serde::de::DeserializeOwned;
80use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
81use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
82use url::Url;
83
84use self::{
85 caches::{Cache, CachedValue, ClientCaches},
86 futures::SendRequest,
87};
88use crate::{
89 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
90 Room, SessionTokens, TransmissionProgress,
91 authentication::{
92 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
93 oauth::OAuth,
94 },
95 client::{
96 homeserver_capabilities::HomeserverCapabilities,
97 thread_subscriptions::ThreadSubscriptionCatchup,
98 },
99 config::{RequestConfig, SyncToken},
100 deduplicating_handler::DeduplicatingHandler,
101 error::HttpResult,
102 event_cache::EventCache,
103 event_handler::{
104 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
105 EventHandlerStore, ObservableEventHandler, SyncEvent,
106 },
107 http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
108 latest_events::LatestEvents,
109 live_locations_observer::BeaconInfoUpdate,
110 media::MediaError,
111 notification_settings::NotificationSettings,
112 room::RoomMember,
113 room_preview::RoomPreview,
114 send_queue::{SendQueue, SendQueueData},
115 sliding_sync::Version as SlidingSyncVersion,
116 sync::{RoomUpdate, SyncResponse},
117};
118#[cfg(feature = "e2e-encryption")]
119use crate::{
120 cross_process_lock::CrossProcessLock,
121 encryption::{
122 DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
123 VerificationState,
124 },
125};
126
127mod builder;
128pub(crate) mod caches;
129pub(crate) mod futures;
130pub(crate) mod homeserver_capabilities;
131pub(crate) mod thread_subscriptions;
132
133pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
134#[cfg(feature = "experimental-search")]
135use crate::search_index::SearchIndex;
136
/// Boxed, pinned future returned by a notification handler; it resolves to
/// `()`.
///
/// Outside of wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future returned by a notification handler; it resolves to
/// `()`.
///
/// The wasm variant carries no `Send` bound.
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
141
/// Boxed notification handler callback.
///
/// It is called with the [`Notification`], the [`Room`] and a [`Client`], and
/// returns a [`NotificationHandlerFut`]. Outside of wasm targets the callback
/// must be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler callback.
///
/// It is called with the [`Notification`], the [`Room`] and a [`Client`], and
/// returns a [`NotificationHandlerFut`]. The wasm variant carries no
/// `Send`/`Sync` bounds.
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
147
/// Enum controlling whether a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
161
/// Represents changes that can occur to a `Client`'s session.
#[derive(Debug, Clone, PartialEq)]
pub enum SessionChange {
    /// The session's token is no longer valid. The payload is the
    /// [`UnknownTokenErrorData`] describing the error.
    UnknownToken(UnknownTokenErrorData),
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
170
/// Information about the server vendor obtained from the federation API.
///
/// When produced by `Client::server_vendor_info`, a field the server did not
/// report is set to the string `"unknown"`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The server name.
    pub server_name: String,
    /// The server version.
    pub version: String,
}
180
/// Information about a map tile server advertised by the homeserver through
/// the `tile_server` field of the Matrix client well-known (MSC3488).
///
/// Can be built from a [`discover_homeserver::TileServerInfo`] through `From`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct TileServerInfo {
    /// The URL of a map tile server's `style.json` file. See the
    /// [Mapbox Style Specification](https://docs.mapbox.com/mapbox-gl-js/style-spec/)
    /// for more details.
    pub map_style_url: String,
}
191
192impl From<discover_homeserver::TileServerInfo> for TileServerInfo {
193 fn from(value: discover_homeserver::TileServerInfo) -> Self {
194 Self { map_style_url: value.map_style_url }
195 }
196}
197
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely;
/// every clone points at the same [`ClientInner`].
#[derive(Clone)]
pub struct Client {
    /// The shared state of the client.
    pub(crate) inner: Arc<ClientInner>,
}
205
/// Collection of locks and request-deduplicating handlers that individual
/// client methods use, either to make sure only a single call to a method
/// runs at once or to deduplicate concurrent calls.
///
/// Every member starts out in its `Default` state.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    // NOTE(review): `Self` is `ClientLocks` here, which declares no
    // `send_single_receipt` in this file; the link presumably targets a
    // method on another type (likely `Room`) — confirm and fix the path.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock over the crypto store, kept in a `OnceCell` so it
    /// can be set after construction.
    // NOTE(review): presumably only populated when cross-process locking is
    // enabled — confirm against the encryption module.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
284
/// The state shared by every clone of a [`Client`].
///
/// A [`Client`] only holds an `Arc<ClientInner>`, so all the data below is
/// common to all clones of a given client.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API. It sits behind a
    /// lock because it can be replaced after construction.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version. It sits behind a lock because it can be
    /// overridden after construction.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process lock configuration.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
    /// [`CrossProcessLockConfig::MultiProcess`] is used.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_lock_config: CrossProcessLockConfig,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates, keyed by
    /// room.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    #[cfg(feature = "experimental-search")]
    /// Handler for [`RoomIndex`]'s of each room
    search_index: SearchIndex,

    /// A monitor for background tasks spawned by the client.
    pub(crate) task_monitor: TaskMonitor,

    /// A sender to notify subscribers about duplicate key upload errors
    /// triggered by requests to /keys/upload.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) duplicate_key_upload_error_sender:
        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
}
413
impl ClientInner {
    /// Create a new `ClientInner`.
    ///
    /// All the fields passed as parameters here are those that must be cloned
    /// upon instantiation of a sub-client, e.g. a client specialized for
    /// notifications.
    ///
    /// Everything that is not passed in (locks, typing notice times, event
    /// and notification handlers, room update channels, the task monitor, …)
    /// starts from a fresh default value.
    ///
    /// Once the value is wrapped in an `Arc`, the encryption tasks are
    /// initialized (with the `e2e-encryption` feature), then the
    /// `event_cache` and `thread_subscription_catchup` cells are initialized
    /// through `get_or_init`, and the `Arc` is returned.
    #[allow(clippy::too_many_arguments)]
    async fn new(
        auth_ctx: Arc<AuthCtx>,
        server: Option<Url>,
        homeserver: Url,
        sliding_sync_version: SlidingSyncVersion,
        http_client: HttpClient,
        base_client: BaseClient,
        supported_versions: CachedValue<TtlValue<SupportedVersions>>,
        well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
        respect_login_well_known: bool,
        event_cache: OnceCell<EventCache>,
        send_queue: Arc<SendQueueData>,
        latest_events: OnceCell<LatestEvents>,
        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
        cross_process_lock_config: CrossProcessLockConfig,
        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
    ) -> Arc<Self> {
        // The supported versions and well-known caches are seeded with the
        // values handed in by the caller; the other caches start empty.
        let caches = ClientCaches {
            supported_versions: Cache::with_value(supported_versions),
            well_known: Cache::with_value(well_known),
            server_metadata: Cache::new(),
            homeserver_capabilities: Cache::new(),
        };

        let client = Self {
            server,
            homeserver: StdRwLock::new(homeserver),
            auth_ctx,
            sliding_sync_version: StdRwLock::new(sliding_sync_version),
            http_client,
            base_client,
            caches,
            locks: Default::default(),
            cross_process_lock_config,
            typing_notice_times: Default::default(),
            event_handlers: Default::default(),
            notification_handlers: Default::default(),
            room_update_channels: Default::default(),
            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
            // ballast for all observers to catch up.
            room_updates_sender: broadcast::Sender::new(32),
            respect_login_well_known,
            sync_beat: event_listener::Event::new(),
            event_cache,
            send_queue_data: send_queue,
            latest_events,
            #[cfg(feature = "e2e-encryption")]
            e2ee: EncryptionData::new(encryption_settings),
            #[cfg(feature = "e2e-encryption")]
            verification_state: SharedObservable::new(VerificationState::Unknown),
            #[cfg(feature = "e2e-encryption")]
            enable_share_history_on_invite,
            server_max_upload_size: Mutex::new(OnceCell::new()),
            #[cfg(feature = "experimental-search")]
            search_index: search_index_handler,
            thread_subscription_catchup,
            task_monitor: TaskMonitor::new(),
            // Only the sender half of the channel is kept; the receiver
            // returned by `broadcast::channel` is dropped right away.
            #[cfg(feature = "e2e-encryption")]
            duplicate_key_upload_error_sender: broadcast::channel(1).0,
        };

        // NOTE(review): `client` is used by the statements below in every
        // feature configuration, so this `allow` looks like a leftover —
        // confirm before removing it.
        #[allow(clippy::let_and_return)]
        let client = Arc::new(client);

        #[cfg(feature = "e2e-encryption")]
        client.e2ee.initialize_tasks(&client);

        // Both cells may have been handed in by the caller; each initializer
        // below is what `get_or_init` uses to populate its cell.
        let init_event_cache = client.event_cache.get_or_init(|| async {
            EventCache::new(&client, client.base_client.event_cache_store().clone())
        });

        let init_thread_subscription_catchup = client
            .thread_subscription_catchup
            .get_or_init(|| ThreadSubscriptionCatchup::new(Client { inner: client.clone() }));

        // Drive both initializations together; the values they resolve to are
        // not needed here.
        let _ = join!(init_event_cache, init_thread_subscription_catchup);

        client
    }
}
503
504#[cfg(not(tarpaulin_include))]
505impl Debug for Client {
506 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
507 write!(fmt, "Client")
508 }
509}
510
511impl Client {
512 /// Create a new [`Client`] that will use the given homeserver.
513 ///
514 /// # Arguments
515 ///
516 /// * `homeserver_url` - The homeserver that the client should connect to.
517 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
518 Self::builder().homeserver_url(homeserver_url).build().await
519 }
520
521 /// Returns a subscriber that publishes an event every time the ignore user
522 /// list changes.
523 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
524 self.inner.base_client.subscribe_to_ignore_user_list_changes()
525 }
526
527 /// Create a new [`ClientBuilder`].
528 pub fn builder() -> ClientBuilder {
529 ClientBuilder::new()
530 }
531
532 pub(crate) fn base_client(&self) -> &BaseClient {
533 &self.inner.base_client
534 }
535
536 /// The underlying HTTP client.
537 pub fn http_client(&self) -> &reqwest::Client {
538 &self.inner.http_client.inner
539 }
540
541 pub(crate) fn locks(&self) -> &ClientLocks {
542 &self.inner.locks
543 }
544
545 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
546 &self.inner.auth_ctx
547 }
548
549 /// The cross-process store lock configuration used by this [`Client`].
550 ///
551 /// The SDK provides cross-process store locks (see
552 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
553 /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
554 /// the value used for all cross-process store locks used by this
555 /// `Client`.
556 pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
557 &self.inner.cross_process_lock_config
558 }
559
560 /// Change the homeserver URL used by this client.
561 ///
562 /// # Arguments
563 ///
564 /// * `homeserver_url` - The new URL to use.
565 fn set_homeserver(&self, homeserver_url: Url) {
566 *self.inner.homeserver.write().unwrap() = homeserver_url;
567 }
568
569 /// Change to a different homeserver and re-resolve well-known.
570 #[cfg(feature = "e2e-encryption")]
571 pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
572 &self,
573 homeserver_url: Url,
574 ) -> Result<()> {
575 self.set_homeserver(homeserver_url);
576 self.reset_well_known().await?;
577 if let Some(well_known) = self.well_known().await {
578 self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
579 }
580 Ok(())
581 }
582
583 /// Retrieves a helper component to access the [`HomeserverCapabilities`]
584 /// supported or disabled by the homeserver.
585 pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
586 HomeserverCapabilities::new(self.clone())
587 }
588
589 /// Get the server vendor information from the federation API.
590 ///
591 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
592 /// both the server's software name and version.
593 ///
594 /// # Examples
595 ///
596 /// ```no_run
597 /// # use matrix_sdk::Client;
598 /// # use url::Url;
599 /// # async {
600 /// # let homeserver = Url::parse("http://example.com")?;
601 /// let client = Client::new(homeserver).await?;
602 ///
603 /// let server_info = client.server_vendor_info(None).await?;
604 /// println!(
605 /// "Server: {}, Version: {}",
606 /// server_info.server_name, server_info.version
607 /// );
608 /// # anyhow::Ok(()) };
609 /// ```
610 #[cfg(feature = "federation-api")]
611 pub async fn server_vendor_info(
612 &self,
613 request_config: Option<RequestConfig>,
614 ) -> HttpResult<ServerVendorInfo> {
615 use ruma::api::federation::discovery::get_server_version;
616
617 let res = self
618 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
619 .await?;
620
621 // Extract server info, using defaults if fields are missing.
622 let server = res.server.unwrap_or_default();
623 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
624 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
625
626 Ok(ServerVendorInfo { server_name: server_name_str, version })
627 }
628
629 /// Get a copy of the default request config.
630 ///
631 /// The default request config is what's used when sending requests if no
632 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
633 /// function with such a parameter.
634 ///
635 /// If the default request config was not customized through
636 /// [`ClientBuilder`] when creating this `Client`, the returned value will
637 /// be equivalent to [`RequestConfig::default()`].
638 pub fn request_config(&self) -> RequestConfig {
639 self.inner.http_client.request_config
640 }
641
642 /// Check whether the client has been activated.
643 ///
644 /// A client is considered active when:
645 ///
646 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
647 /// is logged in,
648 /// 2. Has loaded cached data from storage,
649 /// 3. If encryption is enabled, it also initialized or restored its
650 /// `OlmMachine`.
651 pub fn is_active(&self) -> bool {
652 self.inner.base_client.is_active()
653 }
654
655 /// The server used by the client.
656 ///
657 /// See `Self::server` to learn more.
658 pub fn server(&self) -> Option<&Url> {
659 self.inner.server.as_ref()
660 }
661
662 /// The homeserver of the client.
663 pub fn homeserver(&self) -> Url {
664 self.inner.homeserver.read().unwrap().clone()
665 }
666
667 /// Get the sliding sync version.
668 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
669 self.inner.sliding_sync_version.read().unwrap().clone()
670 }
671
672 /// Override the sliding sync version.
673 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
674 let mut lock = self.inner.sliding_sync_version.write().unwrap();
675 *lock = version;
676 }
677
678 /// Get the Matrix user session meta information.
679 ///
680 /// If the client is currently logged in, this will return a
681 /// [`SessionMeta`] object which contains the user ID and device ID.
682 /// Otherwise it returns `None`.
683 pub fn session_meta(&self) -> Option<&SessionMeta> {
684 self.base_client().session_meta()
685 }
686
687 /// Returns a receiver that gets events for each room info update. To watch
688 /// for new events, use `receiver.resubscribe()`.
689 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
690 self.base_client().room_info_notable_update_receiver()
691 }
692
693 /// Performs a search for users.
694 /// The search is performed case-insensitively on user IDs and display names
695 ///
696 /// # Arguments
697 ///
698 /// * `search_term` - The search term for the search
699 /// * `limit` - The maximum number of results to return. Defaults to 10.
700 ///
701 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
702 pub async fn search_users(
703 &self,
704 search_term: &str,
705 limit: u64,
706 ) -> HttpResult<search_users::v3::Response> {
707 let mut request = search_users::v3::Request::new(search_term.to_owned());
708
709 if let Some(limit) = UInt::new(limit) {
710 request.limit = limit;
711 }
712
713 self.send(request).await
714 }
715
716 /// Get the user id of the current owner of the client.
717 pub fn user_id(&self) -> Option<&UserId> {
718 self.session_meta().map(|s| s.user_id.as_ref())
719 }
720
721 /// Get the device ID that identifies the current session.
722 pub fn device_id(&self) -> Option<&DeviceId> {
723 self.session_meta().map(|s| s.device_id.as_ref())
724 }
725
726 /// Get the current access token for this session.
727 ///
728 /// Will be `None` if the client has not been logged in.
729 pub fn access_token(&self) -> Option<String> {
730 self.auth_ctx().access_token()
731 }
732
733 /// Get the current tokens for this session.
734 ///
735 /// To be notified of changes in the session tokens, use
736 /// [`Client::subscribe_to_session_changes()`] or
737 /// [`Client::set_session_callbacks()`].
738 ///
739 /// Returns `None` if the client has not been logged in.
740 pub fn session_tokens(&self) -> Option<SessionTokens> {
741 self.auth_ctx().session_tokens()
742 }
743
744 /// Access the authentication API used to log in this client.
745 ///
746 /// Will be `None` if the client has not been logged in.
747 pub fn auth_api(&self) -> Option<AuthApi> {
748 match self.auth_ctx().auth_data.get()? {
749 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
750 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
751 }
752 }
753
754 /// Get the whole session info of this client.
755 ///
756 /// Will be `None` if the client has not been logged in.
757 ///
758 /// Can be used with [`Client::restore_session`] to restore a previously
759 /// logged-in session.
760 pub fn session(&self) -> Option<AuthSession> {
761 match self.auth_api()? {
762 AuthApi::Matrix(api) => api.session().map(Into::into),
763 AuthApi::OAuth(api) => api.full_session().map(Into::into),
764 }
765 }
766
767 /// Get a reference to the state store.
768 pub fn state_store(&self) -> &DynStateStore {
769 self.base_client().state_store()
770 }
771
772 /// Get a reference to the event cache store.
773 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
774 self.base_client().event_cache_store()
775 }
776
777 /// Get a reference to the media store.
778 pub fn media_store(&self) -> &MediaStoreLock {
779 self.base_client().media_store()
780 }
781
782 /// Access the native Matrix authentication API with this client.
783 pub fn matrix_auth(&self) -> MatrixAuth {
784 MatrixAuth::new(self.clone())
785 }
786
787 /// Get the account of the current owner of the client.
788 pub fn account(&self) -> Account {
789 Account::new(self.clone())
790 }
791
792 /// Get the encryption manager of the client.
793 #[cfg(feature = "e2e-encryption")]
794 pub fn encryption(&self) -> Encryption {
795 Encryption::new(self.clone())
796 }
797
798 /// Get the media manager of the client.
799 pub fn media(&self) -> Media {
800 Media::new(self.clone())
801 }
802
803 /// Get the pusher manager of the client.
804 pub fn pusher(&self) -> Pusher {
805 Pusher::new(self.clone())
806 }
807
808 /// Access the OAuth 2.0 API of the client.
809 pub fn oauth(&self) -> OAuth {
810 OAuth::new(self.clone())
811 }
812
813 /// Register a handler for a specific event type.
814 ///
815 /// The handler is a function or closure with one or more arguments. The
816 /// first argument is the event itself. All additional arguments are
817 /// "context" arguments: They have to implement [`EventHandlerContext`].
818 /// This trait is named that way because most of the types implementing it
819 /// give additional context about an event: The room it was in, its raw form
820 /// and other similar things. As two exceptions to this,
821 /// [`Client`] and [`EventHandlerHandle`] also implement the
822 /// `EventHandlerContext` trait so you don't have to clone your client
823 /// into the event handler manually and a handler can decide to remove
824 /// itself.
825 ///
826 /// Some context arguments are not universally applicable. A context
827 /// argument that isn't available for the given event type will result in
828 /// the event handler being skipped and an error being logged. The following
829 /// context argument types are only available for a subset of event types:
830 ///
831 /// * [`Room`] is only available for room-specific events, i.e. not for
832 /// events like global account data events or presence events.
833 ///
834 /// You can provide custom context via
835 /// [`add_event_handler_context`](Client::add_event_handler_context) and
836 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
837 /// into the event handler.
838 ///
839 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
840 ///
841 /// # Examples
842 ///
843 /// ```no_run
844 /// use matrix_sdk::{
845 /// deserialized_responses::EncryptionInfo,
846 /// event_handler::Ctx,
847 /// ruma::{
848 /// events::{
849 /// macros::EventContent,
850 /// push_rules::PushRulesEvent,
851 /// room::{
852 /// message::SyncRoomMessageEvent,
853 /// topic::SyncRoomTopicEvent,
854 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
855 /// },
856 /// },
857 /// push::Action,
858 /// Int, MilliSecondsSinceUnixEpoch,
859 /// },
860 /// Client, Room,
861 /// };
862 /// use serde::{Deserialize, Serialize};
863 ///
864 /// # async fn example(client: Client) {
865 /// client.add_event_handler(
866 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
867 /// // Common usage: Room event plus room and client.
868 /// },
869 /// );
870 /// client.add_event_handler(
871 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
872 /// async move {
873 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
874 /// // unencrypted events and events that were decrypted by the SDK.
875 /// }
876 /// },
877 /// );
878 /// client.add_event_handler(
879 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
880 /// async move {
881 /// // A `Vec<Action>` parameter allows you to know which push actions
882 /// // are applicable for an event. For example, an event with
883 /// // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
884 /// // should be highlighted in the timeline.
885 /// }
886 /// },
887 /// );
888 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
889 /// // You can omit any or all arguments after the first.
890 /// });
891 ///
892 /// // Registering a temporary event handler:
893 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
894 /// /* Event handler */
895 /// });
896 /// client.remove_event_handler(handle);
897 ///
898 /// // Registering custom event handler context:
899 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
900 /// struct MyContext {
901 /// number: usize,
902 /// }
903 /// client.add_event_handler_context(MyContext { number: 5 });
904 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
905 /// // Use the context
906 /// });
907 ///
908 /// // This will handle membership events in joined rooms. Invites are special, see below.
909 /// client.add_event_handler(
910 /// |ev: SyncRoomMemberEvent| async move {},
911 /// );
912 ///
913 /// // To handle state events in invited rooms (including invite membership events),
914 /// // `StrippedRoomMemberEvent` should be used.
915 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
916 /// client.add_event_handler(
917 /// |ev: StrippedRoomMemberEvent| async move {},
918 /// );
919 ///
920 /// // Custom events work exactly the same way, you just need to declare
921 /// // the content struct and use the EventContent derive macro on it.
922 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
923 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
924 /// struct TokenEventContent {
925 /// token: String,
926 /// #[serde(rename = "exp")]
927 /// expires_at: MilliSecondsSinceUnixEpoch,
928 /// }
929 ///
930 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
931 /// todo!("Display the token");
932 /// });
933 ///
934 /// // Event handler closures can also capture local variables.
935 /// // Make sure they are cheap to clone though, because they will be cloned
936 /// // every time the closure is called.
937 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
938 ///
939 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
940 /// println!("Calling the handler with identifier {data}");
941 /// });
942 /// # }
943 /// ```
944 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
945 where
946 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
947 H: EventHandler<Ev, Ctx>,
948 {
949 self.add_event_handler_impl(handler, None)
950 }
951
952 /// Register a handler for a specific room, and event type.
953 ///
954 /// This method works the same way as
955 /// [`add_event_handler`][Self::add_event_handler], except that the handler
956 /// will only be called for events in the room with the specified ID. See
957 /// that method for more details on event handler functions.
958 ///
959 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
960 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
961 /// your use case.
962 pub fn add_room_event_handler<Ev, Ctx, H>(
963 &self,
964 room_id: &RoomId,
965 handler: H,
966 ) -> EventHandlerHandle
967 where
968 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
969 H: EventHandler<Ev, Ctx>,
970 {
971 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
972 }
973
974 /// Observe a specific event type.
975 ///
976 /// `Ev` represents the kind of event that will be observed. `Ctx`
977 /// represents the context that will come with the event. It relies on the
978 /// same mechanism as [`Client::add_event_handler`]. The main difference is
979 /// that it returns an [`ObservableEventHandler`] and doesn't require a
980 /// user-defined closure. It is possible to subscribe to the
981 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
982 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
983 /// Ctx)`.
984 ///
985 /// Be careful that only the most recent value can be observed. Subscribers
986 /// are notified when a new value is sent, but there is no guarantee
987 /// that they will see all values.
988 ///
989 /// # Example
990 ///
991 /// Let's see a classical usage:
992 ///
993 /// ```
994 /// use futures_util::StreamExt as _;
995 /// use matrix_sdk::{
996 /// Client, Room,
997 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
998 /// };
999 ///
1000 /// # async fn example(client: Client) -> Option<()> {
1001 /// let observer =
1002 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1003 ///
1004 /// let mut subscriber = observer.subscribe();
1005 ///
1006 /// let (event, (room, push_actions)) = subscriber.next().await?;
1007 /// # Some(())
1008 /// # }
1009 /// ```
1010 ///
1011 /// Now let's see how to get several contexts that can be useful for you:
1012 ///
1013 /// ```
1014 /// use matrix_sdk::{
1015 /// Client, Room,
1016 /// deserialized_responses::EncryptionInfo,
1017 /// ruma::{
1018 /// events::room::{
1019 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1020 /// },
1021 /// push::Action,
1022 /// },
1023 /// };
1024 ///
1025 /// # async fn example(client: Client) {
1026 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1027 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1028 ///
1029 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1030 /// // to distinguish between unencrypted events and events that were decrypted
1031 /// // by the SDK.
1032 /// let _ = client
1033 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1034 /// );
1035 ///
1036 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1037 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1038 /// // should be highlighted in the timeline.
1039 /// let _ =
1040 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1041 ///
1042 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1043 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1044 /// # }
1045 /// ```
1046 ///
1047 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1048 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1049 where
1050 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1051 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1052 {
1053 self.observe_room_events_impl(None)
1054 }
1055
1056 /// Observe a specific room, and event type.
1057 ///
1058 /// This method works the same way as [`Client::observe_events`], except
1059 /// that the observability will only be applied for events in the room with
1060 /// the specified ID. See that method for more details.
1061 ///
1062 /// Be careful that only the most recent value can be observed. Subscribers
1063 /// are notified when a new value is sent, but there is no guarantee
1064 /// that they will see all values.
1065 pub fn observe_room_events<Ev, Ctx>(
1066 &self,
1067 room_id: &RoomId,
1068 ) -> ObservableEventHandler<(Ev, Ctx)>
1069 where
1070 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1071 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1072 {
1073 self.observe_room_events_impl(Some(room_id.to_owned()))
1074 }
1075
1076 /// Shared implementation for `Client::observe_events` and
1077 /// `Client::observe_room_events`.
1078 fn observe_room_events_impl<Ev, Ctx>(
1079 &self,
1080 room_id: Option<OwnedRoomId>,
1081 ) -> ObservableEventHandler<(Ev, Ctx)>
1082 where
1083 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1084 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1085 {
1086 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1087 // new value.
1088 let shared_observable = SharedObservable::new(None);
1089
1090 ObservableEventHandler::new(
1091 shared_observable.clone(),
1092 self.event_handler_drop_guard(self.add_event_handler_impl(
1093 move |event: Ev, context: Ctx| {
1094 shared_observable.set(Some((event, context)));
1095
1096 ready(())
1097 },
1098 room_id,
1099 )),
1100 )
1101 }
1102
1103 /// Subscribe to future `beacon_info` updates for the current user across
1104 /// all rooms.
1105 ///
1106 /// This stream is push-only: it emits only future updates observed during
1107 /// sync processing and does not replay existing state.
1108 pub fn observe_own_beacon_info_updates(
1109 &self,
1110 ) -> Result<impl Stream<Item = BeaconInfoUpdate> + use<>> {
1111 let observer = self.observe_events::<OriginalSyncBeaconInfoEvent, Room>();
1112 let mut stream = observer.subscribe();
1113 let own_user_id = self.user_id().ok_or(Error::AuthenticationRequired)?.to_owned();
1114 Ok(async_stream::stream! {
1115 let _observer = observer;
1116
1117 while let Some((event, room)) = stream.next().await {
1118 if event.state_key != own_user_id {
1119 continue;
1120 }
1121 yield BeaconInfoUpdate {
1122 room_id: room.room_id().to_owned(),
1123 event_id: event.event_id,
1124 content: event.content,
1125 };
1126 }
1127 })
1128 }
1129
1130 /// Remove the event handler associated with the handle.
1131 ///
1132 /// Note that you **must not** call `remove_event_handler` from the
1133 /// non-async part of an event handler, that is:
1134 ///
1135 /// ```ignore
1136 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1137 /// // âš this will cause a deadlock âš
1138 /// client.remove_event_handler(handle);
1139 ///
1140 /// async move {
1141 /// // removing the event handler here is fine
1142 /// client.remove_event_handler(handle);
1143 /// }
1144 /// })
1145 /// ```
1146 ///
1147 /// Note also that handlers that remove themselves will still execute with
1148 /// events received in the same sync cycle.
1149 ///
1150 /// # Arguments
1151 ///
1152 /// `handle` - The [`EventHandlerHandle`] that is returned when
1153 /// registering the event handler with [`Client::add_event_handler`].
1154 ///
1155 /// # Examples
1156 ///
1157 /// ```no_run
1158 /// # use url::Url;
1159 /// # use tokio::sync::mpsc;
1160 /// #
1161 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1162 /// #
1163 /// use matrix_sdk::{
1164 /// Client, event_handler::EventHandlerHandle,
1165 /// ruma::events::room::member::SyncRoomMemberEvent,
1166 /// };
1167 /// #
1168 /// # futures_executor::block_on(async {
1169 /// # let client = matrix_sdk::Client::builder()
1170 /// # .homeserver_url(homeserver)
1171 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1172 /// # .build()
1173 /// # .await
1174 /// # .unwrap();
1175 ///
1176 /// client.add_event_handler(
1177 /// |ev: SyncRoomMemberEvent,
1178 /// client: Client,
1179 /// handle: EventHandlerHandle| async move {
1180 /// // Common usage: Check arriving Event is the expected one
1181 /// println!("Expected RoomMemberEvent received!");
1182 /// client.remove_event_handler(handle);
1183 /// },
1184 /// );
1185 /// # });
1186 /// ```
1187 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1188 self.inner.event_handlers.remove(handle);
1189 }
1190
1191 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1192 /// the given handle.
1193 ///
1194 /// When the returned value is dropped, the event handler will be removed.
1195 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1196 EventHandlerDropGuard::new(handle, self.clone())
1197 }
1198
1199 /// Add an arbitrary value for use as event handler context.
1200 ///
1201 /// The value can be obtained in an event handler by adding an argument of
1202 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1203 ///
1204 /// If a value of the same type has been added before, it will be
1205 /// overwritten.
1206 ///
1207 /// # Examples
1208 ///
1209 /// ```no_run
1210 /// use matrix_sdk::{
1211 /// Room, event_handler::Ctx,
1212 /// ruma::events::room::message::SyncRoomMessageEvent,
1213 /// };
1214 /// # #[derive(Clone)]
1215 /// # struct SomeType;
1216 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1217 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1218 /// # futures_executor::block_on(async {
1219 /// # let client = matrix_sdk::Client::builder()
1220 /// # .homeserver_url(homeserver)
1221 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1222 /// # .build()
1223 /// # .await
1224 /// # .unwrap();
1225 ///
1226 /// // Handle used to send messages to the UI part of the app
1227 /// let my_gui_handle: SomeType = obtain_gui_handle();
1228 ///
1229 /// client.add_event_handler_context(my_gui_handle.clone());
1230 /// client.add_event_handler(
1231 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1232 /// async move {
1233 /// // gui_handle.send(DisplayMessage { message: ev });
1234 /// }
1235 /// },
1236 /// );
1237 /// # });
1238 /// ```
1239 pub fn add_event_handler_context<T>(&self, ctx: T)
1240 where
1241 T: Clone + Send + Sync + 'static,
1242 {
1243 self.inner.event_handlers.add_context(ctx);
1244 }
1245
1246 /// Register a handler for a notification.
1247 ///
1248 /// Similar to [`Client::add_event_handler`], but only allows functions
1249 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1250 /// [`Client`] for now.
1251 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1252 where
1253 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1254 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1255 {
1256 self.inner.notification_handlers.write().await.push(Box::new(
1257 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1258 ));
1259
1260 self
1261 }
1262
1263 /// Subscribe to all updates for the room with the given ID.
1264 ///
1265 /// The returned receiver will receive a new message for each sync response
1266 /// that contains updates for that room.
1267 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1268 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1269 btree_map::Entry::Vacant(entry) => {
1270 let (tx, rx) = broadcast::channel(8);
1271 entry.insert(tx);
1272 rx
1273 }
1274 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1275 }
1276 }
1277
1278 /// Subscribe to all updates to all rooms, whenever any has been received in
1279 /// a sync response.
1280 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1281 self.inner.room_updates_sender.subscribe()
1282 }
1283
1284 pub(crate) async fn notification_handlers(
1285 &self,
1286 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1287 self.inner.notification_handlers.read().await
1288 }
1289
1290 /// Get all the rooms the client knows about.
1291 ///
1292 /// This will return the list of joined, invited, and left rooms.
1293 pub fn rooms(&self) -> Vec<Room> {
1294 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1295 }
1296
1297 /// Get all the rooms the client knows about, filtered by room state.
1298 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1299 self.base_client()
1300 .rooms_filtered(filter)
1301 .into_iter()
1302 .map(|room| Room::new(self.clone(), room))
1303 .collect()
1304 }
1305
1306 /// Get a stream of all the rooms, in addition to the existing rooms.
1307 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1308 let (rooms, stream) = self.base_client().rooms_stream();
1309
1310 let map_room = |room| Room::new(self.clone(), room);
1311
1312 (
1313 rooms.into_iter().map(map_room).collect(),
1314 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1315 )
1316 }
1317
1318 /// Returns the joined rooms this client knows about.
1319 pub fn joined_rooms(&self) -> Vec<Room> {
1320 self.rooms_filtered(RoomStateFilter::JOINED)
1321 }
1322
1323 /// Returns the invited rooms this client knows about.
1324 pub fn invited_rooms(&self) -> Vec<Room> {
1325 self.rooms_filtered(RoomStateFilter::INVITED)
1326 }
1327
1328 /// Returns the left rooms this client knows about.
1329 pub fn left_rooms(&self) -> Vec<Room> {
1330 self.rooms_filtered(RoomStateFilter::LEFT)
1331 }
1332
1333 /// Returns the joined space rooms this client knows about.
1334 pub fn joined_space_rooms(&self) -> Vec<Room> {
1335 self.base_client()
1336 .rooms_filtered(RoomStateFilter::JOINED)
1337 .into_iter()
1338 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1339 .collect()
1340 }
1341
1342 /// Get a room with the given room id.
1343 ///
1344 /// # Arguments
1345 ///
1346 /// `room_id` - The unique id of the room that should be fetched.
1347 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1348 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1349 }
1350
1351 /// Gets the preview of a room, whether the current user has joined it or
1352 /// not.
1353 pub async fn get_room_preview(
1354 &self,
1355 room_or_alias_id: &RoomOrAliasId,
1356 via: Vec<OwnedServerName>,
1357 ) -> Result<RoomPreview> {
1358 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1359 Ok(room_id) => room_id.to_owned(),
1360 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1361 };
1362
1363 if let Some(room) = self.get_room(&room_id) {
1364 // The cached data can only be trusted if the room state is joined or
1365 // banned: for invite and knock rooms, no updates will be received
1366 // for the rooms after the invite/knock action took place so we may
1367 // have very out to date data for important fields such as
1368 // `join_rule`. For left rooms, the homeserver should return the latest info.
1369 match room.state() {
1370 RoomState::Joined | RoomState::Banned => {
1371 return Ok(RoomPreview::from_known_room(&room).await);
1372 }
1373 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1374 }
1375 }
1376
1377 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1378 }
1379
1380 /// Resolve a room alias to a room id and a list of servers which know
1381 /// about it.
1382 ///
1383 /// # Arguments
1384 ///
1385 /// `room_alias` - The room alias to be resolved.
1386 pub async fn resolve_room_alias(
1387 &self,
1388 room_alias: &RoomAliasId,
1389 ) -> HttpResult<get_alias::v3::Response> {
1390 let request = get_alias::v3::Request::new(room_alias.to_owned());
1391 self.send(request).await
1392 }
1393
1394 /// Checks if a room alias is not in use yet.
1395 ///
1396 /// Returns:
1397 /// - `Ok(true)` if the room alias is available.
1398 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1399 /// status code).
1400 /// - An `Err` otherwise.
1401 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1402 match self.resolve_room_alias(alias).await {
1403 // The room alias was resolved, so it's already in use.
1404 Ok(_) => Ok(false),
1405 Err(error) => {
1406 match error.client_api_error_kind() {
1407 // The room alias wasn't found, so it's available.
1408 Some(ErrorKind::NotFound) => Ok(true),
1409 _ => Err(error),
1410 }
1411 }
1412 }
1413 }
1414
1415 /// Adds a new room alias associated with a room to the room directory.
1416 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1417 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1418 self.send(request).await?;
1419 Ok(())
1420 }
1421
1422 /// Removes a room alias from the room directory.
1423 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1424 let request = delete_alias::v3::Request::new(alias.to_owned());
1425 self.send(request).await?;
1426 Ok(())
1427 }
1428
1429 /// Update the homeserver from the login response well-known if needed.
1430 ///
1431 /// # Arguments
1432 ///
1433 /// * `login_well_known` - The `well_known` field from a successful login
1434 /// response.
1435 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1436 if self.inner.respect_login_well_known
1437 && let Some(well_known) = login_well_known
1438 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1439 {
1440 self.set_homeserver(homeserver);
1441 }
1442 }
1443
1444 /// Similar to [`Client::restore_session_with`], with
1445 /// [`RoomLoadSettings::default()`].
1446 ///
1447 /// # Panics
1448 ///
1449 /// Panics if a session was already restored or logged in.
1450 #[instrument(skip_all)]
1451 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1452 self.restore_session_with(session, RoomLoadSettings::default()).await
1453 }
1454
1455 /// Restore a session previously logged-in using one of the available
1456 /// authentication APIs. The number of rooms to restore is controlled by
1457 /// [`RoomLoadSettings`].
1458 ///
1459 /// See the documentation of the corresponding authentication API's
1460 /// `restore_session` method for more information.
1461 ///
1462 /// # Panics
1463 ///
1464 /// Panics if a session was already restored or logged in.
1465 #[instrument(skip_all)]
1466 pub async fn restore_session_with(
1467 &self,
1468 session: impl Into<AuthSession>,
1469 room_load_settings: RoomLoadSettings,
1470 ) -> Result<()> {
1471 let session = session.into();
1472 match session {
1473 AuthSession::Matrix(session) => {
1474 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1475 }
1476 AuthSession::OAuth(session) => {
1477 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1478 }
1479 }
1480 }
1481
1482 /// Refresh the access token using the authentication API used to log into
1483 /// this session.
1484 ///
1485 /// See the documentation of the authentication API's `refresh_access_token`
1486 /// method for more information.
1487 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1488 let Some(auth_api) = self.auth_api() else {
1489 return Err(RefreshTokenError::RefreshTokenRequired);
1490 };
1491
1492 match auth_api {
1493 AuthApi::Matrix(api) => {
1494 trace!("Token refresh: Using the homeserver.");
1495 Box::pin(api.refresh_access_token()).await?;
1496 }
1497 AuthApi::OAuth(api) => {
1498 trace!("Token refresh: Using OAuth 2.0.");
1499 Box::pin(api.refresh_access_token()).await?;
1500 }
1501 }
1502
1503 Ok(())
1504 }
1505
1506 /// Log out the current session using the proper authentication API.
1507 ///
1508 /// # Errors
1509 ///
1510 /// Returns an error if the session is not authenticated or if an error
1511 /// occurred while making the request to the server.
1512 pub async fn logout(&self) -> Result<(), Error> {
1513 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1514 match auth_api {
1515 AuthApi::Matrix(matrix_auth) => {
1516 matrix_auth.logout().await?;
1517 Ok(())
1518 }
1519 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1520 }
1521 }
1522
1523 /// Get or upload a sync filter.
1524 ///
1525 /// This method will either get a filter ID from the store or upload the
1526 /// filter definition to the homeserver and return the new filter ID.
1527 ///
1528 /// # Arguments
1529 ///
1530 /// * `filter_name` - The unique name of the filter, this name will be used
1531 /// locally to store and identify the filter ID returned by the server.
1532 ///
1533 /// * `definition` - The filter definition that should be uploaded to the
1534 /// server if no filter ID can be found in the store.
1535 ///
1536 /// # Examples
1537 ///
1538 /// ```no_run
1539 /// # use matrix_sdk::{
1540 /// # Client, config::SyncSettings,
1541 /// # ruma::api::client::{
1542 /// # filter::{
1543 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1544 /// # },
1545 /// # sync::sync_events::v3::Filter,
1546 /// # }
1547 /// # };
1548 /// # use url::Url;
1549 /// # async {
1550 /// # let homeserver = Url::parse("http://example.com").unwrap();
1551 /// # let client = Client::new(homeserver).await.unwrap();
1552 /// let mut filter = FilterDefinition::default();
1553 ///
1554 /// // Let's enable member lazy loading.
1555 /// filter.room.state.lazy_load_options =
1556 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1557 ///
1558 /// let filter_id = client
1559 /// .get_or_upload_filter("sync", filter)
1560 /// .await
1561 /// .unwrap();
1562 ///
1563 /// let sync_settings = SyncSettings::new()
1564 /// .filter(Filter::FilterId(filter_id));
1565 ///
1566 /// let response = client.sync_once(sync_settings).await.unwrap();
1567 /// # };
1568 #[instrument(skip(self, definition))]
1569 pub async fn get_or_upload_filter(
1570 &self,
1571 filter_name: &str,
1572 definition: FilterDefinition,
1573 ) -> Result<String> {
1574 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1575 debug!("Found filter locally");
1576 Ok(filter)
1577 } else {
1578 debug!("Didn't find filter locally");
1579 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1580 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1581 let response = self.send(request).await?;
1582
1583 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1584
1585 Ok(response.filter_id)
1586 }
1587 }
1588
1589 /// Prepare to join a room by ID, by getting the current details about it
1590 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1591 let room = self.get_room(room_id)?;
1592
1593 let inviter = match room.invite_details().await {
1594 Ok(details) => details.inviter,
1595 Err(Error::WrongRoomState(_)) => None,
1596 Err(e) => {
1597 warn!("Error fetching invite details for room: {e:?}");
1598 None
1599 }
1600 };
1601
1602 Some(PreJoinRoomInfo { inviter })
1603 }
1604
1605 /// Finish joining a room.
1606 ///
1607 /// If the room was an invite that should be marked as a DM, will include it
1608 /// in the DM event after creating the joined room.
1609 ///
1610 /// If encrypted history sharing is enabled, will check to see if we have a
1611 /// key bundle, and import it if so.
1612 ///
1613 /// # Arguments
1614 ///
1615 /// * `room_id` - The `RoomId` of the room that was joined.
1616 /// * `pre_join_room_info` - Information about the room before we joined.
1617 async fn finish_join_room(
1618 &self,
1619 room_id: &RoomId,
1620 pre_join_room_info: Option<PreJoinRoomInfo>,
1621 ) -> Result<Room> {
1622 info!(?room_id, ?pre_join_room_info, "Completing room join");
1623 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1624 room.state() == RoomState::Invited
1625 && room.is_direct().await.unwrap_or_else(|e| {
1626 warn!(%room_id, "is_direct() failed: {e}");
1627 false
1628 })
1629 } else {
1630 false
1631 };
1632
1633 let base_room = self
1634 .base_client()
1635 .room_joined(
1636 room_id,
1637 pre_join_room_info
1638 .as_ref()
1639 .and_then(|info| info.inviter.as_ref())
1640 .map(|i| i.user_id().to_owned()),
1641 )
1642 .await?;
1643 let room = Room::new(self.clone(), base_room);
1644
1645 if mark_as_dm {
1646 room.set_is_direct(true).await?;
1647 }
1648
1649 // If we joined following an invite, check if we had previously received a key
1650 // bundle from the inviter, and import it if so.
1651 //
1652 // It's important that we only do this once `BaseClient::room_joined` has
1653 // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1654 // race.
1655 #[cfg(feature = "e2e-encryption")]
1656 if self.inner.enable_share_history_on_invite
1657 && let Some(inviter) =
1658 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1659 {
1660 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1661 .await?;
1662 }
1663
1664 // Suppress "unused variable" and "unused field" lints
1665 #[cfg(not(feature = "e2e-encryption"))]
1666 let _ = pre_join_room_info.map(|i| i.inviter);
1667
1668 Ok(room)
1669 }
1670
1671 /// Join a room by `RoomId`.
1672 ///
1673 /// Returns the `Room` in the joined state.
1674 ///
1675 /// # Arguments
1676 ///
1677 /// * `room_id` - The `RoomId` of the room to be joined.
1678 #[instrument(skip(self))]
1679 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1680 // See who invited us to this room, if anyone. Note we have to do this before
1681 // making the `/join` request, otherwise we could race against the sync.
1682 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1683
1684 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1685 let response = self.send(request).await?;
1686 self.finish_join_room(&response.room_id, pre_join_info).await
1687 }
1688
1689 /// Join a room by `RoomOrAliasId`.
1690 ///
1691 /// Returns the `Room` in the joined state.
1692 ///
1693 /// # Arguments
1694 ///
1695 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1696 /// alias looks like `#name:example.com`.
1697 /// * `server_names` - The server names to be used for resolving the alias,
1698 /// if needs be.
1699 #[instrument(skip(self))]
1700 pub async fn join_room_by_id_or_alias(
1701 &self,
1702 alias: &RoomOrAliasId,
1703 server_names: &[OwnedServerName],
1704 ) -> Result<Room> {
1705 let pre_join_info = {
1706 match alias.try_into() {
1707 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1708 Err(_) => {
1709 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1710 // responding to an invitation to the room, and therefore don't need to handle
1711 // things that happen as a result of invites.
1712 None
1713 }
1714 }
1715 };
1716 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1717 via: server_names.to_owned(),
1718 });
1719 let response = self.send(request).await?;
1720 self.finish_join_room(&response.room_id, pre_join_info).await
1721 }
1722
1723 /// Search the homeserver's directory of public rooms.
1724 ///
1725 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1726 /// a `get_public_rooms::Response`.
1727 ///
1728 /// # Arguments
1729 ///
1730 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1731 ///
1732 /// * `since` - Pagination token from a previous request.
1733 ///
1734 /// * `server` - The name of the server, if `None` the requested server is
1735 /// used.
1736 ///
1737 /// # Examples
1738 /// ```no_run
1739 /// use matrix_sdk::Client;
1740 /// # use url::Url;
1741 /// # let homeserver = Url::parse("http://example.com").unwrap();
1742 /// # let limit = Some(10);
1743 /// # let since = Some("since token");
1744 /// # let server = Some("servername.com".try_into().unwrap());
1745 /// # async {
1746 /// let mut client = Client::new(homeserver).await.unwrap();
1747 ///
1748 /// client.public_rooms(limit, since, server).await;
1749 /// # };
1750 /// ```
1751 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1752 pub async fn public_rooms(
1753 &self,
1754 limit: Option<u32>,
1755 since: Option<&str>,
1756 server: Option<&ServerName>,
1757 ) -> HttpResult<get_public_rooms::v3::Response> {
1758 let limit = limit.map(UInt::from);
1759
1760 let request = assign!(get_public_rooms::v3::Request::new(), {
1761 limit,
1762 since: since.map(ToOwned::to_owned),
1763 server: server.map(ToOwned::to_owned),
1764 });
1765 self.send(request).await
1766 }
1767
1768 /// Create a room with the given parameters.
1769 ///
1770 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1771 /// created room.
1772 ///
1773 /// If you want to create a direct message with one specific user, you can
1774 /// use [`create_dm`][Self::create_dm], which is more convenient than
1775 /// assembling the [`create_room::v3::Request`] yourself.
1776 ///
1777 /// If the `is_direct` field of the request is set to `true` and at least
1778 /// one user is invited, the room will be automatically added to the direct
1779 /// rooms in the account data.
1780 ///
1781 /// # Examples
1782 ///
1783 /// ```no_run
1784 /// use matrix_sdk::{
1785 /// Client,
1786 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1787 /// };
1788 /// # use url::Url;
1789 /// #
1790 /// # async {
1791 /// # let homeserver = Url::parse("http://example.com").unwrap();
1792 /// let request = CreateRoomRequest::new();
1793 /// let client = Client::new(homeserver).await.unwrap();
1794 /// assert!(client.create_room(request).await.is_ok());
1795 /// # };
1796 /// ```
1797 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1798 let invite = request.invite.clone();
1799 let is_direct_room = request.is_direct;
1800 let response = self.send(request).await?;
1801
1802 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1803
1804 let joined_room = Room::new(self.clone(), base_room);
1805
1806 if is_direct_room
1807 && !invite.is_empty()
1808 && let Err(error) =
1809 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1810 {
1811 // FIXME: Retry in the background
1812 error!("Failed to mark room as DM: {error}");
1813 }
1814
1815 Ok(joined_room)
1816 }
1817
1818 /// Create a DM room.
1819 ///
1820 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1821 /// given user being invited, the room marked `is_direct` and both the
1822 /// creator and invitee getting the default maximum power level.
1823 ///
1824 /// If the `e2e-encryption` feature is enabled, the room will also be
1825 /// encrypted.
1826 ///
1827 /// # Arguments
1828 ///
1829 /// * `user_id` - The ID of the user to create a DM for.
1830 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1831 #[cfg(feature = "e2e-encryption")]
1832 let initial_state = vec![
1833 InitialStateEvent::with_empty_state_key(
1834 RoomEncryptionEventContent::with_recommended_defaults(),
1835 )
1836 .to_raw_any(),
1837 ];
1838
1839 #[cfg(not(feature = "e2e-encryption"))]
1840 let initial_state = vec![];
1841
1842 let request = assign!(create_room::v3::Request::new(), {
1843 invite: vec![user_id.to_owned()],
1844 is_direct: true,
1845 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1846 initial_state,
1847 });
1848
1849 self.create_room(request).await
1850 }
1851
1852 /// Get the first existing DM room with the given user, if any.
1853 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1854 self.get_dm_rooms(user_id).next()
1855 }
1856
1857 /// Get an iterator with the existing DM rooms for the given user.
1858 pub fn get_dm_rooms(&self, user_id: &UserId) -> impl Iterator<Item = Room> {
1859 let rooms = self.joined_rooms();
1860
1861 let dm_definition = &self.base_client().dm_room_definition;
1862
1863 // Find the room we share with the `user_id` and only with `user_id`
1864 let rooms = rooms.into_iter().filter(move |r| {
1865 let targets = r.direct_targets();
1866 let targets_match =
1867 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id));
1868 match dm_definition {
1869 DmRoomDefinition::MatrixSpec => targets_match,
1870 DmRoomDefinition::TwoMembers => {
1871 let service_members_count =
1872 r.service_members().map(|s| s.len()).unwrap_or_default() as u64;
1873 let active_non_service_members =
1874 r.active_members_count().saturating_sub(service_members_count);
1875 targets_match && active_non_service_members <= 2
1876 }
1877 }
1878 });
1879
1880 trace!(?user_id, ?rooms, "Found DM rooms with user");
1881 rooms
1882 }
1883
1884 /// Search the homeserver's directory for public rooms with a filter.
1885 ///
1886 /// # Arguments
1887 ///
1888 /// * `room_search` - The easiest way to create this request is using the
1889 /// `get_public_rooms_filtered::Request` itself.
1890 ///
1891 /// # Examples
1892 ///
1893 /// ```no_run
1894 /// # use url::Url;
1895 /// # use matrix_sdk::Client;
1896 /// # async {
1897 /// # let homeserver = Url::parse("http://example.com")?;
1898 /// use matrix_sdk::ruma::{
1899 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1900 /// };
1901 /// # let mut client = Client::new(homeserver).await?;
1902 ///
1903 /// let mut filter = Filter::new();
1904 /// filter.generic_search_term = Some("rust".to_owned());
1905 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1906 /// request.filter = filter;
1907 ///
1908 /// let response = client.public_rooms_filtered(request).await?;
1909 ///
1910 /// for room in response.chunk {
1911 /// println!("Found room {room:?}");
1912 /// }
1913 /// # anyhow::Ok(()) };
1914 /// ```
1915 pub async fn public_rooms_filtered(
1916 &self,
1917 request: get_public_rooms_filtered::v3::Request,
1918 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1919 self.send(request).await
1920 }
1921
1922 /// Send an arbitrary request to the server, without updating client state.
1923 ///
1924 /// **Warning:** Because this method *does not* update the client state, it
1925 /// is important to make sure that you account for this yourself, and
1926 /// use wrapper methods where available. This method should *only* be
1927 /// used if a wrapper method for the endpoint you'd like to use is not
1928 /// available.
1929 ///
1930 /// # Arguments
1931 ///
1932 /// * `request` - A filled out and valid request for the endpoint to be hit
1933 ///
1934 /// * `timeout` - An optional request timeout setting, this overrides the
1935 /// default request setting if one was set.
1936 ///
1937 /// # Examples
1938 ///
1939 /// ```no_run
1940 /// # use matrix_sdk::{Client, config::SyncSettings};
1941 /// # use url::Url;
1942 /// # async {
1943 /// # let homeserver = Url::parse("http://localhost:8080")?;
1944 /// # let mut client = Client::new(homeserver).await?;
1945 /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1946 ///
1947 /// // First construct the request you want to make
1948 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1949 /// // for all available Endpoints
1950 /// let user_id = owned_user_id!("@example:localhost");
1951 /// let request = profile::get_profile::v3::Request::new(user_id);
1952 ///
1953 /// // Start the request using Client::send()
1954 /// let response = client.send(request).await?;
1955 ///
1956 /// // Check the corresponding Response struct to find out what types are
1957 /// // returned
1958 /// # anyhow::Ok(()) };
1959 /// ```
1960 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1961 where
1962 Request: OutgoingRequest + Clone + Debug,
1963 Request::Authentication: SupportedAuthScheme,
1964 Request::PathBuilder: SupportedPathBuilder,
1965 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1966 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1967 {
1968 SendRequest {
1969 client: self.clone(),
1970 request,
1971 config: None,
1972 send_progress: Default::default(),
1973 }
1974 }
1975
1976 pub(crate) async fn send_inner<Request>(
1977 &self,
1978 request: Request,
1979 config: Option<RequestConfig>,
1980 send_progress: SharedObservable<TransmissionProgress>,
1981 ) -> HttpResult<Request::IncomingResponse>
1982 where
1983 Request: OutgoingRequest + Debug,
1984 Request::Authentication: SupportedAuthScheme,
1985 Request::PathBuilder: SupportedPathBuilder,
1986 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1987 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1988 {
1989 let homeserver = self.homeserver().to_string();
1990 let access_token = self.access_token();
1991 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1992
1993 let path_builder_input =
1994 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1995
1996 let result = self
1997 .inner
1998 .http_client
1999 .send(
2000 request,
2001 config,
2002 homeserver,
2003 access_token.as_deref(),
2004 path_builder_input,
2005 send_progress,
2006 )
2007 .await;
2008
2009 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2010 result.as_ref().map_err(HttpError::client_api_error_kind)
2011 && let Some(access_token) = &access_token
2012 {
2013 // Mark the access token as expired.
2014 self.auth_ctx().set_access_token_expired(access_token);
2015 }
2016
2017 result
2018 }
2019
2020 fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
2021 _ = self
2022 .inner
2023 .auth_ctx
2024 .session_change_sender
2025 .send(SessionChange::UnknownToken(unknown_token_data.clone()));
2026 }
2027
2028 /// Fetches server versions from network; no caching.
2029 pub async fn fetch_server_versions(
2030 &self,
2031 request_config: Option<RequestConfig>,
2032 ) -> HttpResult<get_supported_versions::Response> {
2033 // Since this was called by the user, try to refresh the access token if
2034 // necessary.
2035 self.fetch_server_versions_inner(false, request_config).await
2036 }
2037
2038 /// Fetches server versions from network; no caching.
2039 ///
2040 /// If the access token is expired and `failsafe` is `false`, this will
2041 /// attempt to refresh the access token, otherwise this will try to make an
2042 /// unauthenticated request instead.
2043 pub(crate) async fn fetch_server_versions_inner(
2044 &self,
2045 failsafe: bool,
2046 request_config: Option<RequestConfig>,
2047 ) -> HttpResult<get_supported_versions::Response> {
2048 if !failsafe {
2049 // `Client::send()` handles refreshing access tokens.
2050 return self
2051 .send(get_supported_versions::Request::new())
2052 .with_request_config(request_config)
2053 .await;
2054 }
2055
2056 let homeserver = self.homeserver().to_string();
2057
2058 // If we have a fresh access token, try with it first.
2059 if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2060 && self.auth_ctx().has_valid_access_token()
2061 && let Some(access_token) = self.access_token()
2062 {
2063 let result = self
2064 .inner
2065 .http_client
2066 .send(
2067 get_supported_versions::Request::new(),
2068 request_config,
2069 homeserver.clone(),
2070 Some(&access_token),
2071 (),
2072 Default::default(),
2073 )
2074 .await;
2075
2076 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2077 result.as_ref().map_err(HttpError::client_api_error_kind)
2078 {
2079 // If the access token is actually expired, mark it as expired and fallback to
2080 // the unauthenticated request below.
2081 self.auth_ctx().set_access_token_expired(&access_token);
2082 } else {
2083 // If the request succeeded or it's an other error, just stop now.
2084 return result;
2085 }
2086 }
2087
2088 // Try without authentication.
2089 self.inner
2090 .http_client
2091 .send(
2092 get_supported_versions::Request::new(),
2093 request_config,
2094 homeserver.clone(),
2095 None,
2096 (),
2097 Default::default(),
2098 )
2099 .await
2100 }
2101
2102 /// Fetches client well_known from network; no caching.
2103 ///
2104 /// 1. If the [`Client::server`] value is available, we use it to fetch the
2105 /// well-known contents.
2106 /// 2. If it's not, we try extracting the server name from the
2107 /// [`Client::user_id`] and building the server URL from it.
2108 /// 3. If we couldn't get the well-known contents with either the explicit
2109 /// server name or the implicit extracted one, we try the homeserver URL
2110 /// as a last resort.
2111 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2112 let homeserver = self.homeserver();
2113 let scheme = homeserver.scheme();
2114
2115 // Use the server name, either an explicit one or an implicit one taken from
2116 // the user id: sometimes we'll have only the homeserver url available and no
2117 // server name, but the server name can be extracted from the current user id.
2118 let server_url = self
2119 .server()
2120 .map(|server| server.to_string())
2121 // If the server name wasn't available, extract it from the user id and build a URL:
2122 // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2123 // will be the same for the public server url, lacking a better candidate.
2124 .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2125
2126 // If the server name is available, first try using it
2127 let response = if let Some(server_url) = server_url {
2128 // First try using the server name
2129 self.fetch_client_well_known_with_url(server_url).await
2130 } else {
2131 None
2132 };
2133
2134 // If we didn't get a well-known value yet, try with the homeserver url instead:
2135 if response.is_none() {
2136 // Sometimes people configure their well-known directly on the homeserver so use
2137 // this as a fallback when the server name is unknown.
2138 warn!(
2139 "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2140 );
2141 self.fetch_client_well_known_with_url(homeserver.to_string()).await
2142 } else {
2143 response
2144 }
2145 }
2146
2147 async fn fetch_client_well_known_with_url(
2148 &self,
2149 url: String,
2150 ) -> Option<discover_homeserver::Response> {
2151 let well_known = self
2152 .inner
2153 .http_client
2154 .send(
2155 discover_homeserver::Request::new(),
2156 Some(RequestConfig::short_retry()),
2157 url,
2158 None,
2159 (),
2160 Default::default(),
2161 )
2162 .await;
2163
2164 match well_known {
2165 Ok(well_known) => Some(well_known),
2166 Err(http_error) => {
2167 // It is perfectly valid to not have a well-known file.
2168 // Maybe we should check for a specific error code to be sure?
2169 warn!("Failed to fetch client well-known: {http_error}");
2170 None
2171 }
2172 }
2173 }
2174
2175 /// Load supported versions from storage, or fetch them from network and
2176 /// cache them.
2177 ///
2178 /// If `failsafe` is true, this will try to minimize side effects to avoid
2179 /// possible deadlocks.
2180 async fn fetch_supported_versions(
2181 &self,
2182 failsafe: bool,
2183 ) -> HttpResult<SupportedVersionsResponse> {
2184 let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2185 let supported_versions = SupportedVersionsResponse {
2186 versions: server_versions.versions,
2187 unstable_features: server_versions.unstable_features,
2188 };
2189
2190 Ok(supported_versions)
2191 }
2192
2193 /// Get the Matrix versions and features supported by the homeserver by
2194 /// fetching them from the server or the cache.
2195 ///
2196 /// This is equivalent to calling both [`Client::server_versions()`] and
2197 /// [`Client::unstable_features()`]. To always fetch the result from the
2198 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2199 /// and then `.as_supported_versions()` on the response.
2200 ///
2201 /// # Examples
2202 ///
2203 /// ```no_run
2204 /// use ruma::api::{FeatureFlag, MatrixVersion};
2205 /// # use matrix_sdk::{Client, config::SyncSettings};
2206 /// # use url::Url;
2207 /// # async {
2208 /// # let homeserver = Url::parse("http://localhost:8080")?;
2209 /// # let mut client = Client::new(homeserver).await?;
2210 ///
2211 /// let supported = client.supported_versions().await?;
2212 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2213 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2214 ///
2215 /// let msc_x_feature = FeatureFlag::from("msc_x");
2216 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2217 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2218 /// # anyhow::Ok(()) };
2219 /// ```
2220 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2221 self.supported_versions_inner(false).await
2222 }
2223
2224 /// Get the Matrix versions and features supported by the homeserver by
2225 /// fetching them from the server or the cache.
2226 ///
2227 /// If `failsafe` is true, this will try to minimize side effects to avoid
2228 /// possible deadlocks.
2229 pub(crate) async fn supported_versions_inner(
2230 &self,
2231 failsafe: bool,
2232 ) -> HttpResult<SupportedVersions> {
2233 match self.supported_versions_cached_inner(failsafe).await {
2234 Ok(Some(value)) => {
2235 return Ok(value);
2236 }
2237 Ok(None) => {
2238 // The cache is empty, make a request.
2239 }
2240 Err(error) => {
2241 warn!("error when loading cached supported versions: {error}");
2242 // Fallthrough to make a request.
2243 }
2244 }
2245
2246 self.refresh_supported_versions_cache(failsafe).await
2247 }
2248
2249 /// Refresh the Matrix versions and features supported by the homeserver in
2250 /// the cache.
2251 ///
2252 /// If `failsafe` is true, this will try to minimize side effects to avoid
2253 /// possible deadlocks.
2254 async fn refresh_supported_versions_cache(
2255 &self,
2256 failsafe: bool,
2257 ) -> HttpResult<SupportedVersions> {
2258 let cached_supported_versions = &self.inner.caches.supported_versions;
2259
2260 let mut supported_versions_guard = match cached_supported_versions.refresh_lock.try_lock() {
2261 Ok(guard) => guard,
2262 Err(_) => {
2263 // There is already a refresh in progress, wait for it to finish.
2264 let guard = cached_supported_versions.refresh_lock.lock().await;
2265
2266 if let Err(error) = guard.as_ref() {
2267 // There was an error in the previous refresh, return it.
2268 return Err(HttpError::Cached(error.clone()));
2269 }
2270
2271 // Reuse the data if it was cached and it hasn't expired.
2272 if let CachedValue::Cached(value) = cached_supported_versions.value()
2273 && !value.has_expired()
2274 {
2275 return Ok(value.into_data());
2276 }
2277
2278 // The data wasn't cached or has expired, we need to make another request.
2279 guard
2280 }
2281 };
2282
2283 let response = match self.fetch_supported_versions(failsafe).await {
2284 Ok(response) => {
2285 *supported_versions_guard = Ok(());
2286 TtlValue::new(response)
2287 }
2288 Err(error) => {
2289 let error = Arc::new(error);
2290 *supported_versions_guard = Err(error.clone());
2291 return Err(HttpError::Cached(error));
2292 }
2293 };
2294
2295 let supported_versions = response.as_ref().map(|response| response.supported_versions());
2296
2297 // Only cache the result if the request was authenticated.
2298 if self.auth_ctx().has_valid_access_token() {
2299 if let Err(err) = self
2300 .state_store()
2301 .set_kv_data(
2302 StateStoreDataKey::SupportedVersions,
2303 StateStoreDataValue::SupportedVersions(response),
2304 )
2305 .await
2306 {
2307 warn!("error when caching supported versions: {err}");
2308 }
2309
2310 cached_supported_versions.set_value(supported_versions.clone());
2311 }
2312
2313 Ok(supported_versions.into_data())
2314 }
2315
2316 /// Get the Matrix versions and features supported by the homeserver by
2317 /// fetching them from the cache.
2318 ///
2319 /// For a version of this function that fetches the supported versions and
2320 /// features from the homeserver if the [`SupportedVersions`] aren't
2321 /// found in the cache, take a look at the [`Client::supported_versions()`]
2322 /// method.
2323 ///
2324 /// If the data in the cache has expired, this will trigger a background
2325 /// task to refresh it.
2326 ///
2327 /// # Examples
2328 ///
2329 /// ```no_run
2330 /// use ruma::api::{FeatureFlag, MatrixVersion};
2331 /// # use matrix_sdk::{Client, config::SyncSettings};
2332 /// # use url::Url;
2333 /// # async {
2334 /// # let homeserver = Url::parse("http://localhost:8080")?;
2335 /// # let mut client = Client::new(homeserver).await?;
2336 ///
2337 /// let supported =
2338 /// if let Some(supported) = client.supported_versions_cached().await? {
2339 /// supported
2340 /// } else {
2341 /// client.fetch_server_versions(None).await?.as_supported_versions()
2342 /// };
2343 ///
2344 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2345 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2346 ///
2347 /// let msc_x_feature = FeatureFlag::from("msc_x");
2348 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2349 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2350 /// # anyhow::Ok(()) };
2351 /// ```
2352 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2353 self.supported_versions_cached_inner(false).await
2354 }
2355
2356 async fn supported_versions_cached_inner(
2357 &self,
2358 failsafe: bool,
2359 ) -> Result<Option<SupportedVersions>, StoreError> {
2360 let supported_versions_cache = &self.inner.caches.supported_versions;
2361
2362 let value = if let CachedValue::Cached(cached) = supported_versions_cache.value() {
2363 cached
2364 } else if let Some(stored) = self
2365 .state_store()
2366 .get_kv_data(StateStoreDataKey::SupportedVersions)
2367 .await?
2368 .and_then(|value| value.into_supported_versions())
2369 {
2370 let stored = stored.map(|response| response.supported_versions());
2371
2372 // Copy the data from the store in the in-memory cache.
2373 supported_versions_cache.set_value(stored.clone());
2374
2375 stored
2376 } else {
2377 return Ok(None);
2378 };
2379
2380 // Spawn a task to refresh the cache if it has expired and we have a valid
2381 // access token.
2382 if value.has_expired() && self.auth_ctx().has_valid_access_token() {
2383 debug!("spawning task to refresh supported versions cache");
2384
2385 let client = self.clone();
2386 self.task_monitor().spawn_finite_task("refresh supported versions cache", async move {
2387 if let Err(error) = client.refresh_supported_versions_cache(failsafe).await {
2388 warn!("failed to refresh supported versions cache: {error}");
2389 }
2390 });
2391 }
2392
2393 Ok(Some(value.into_data()))
2394 }
2395
2396 /// Get the Matrix versions supported by the homeserver by fetching them
2397 /// from the server or the cache.
2398 ///
2399 /// # Examples
2400 ///
2401 /// ```no_run
2402 /// use ruma::api::MatrixVersion;
2403 /// # use matrix_sdk::{Client, config::SyncSettings};
2404 /// # use url::Url;
2405 /// # async {
2406 /// # let homeserver = Url::parse("http://localhost:8080")?;
2407 /// # let mut client = Client::new(homeserver).await?;
2408 ///
2409 /// let server_versions = client.server_versions().await?;
2410 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2411 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2412 /// # anyhow::Ok(()) };
2413 /// ```
2414 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2415 Ok(self.supported_versions().await?.versions)
2416 }
2417
2418 /// Get the unstable features supported by the homeserver by fetching them
2419 /// from the server or the cache.
2420 ///
2421 /// # Examples
2422 ///
2423 /// ```no_run
2424 /// use matrix_sdk::ruma::api::FeatureFlag;
2425 /// # use matrix_sdk::{Client, config::SyncSettings};
2426 /// # use url::Url;
2427 /// # async {
2428 /// # let homeserver = Url::parse("http://localhost:8080")?;
2429 /// # let mut client = Client::new(homeserver).await?;
2430 ///
2431 /// let msc_x_feature = FeatureFlag::from("msc_x");
2432 /// let unstable_features = client.unstable_features().await?;
2433 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2434 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2435 /// # anyhow::Ok(()) };
2436 /// ```
2437 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2438 Ok(self.supported_versions().await?.features)
2439 }
2440
2441 /// Empty the supported versions and unstable features cache.
2442 ///
2443 /// Since the SDK caches the supported versions, it's possible to have a
2444 /// stale entry in the cache. This functions makes it possible to force
2445 /// reset it.
2446 pub async fn reset_supported_versions(&self) -> Result<()> {
2447 // Empty the in-memory cache.
2448 self.inner.caches.supported_versions.reset();
2449
2450 // Empty the store cache.
2451 Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2452 }
2453
2454 /// Get the well-known file of the homeserver from the cache.
2455 ///
2456 /// If the data in the cache has expired, this will trigger a background
2457 /// task to refresh it.
2458 async fn well_known_cached(
2459 &self,
2460 ) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
2461 let well_known_cache = &self.inner.caches.well_known;
2462
2463 let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
2464 cached
2465 } else if let Some(stored) = self
2466 .state_store()
2467 .get_kv_data(StateStoreDataKey::WellKnown)
2468 .await?
2469 .and_then(|value| value.into_well_known())
2470 {
2471 // Copy the data from the store into the in-memory cache.
2472 well_known_cache.set_value(stored.clone());
2473
2474 stored
2475 } else {
2476 return Ok(CachedValue::NotSet);
2477 };
2478
2479 // Spawn a task to refresh the cache if it has expired.
2480 if value.has_expired() {
2481 debug!("spawning task to refresh well-known cache");
2482
2483 let client = self.clone();
2484 self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
2485 client.refresh_well_known_cache().await;
2486 });
2487 }
2488
2489 Ok(CachedValue::Cached(value.into_data()))
2490 }
2491
2492 /// Refresh the well-known file of the homeserver in the cache.
2493 async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
2494 let well_known_cache = &self.inner.caches.well_known;
2495
2496 let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
2497 Ok(guard) => guard,
2498 Err(_) => {
2499 // There is already a refresh in progress, wait for it to finish.
2500 let guard = well_known_cache.refresh_lock.lock().await;
2501
2502 // A refresh can't fail because we ignore failures, so there shouldn't be an
2503 // error in the refresh lock.
2504
2505 // Reuse the data if it was cached and it hasn't expired.
2506 if let CachedValue::Cached(value) = well_known_cache.value()
2507 && !value.has_expired()
2508 {
2509 return value.into_data();
2510 }
2511
2512 // The data wasn't cached or has expired, we need to make another request.
2513 guard
2514 }
2515 };
2516
2517 let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
2518
2519 if let Err(err) = self
2520 .state_store()
2521 .set_kv_data(
2522 StateStoreDataKey::WellKnown,
2523 StateStoreDataValue::WellKnown(well_known.clone()),
2524 )
2525 .await
2526 {
2527 warn!("error when caching well-known: {err}");
2528 }
2529
2530 well_known_cache.set_value(well_known.clone());
2531
2532 well_known.into_data()
2533 }
2534
2535 /// Get the well-known file of the homeserver by fetching it from the server
2536 /// or the cache.
2537 async fn well_known(&self) -> Option<WellKnownResponse> {
2538 match self.well_known_cached().await {
2539 Ok(CachedValue::Cached(value)) => {
2540 return value;
2541 }
2542 Ok(CachedValue::NotSet) => {
2543 // The cache is empty, make a request.
2544 }
2545 Err(error) => {
2546 warn!("error when loading cached well-known: {error}");
2547 // Fallthrough to make a request.
2548 }
2549 }
2550
2551 self.refresh_well_known_cache().await
2552 }
2553
2554 /// Get information about the homeserver's advertised RTC foci by fetching
2555 /// the well-known file from the server or the cache.
2556 ///
2557 /// # Examples
2558 /// ```no_run
2559 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::rtc::RtcTransport};
2560 /// # use url::Url;
2561 /// # async {
2562 /// # let homeserver = Url::parse("http://localhost:8080")?;
2563 /// # let mut client = Client::new(homeserver).await?;
2564 /// let rtc_foci = client.rtc_foci().await?;
2565 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2566 /// RtcTransport::LiveKit(info) => Some(info),
2567 /// _ => None,
2568 /// });
2569 /// if let Some(info) = default_livekit_focus_info {
2570 /// println!("Default LiveKit service URL: {}", info.service_url);
2571 /// }
2572 /// # anyhow::Ok(()) };
2573 /// ```
2574 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcTransport>> {
2575 let well_known = self.well_known().await;
2576
2577 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2578 }
2579
2580 /// Get information about the homeserver's advertised map tile server, if
2581 /// any, by fetching the well-known file from the server or the cache.
2582 ///
2583 /// Returns `None` if the homeserver has not advertised a tile server in its
2584 /// well-known, or if the well-known is otherwise unavailable.
2585 pub async fn tile_server(&self) -> Option<TileServerInfo> {
2586 self.well_known().await.and_then(|well_known| well_known.tile_server).map(Into::into)
2587 }
2588
2589 /// Empty the well-known cache.
2590 ///
2591 /// Since the SDK caches the well-known, it's possible to have a stale entry
2592 /// in the cache. This functions makes it possible to force reset it.
2593 pub async fn reset_well_known(&self) -> Result<()> {
2594 // Empty the in-memory caches.
2595 self.inner.caches.well_known.reset();
2596
2597 // Empty the store cache.
2598 Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2599 }
2600
2601 /// Check whether MSC 4028 is enabled on the homeserver.
2602 ///
2603 /// # Examples
2604 ///
2605 /// ```no_run
2606 /// # use matrix_sdk::{Client, config::SyncSettings};
2607 /// # use url::Url;
2608 /// # async {
2609 /// # let homeserver = Url::parse("http://localhost:8080")?;
2610 /// # let mut client = Client::new(homeserver).await?;
2611 /// let msc4028_enabled =
2612 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2613 /// # anyhow::Ok(()) };
2614 /// ```
2615 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2616 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2617 }
2618
2619 /// Get information of all our own devices.
2620 ///
2621 /// # Examples
2622 ///
2623 /// ```no_run
2624 /// # use matrix_sdk::{Client, config::SyncSettings};
2625 /// # use url::Url;
2626 /// # async {
2627 /// # let homeserver = Url::parse("http://localhost:8080")?;
2628 /// # let mut client = Client::new(homeserver).await?;
2629 /// let response = client.devices().await?;
2630 ///
2631 /// for device in response.devices {
2632 /// println!(
2633 /// "Device: {} {}",
2634 /// device.device_id,
2635 /// device.display_name.as_deref().unwrap_or("")
2636 /// );
2637 /// }
2638 /// # anyhow::Ok(()) };
2639 /// ```
2640 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2641 let request = get_devices::v3::Request::new();
2642
2643 self.send(request).await
2644 }
2645
2646 /// Delete the given devices from the server.
2647 ///
2648 /// # Arguments
2649 ///
2650 /// * `devices` - The list of devices that should be deleted from the
2651 /// server.
2652 ///
2653 /// * `auth_data` - This request requires user interactive auth, the first
2654 /// request needs to set this to `None` and will always fail with an
2655 /// `UiaaResponse`. The response will contain information for the
2656 /// interactive auth and the same request needs to be made but this time
2657 /// with some `auth_data` provided.
2658 ///
2659 /// ```no_run
2660 /// # use matrix_sdk::{
2661 /// # ruma::{api::client::uiaa, owned_device_id},
2662 /// # Client, Error, config::SyncSettings,
2663 /// # };
2664 /// # use serde_json::json;
2665 /// # use url::Url;
2666 /// # use std::collections::BTreeMap;
2667 /// # async {
2668 /// # let homeserver = Url::parse("http://localhost:8080")?;
2669 /// # let mut client = Client::new(homeserver).await?;
2670 /// let devices = &[owned_device_id!("DEVICEID")];
2671 ///
2672 /// if let Err(e) = client.delete_devices(devices, None).await {
2673 /// if let Some(info) = e.as_uiaa_response() {
2674 /// let mut password = uiaa::Password::new(
2675 /// uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2676 /// "wordpass".to_owned(),
2677 /// );
2678 /// password.session = info.session.clone();
2679 ///
2680 /// client
2681 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2682 /// .await?;
2683 /// }
2684 /// }
/// # anyhow::Ok(()) };
/// ```
2686 pub async fn delete_devices(
2687 &self,
2688 devices: &[OwnedDeviceId],
2689 auth_data: Option<uiaa::AuthData>,
2690 ) -> HttpResult<delete_devices::v3::Response> {
2691 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2692 request.auth = auth_data;
2693
2694 self.send(request).await
2695 }
2696
2697 /// Change the display name of a device owned by the current user.
2698 ///
2699 /// Returns a `update_device::Response` which specifies the result
2700 /// of the operation.
2701 ///
2702 /// # Arguments
2703 ///
2704 /// * `device_id` - The ID of the device to change the display name of.
2705 /// * `display_name` - The new display name to set.
2706 pub async fn rename_device(
2707 &self,
2708 device_id: &DeviceId,
2709 display_name: &str,
2710 ) -> HttpResult<update_device::v3::Response> {
2711 let mut request = update_device::v3::Request::new(device_id.to_owned());
2712 request.display_name = Some(display_name.to_owned());
2713
2714 self.send(request).await
2715 }
2716
2717 /// Check whether a device with a specific ID exists on the server.
2718 ///
2719 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2720 /// with 404 and the underlying error otherwise.
2721 ///
2722 /// # Arguments
2723 ///
2724 /// * `device_id` - The ID of the device to query.
2725 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2726 let request = device::get_device::v3::Request::new(device_id);
2727 match self.send(request).await {
2728 Ok(_) => Ok(true),
2729 Err(err) => {
2730 if let Some(error) = err.as_client_api_error()
2731 && error.status_code == 404
2732 {
2733 Ok(false)
2734 } else {
2735 Err(err.into())
2736 }
2737 }
2738 }
2739 }
2740
2741 /// Synchronize the client's state with the latest state on the server.
2742 ///
2743 /// ## Syncing Events
2744 ///
2745 /// Messages or any other type of event need to be periodically fetched from
2746 /// the server, this is achieved by sending a `/sync` request to the server.
2747 ///
2748 /// The first sync is sent out without a [`token`]. The response of the
2749 /// first sync will contain a [`next_batch`] field which should then be
2750 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2751 /// don't receive the same events multiple times.
2752 ///
2753 /// ## Long Polling
2754 ///
2755 /// A sync should in the usual case always be in flight. The
2756 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2757 /// long the server will wait for new events before it will respond.
2758 /// The server will respond immediately if some new events arrive before the
2759 /// timeout has expired. If no changes arrive and the timeout expires an
2760 /// empty sync response will be sent to the client.
2761 ///
2762 /// This method of sending a request that may not receive a response
2763 /// immediately is called long polling.
2764 ///
2765 /// ## Filtering Events
2766 ///
2767 /// The number or type of messages and events that the client should receive
2768 /// from the server can be altered using a [`Filter`].
2769 ///
2770 /// Filters can be non-trivial and, since they will be sent with every sync
2771 /// request, they may take up a bunch of unnecessary bandwidth.
2772 ///
/// Luckily, filters can be uploaded to the server and reused via a unique
/// identifier; this can be achieved using the [`get_or_upload_filter()`]
/// method.
2776 ///
2777 /// # Arguments
2778 ///
2779 /// * `sync_settings` - Settings for the sync call, this allows us to set
2780 /// various options to configure the sync:
2781 /// * [`filter`] - To configure which events we receive and which get
2782 /// [filtered] by the server
2783 /// * [`timeout`] - To configure our [long polling] setup.
2784 /// * [`token`] - To tell the server which events we already received
2785 /// and where we wish to continue syncing.
2786 /// * [`full_state`] - To tell the server that we wish to receive all
2787 /// state events, regardless of our configured [`token`].
2788 /// * [`set_presence`] - To tell the server to set the presence and to
2789 /// which state.
2790 ///
2791 /// # Examples
2792 ///
2793 /// ```no_run
2794 /// # use url::Url;
2795 /// # async {
2796 /// # let homeserver = Url::parse("http://localhost:8080")?;
2797 /// # let username = "";
2798 /// # let password = "";
2799 /// use matrix_sdk::{
2800 /// Client, config::SyncSettings,
2801 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2802 /// };
2803 ///
2804 /// let client = Client::new(homeserver).await?;
2805 /// client.matrix_auth().login_username(username, password).send().await?;
2806 ///
2807 /// // Sync once so we receive the client state and old messages.
2808 /// client.sync_once(SyncSettings::default()).await?;
2809 ///
2810 /// // Register our handler so we start responding once we receive a new
2811 /// // event.
2812 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2813 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2814 /// });
2815 ///
2816 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2817 /// // from our `sync_once()` call automatically.
2818 /// client.sync(SyncSettings::default()).await;
2819 /// # anyhow::Ok(()) };
2820 /// ```
2821 ///
2822 /// [`sync`]: #method.sync
2823 /// [`SyncSettings`]: crate::config::SyncSettings
2824 /// [`token`]: crate::config::SyncSettings#method.token
2825 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2826 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2827 /// [`set_presence`]: ruma::presence::PresenceState
2828 /// [`filter`]: crate::config::SyncSettings#method.filter
2829 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2830 /// [`next_batch`]: SyncResponse#structfield.next_batch
2831 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2832 /// [long polling]: #long-polling
2833 /// [filtered]: #filtering-events
2834 #[instrument(skip(self))]
2835 pub async fn sync_once(
2836 &self,
2837 sync_settings: crate::config::SyncSettings,
2838 ) -> Result<SyncResponse> {
2839 // The sync might not return for quite a while due to the timeout.
2840 // We'll see if there's anything crypto related to send out before we
2841 // sync, i.e. if we closed our client after a sync but before the
2842 // crypto requests were sent out.
2843 //
2844 // This will mostly be a no-op.
2845 #[cfg(feature = "e2e-encryption")]
2846 if let Err(e) = self.send_outgoing_requests().await {
2847 error!(error = ?e, "Error while sending outgoing E2EE requests");
2848 }
2849
2850 let token = match sync_settings.token {
2851 SyncToken::Specific(token) => Some(token),
2852 SyncToken::NoToken => None,
2853 SyncToken::ReusePrevious => self.sync_token().await,
2854 };
2855
2856 let request = assign!(sync_events::v3::Request::new(), {
2857 filter: sync_settings.filter.map(|f| *f),
2858 since: token,
2859 full_state: sync_settings.full_state,
2860 set_presence: sync_settings.set_presence,
2861 timeout: sync_settings.timeout,
2862 use_state_after: true,
2863 });
2864 let mut request_config = self.request_config();
2865 if let Some(timeout) = sync_settings.timeout {
2866 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2867 request_config.timeout = Some(base_timeout + timeout);
2868 }
2869
2870 let response = self.send(request).with_request_config(request_config).await?;
2871 let next_batch = response.next_batch.clone();
2872 let response = self.process_sync(response).await?;
2873
2874 #[cfg(feature = "e2e-encryption")]
2875 if let Err(e) = self.send_outgoing_requests().await {
2876 error!(error = ?e, "Error while sending outgoing E2EE requests");
2877 }
2878
2879 self.inner.sync_beat.notify(usize::MAX);
2880
2881 Ok(SyncResponse::new(next_batch, response))
2882 }
2883
2884 /// Repeatedly synchronize the client state with the server.
2885 ///
2886 /// This method will only return on error, if cancellation is needed
2887 /// the method should be wrapped in a cancelable task or the
2888 /// [`Client::sync_with_callback`] method can be used or
2889 /// [`Client::sync_with_result_callback`] if you want to handle error
2890 /// cases in the loop, too.
2891 ///
2892 /// This method will internally call [`Client::sync_once`] in a loop.
2893 ///
2894 /// This method can be used with the [`Client::add_event_handler`]
2895 /// method to react to individual events. If you instead wish to handle
2896 /// events in a bulk manner the [`Client::sync_with_callback`],
2897 /// [`Client::sync_with_result_callback`] and
2898 /// [`Client::sync_stream`] methods can be used instead. Those methods
2899 /// repeatedly return the whole sync response.
2900 ///
2901 /// # Arguments
2902 ///
2903 /// * `sync_settings` - Settings for the sync call. *Note* that those
2904 /// settings will be only used for the first sync call. See the argument
2905 /// docs for [`Client::sync_once`] for more info.
2906 ///
2907 /// # Return
2908 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2909 /// up to the user of the API to check the error and decide whether the sync
2910 /// should continue or not.
2911 ///
2912 /// # Examples
2913 ///
2914 /// ```no_run
2915 /// # use url::Url;
2916 /// # async {
2917 /// # let homeserver = Url::parse("http://localhost:8080")?;
2918 /// # let username = "";
2919 /// # let password = "";
2920 /// use matrix_sdk::{
2921 /// Client, config::SyncSettings,
2922 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2923 /// };
2924 ///
2925 /// let client = Client::new(homeserver).await?;
2926 /// client.matrix_auth().login_username(&username, &password).send().await?;
2927 ///
2928 /// // Register our handler so we start responding once we receive a new
2929 /// // event.
2930 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2931 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2932 /// });
2933 ///
2934 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2935 /// // automatically.
2936 /// client.sync(SyncSettings::default()).await?;
2937 /// # anyhow::Ok(()) };
2938 /// ```
2939 ///
2940 /// [argument docs]: #method.sync_once
2941 /// [`sync_with_callback`]: #method.sync_with_callback
2942 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2943 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2944 }
2945
2946 /// Repeatedly call sync to synchronize the client state with the server.
2947 ///
2948 /// # Arguments
2949 ///
2950 /// * `sync_settings` - Settings for the sync call. *Note* that those
2951 /// settings will be only used for the first sync call. See the argument
2952 /// docs for [`Client::sync_once`] for more info.
2953 ///
/// * `callback` - A callback that will be called every time a successful
///   response has been fetched from the server. The callback must return a
///   `LoopCtrl` which signals whether the method should stop syncing. If
///   the callback returns `LoopCtrl::Continue` the sync will continue, if
///   the callback returns `LoopCtrl::Break` the sync will be stopped.
2959 ///
2960 /// # Return
2961 /// The sync runs until an error occurs or the
2962 /// callback indicates that the Loop should stop. If the callback asked for
2963 /// a regular stop, the result will be `Ok(())` otherwise the
2964 /// `Err(Error)` is returned.
2965 ///
2966 /// # Examples
2967 ///
2968 /// The following example demonstrates how to sync forever while sending all
2969 /// the interesting events through a mpsc channel to another thread e.g. a
2970 /// UI thread.
2971 ///
2972 /// ```no_run
2973 /// # use std::time::Duration;
2974 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2975 /// # use url::Url;
2976 /// # async {
2977 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2978 /// # let mut client = Client::new(homeserver).await.unwrap();
2979 ///
2980 /// use tokio::sync::mpsc::channel;
2981 ///
2982 /// let (tx, rx) = channel(100);
2983 ///
2984 /// let sync_channel = &tx;
2985 /// let sync_settings = SyncSettings::new()
2986 /// .timeout(Duration::from_secs(30));
2987 ///
2988 /// client
2989 /// .sync_with_callback(sync_settings, |response| async move {
2990 /// let channel = sync_channel;
2991 /// for (room_id, room) in response.rooms.joined {
2992 /// for event in room.timeline.events {
2993 /// channel.send(event).await.unwrap();
2994 /// }
2995 /// }
2996 ///
2997 /// LoopCtrl::Continue
2998 /// })
2999 /// .await;
3000 /// };
3001 /// ```
3002 #[instrument(skip_all)]
3003 pub async fn sync_with_callback<C>(
3004 &self,
3005 sync_settings: crate::config::SyncSettings,
3006 callback: impl Fn(SyncResponse) -> C,
3007 ) -> Result<(), Error>
3008 where
3009 C: Future<Output = LoopCtrl>,
3010 {
3011 self.sync_with_result_callback(sync_settings, |result| async {
3012 Ok(callback(result?).await)
3013 })
3014 .await
3015 }
3016
3017 /// Repeatedly call sync to synchronize the client state with the server.
3018 ///
3019 /// # Arguments
3020 ///
3021 /// * `sync_settings` - Settings for the sync call. *Note* that those
3022 /// settings will be only used for the first sync call. See the argument
3023 /// docs for [`Client::sync_once`] for more info.
3024 ///
3025 /// * `callback` - A callback that will be called every time after a
3026 /// response has been received, failure or not. The callback returns a
3027 /// `Result<LoopCtrl, Error>`, too. When returning
3028 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
3029 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
3030 /// function returns `Ok(())`. In case the callback can't handle the
3031 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
3032 /// which results in the sync ending and the `Err(Error)` being returned.
3033 ///
3034 /// # Return
3035 /// The sync runs until an error occurs that the callback can't handle or
3036 /// the callback indicates that the Loop should stop. If the callback
3037 /// asked for a regular stop, the result will be `Ok(())` otherwise the
3038 /// `Err(Error)` is returned.
3039 ///
/// _Note_: Lower-level configuration (e.g. for retries) is not changed by
/// this, and is handled first without sending the result to the
/// callback. Only after the retries have been exhausted is the `Result`
/// handed to the callback.
3044 ///
3045 /// # Examples
3046 ///
3047 /// The following example demonstrates how to sync forever while sending all
3048 /// the interesting events through a mpsc channel to another thread e.g. a
3049 /// UI thread.
3050 ///
3051 /// ```no_run
3052 /// # use std::time::Duration;
3053 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
3054 /// # use url::Url;
3055 /// # async {
3056 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
3057 /// # let mut client = Client::new(homeserver).await.unwrap();
3058 /// #
3059 /// use tokio::sync::mpsc::channel;
3060 ///
3061 /// let (tx, rx) = channel(100);
3062 ///
3063 /// let sync_channel = &tx;
3064 /// let sync_settings = SyncSettings::new()
3065 /// .timeout(Duration::from_secs(30));
3066 ///
3067 /// client
3068 /// .sync_with_result_callback(sync_settings, |response| async move {
3069 /// let channel = sync_channel;
3070 /// let sync_response = response?;
3071 /// for (room_id, room) in sync_response.rooms.joined {
3072 /// for event in room.timeline.events {
3073 /// channel.send(event).await.unwrap();
3074 /// }
3075 /// }
3076 ///
3077 /// Ok(LoopCtrl::Continue)
3078 /// })
3079 /// .await;
3080 /// };
3081 /// ```
3082 #[instrument(skip(self, callback))]
3083 pub async fn sync_with_result_callback<C>(
3084 &self,
3085 sync_settings: crate::config::SyncSettings,
3086 callback: impl Fn(Result<SyncResponse, Error>) -> C,
3087 ) -> Result<(), Error>
3088 where
3089 C: Future<Output = Result<LoopCtrl, Error>>,
3090 {
3091 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
3092
3093 while let Some(result) = sync_stream.next().await {
3094 trace!("Running callback");
3095 if callback(result).await? == LoopCtrl::Break {
3096 trace!("Callback told us to stop");
3097 break;
3098 }
3099 trace!("Done running callback");
3100 }
3101
3102 Ok(())
3103 }
3104
/// Repeatedly synchronize the client state with the server.
3106 ///
3107 /// This method will internally call [`Client::sync_once`] in a loop and is
3108 /// equivalent to the [`Client::sync`] method but the responses are provided
3109 /// as an async stream.
3110 ///
3111 /// # Arguments
3112 ///
3113 /// * `sync_settings` - Settings for the sync call. *Note* that those
3114 /// settings will be only used for the first sync call. See the argument
3115 /// docs for [`Client::sync_once`] for more info.
3116 ///
3117 /// # Examples
3118 ///
3119 /// ```no_run
3120 /// # use url::Url;
3121 /// # async {
3122 /// # let homeserver = Url::parse("http://localhost:8080")?;
3123 /// # let username = "";
3124 /// # let password = "";
3125 /// use futures_util::StreamExt;
3126 /// use matrix_sdk::{Client, config::SyncSettings};
3127 ///
3128 /// let client = Client::new(homeserver).await?;
3129 /// client.matrix_auth().login_username(&username, &password).send().await?;
3130 ///
3131 /// let mut sync_stream =
3132 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
3133 ///
3134 /// while let Some(Ok(response)) = sync_stream.next().await {
3135 /// for room in response.rooms.joined.values() {
3136 /// for e in &room.timeline.events {
3137 /// if let Ok(event) = e.raw().deserialize() {
3138 /// println!("Received event {:?}", event);
3139 /// }
3140 /// }
3141 /// }
3142 /// }
3143 ///
3144 /// # anyhow::Ok(()) };
3145 /// ```
#[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
#[instrument(skip(self))]
pub async fn sync_stream(
    &self,
    mut sync_settings: crate::config::SyncSettings,
) -> impl Stream<Item = Result<SyncResponse>> + '_ {
    // State backing `ignore_timeout_on_first_sync`: `timeout` stashes the
    // configured timeout while the first sync runs without one.
    let mut is_first_sync = true;
    let mut timeout = None;
    // Handed to `Client::delay_sync` after every iteration.
    let mut last_sync_time: Option<Instant> = None;

    // Captured now so that each sync executed by the stream is instrumented
    // with the span that is current while `sync_stream` itself runs.
    let parent_span = Span::current();

    async_stream::stream!({
        loop {
            trace!("Syncing");

            if sync_settings.ignore_timeout_on_first_sync {
                if is_first_sync {
                    // First iteration: move the timeout out of the settings
                    // so that this sync is sent without one.
                    timeout = sync_settings.timeout.take();
                } else if sync_settings.timeout.is_none() && timeout.is_some() {
                    // Later iterations: put the stashed timeout back, unless
                    // the settings carry a timeout again already.
                    sync_settings.timeout = timeout.take();
                }

                is_first_sync = false;
            }

            // Hand the outcome of this sync, success or failure, to the
            // consumer of the stream.
            yield self
                .sync_loop_helper(&mut sync_settings)
                .instrument(parent_span.clone())
                .await;

            // NOTE(review): presumably throttles back-to-back syncs based on
            // `last_sync_time` — confirm against `Client::delay_sync`.
            Client::delay_sync(&mut last_sync_time).await
        }
    })
}
3181
3182 /// Get the current, if any, sync token of the client.
3183 /// This will be None if the client didn't sync at least once.
3184 pub(crate) async fn sync_token(&self) -> Option<String> {
3185 self.inner.base_client.sync_token().await
3186 }
3187
3188 /// Gets information about the owner of a given access token.
3189 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3190 let request = whoami::v3::Request::new();
3191 self.send(request).await
3192 }
3193
3194 /// Subscribes a new receiver to client SessionChange broadcasts.
3195 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3196 let broadcast = &self.auth_ctx().session_change_sender;
3197 broadcast.subscribe()
3198 }
3199
3200 /// Sets the save/restore session callbacks.
3201 ///
3202 /// This is another mechanism to get synchronous updates to session tokens,
3203 /// while [`Self::subscribe_to_session_changes`] provides an async update.
3204 pub fn set_session_callbacks(
3205 &self,
3206 reload_session_callback: Box<ReloadSessionCallback>,
3207 save_session_callback: Box<SaveSessionCallback>,
3208 ) -> Result<()> {
3209 self.inner
3210 .auth_ctx
3211 .reload_session_callback
3212 .set(reload_session_callback)
3213 .map_err(|_| Error::MultipleSessionCallbacks)?;
3214
3215 self.inner
3216 .auth_ctx
3217 .save_session_callback
3218 .set(save_session_callback)
3219 .map_err(|_| Error::MultipleSessionCallbacks)?;
3220
3221 Ok(())
3222 }
3223
3224 /// Get the notification settings of the current owner of the client.
3225 pub async fn notification_settings(&self) -> NotificationSettings {
3226 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3227 NotificationSettings::new(self.clone(), ruleset)
3228 }
3229
/// Create a new specialized `Client` that can process notifications.
///
/// The new client wraps a freshly built `ClientInner`. That inner state is
/// handed clones of this client's authentication context, HTTP client,
/// event cache, send queue data, latest events and thread subscription
/// catch-up handles, while its base client is obtained from
/// `clone_with_in_memory_state_store`.
///
/// See [`CrossProcessLock::new`] to learn more about
/// `cross_process_lock_config`.
///
/// # Errors
///
/// Returns an error if cloning the base client with an in-memory state
/// store fails.
///
/// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
pub async fn notification_client(
    &self,
    cross_process_lock_config: CrossProcessLockConfig,
) -> Result<Client> {
    // The arguments below are positional; their order must match the
    // parameter list of `ClientInner::new`, including the cfg-gated ones.
    let client = Client {
        inner: ClientInner::new(
            self.inner.auth_ctx.clone(),
            self.server().cloned(),
            self.homeserver(),
            self.sliding_sync_version(),
            self.inner.http_client.clone(),
            // NOTE(review): the meaning of the `false` flag is not visible
            // from here — confirm against `clone_with_in_memory_state_store`.
            self.inner
                .base_client
                .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
                .await?,
            // Presumably seeds the new client's caches with the values
            // currently cached by this one — confirm against `value()`.
            self.inner.caches.supported_versions.value(),
            self.inner.caches.well_known.value(),
            self.inner.respect_login_well_known,
            self.inner.event_cache.clone(),
            self.inner.send_queue_data.clone(),
            self.inner.latest_events.clone(),
            #[cfg(feature = "e2e-encryption")]
            self.inner.e2ee.encryption_settings,
            #[cfg(feature = "e2e-encryption")]
            self.inner.enable_share_history_on_invite,
            cross_process_lock_config,
            #[cfg(feature = "experimental-search")]
            self.inner.search_index.clone(),
            self.inner.thread_subscription_catchup.clone(),
        )
        .await,
    };

    Ok(client)
}
3271
3272 /// The [`EventCache`] instance for this [`Client`].
3273 pub fn event_cache(&self) -> &EventCache {
3274 // SAFETY: always initialized in the `Client` ctor.
3275 self.inner.event_cache.get().unwrap()
3276 }
3277
3278 /// The [`LatestEvents`] instance for this [`Client`].
3279 pub async fn latest_events(&self) -> &LatestEvents {
3280 self.inner
3281 .latest_events
3282 .get_or_init(|| async {
3283 LatestEvents::new(
3284 WeakClient::from_client(self),
3285 self.event_cache().clone(),
3286 SendQueue::new(self.clone()),
3287 self.room_info_notable_update_receiver(),
3288 )
3289 })
3290 .await
3291 }
3292
3293 /// Waits until an at least partially synced room is received, and returns
3294 /// it.
3295 ///
3296 /// **Note: this function will loop endlessly until either it finds the room
3297 /// or an externally set timeout happens.**
3298 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3299 loop {
3300 if let Some(room) = self.get_room(room_id) {
3301 if room.is_state_partially_or_fully_synced() {
3302 debug!("Found just created room!");
3303 return room;
3304 }
3305 debug!("Room wasn't partially synced, waiting for sync beat to try again");
3306 } else {
3307 debug!("Room wasn't found, waiting for sync beat to try again");
3308 }
3309 self.inner.sync_beat.listen().await;
3310 }
3311 }
3312
3313 /// Knock on a room given its `room_id_or_alias` to ask for permission to
3314 /// join it.
3315 pub async fn knock(
3316 &self,
3317 room_id_or_alias: OwnedRoomOrAliasId,
3318 reason: Option<String>,
3319 server_names: Vec<OwnedServerName>,
3320 ) -> Result<Room> {
3321 let request =
3322 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3323 let response = self.send(request).await?;
3324 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3325 Ok(Room::new(self.clone(), base_room))
3326 }
3327
3328 /// Checks whether the provided `user_id` belongs to an ignored user.
3329 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3330 self.base_client().is_user_ignored(user_id).await
3331 }
3332
3333 /// Gets the `max_upload_size` value from the homeserver, getting either a
3334 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3335 /// missing.
3336 ///
3337 /// Check the spec for more info:
3338 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3339 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3340 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3341 if let Some(data) = max_upload_size_lock.get() {
3342 return Ok(data.to_owned());
3343 }
3344
3345 // Use the authenticated endpoint when the server supports it.
3346 let supported_versions = self.supported_versions().await?;
3347 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3348 .is_supported(&supported_versions);
3349
3350 let upload_size = if use_auth {
3351 self.send(authenticated_media::get_media_config::v1::Request::default())
3352 .await?
3353 .upload_size
3354 } else {
3355 #[allow(deprecated)]
3356 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3357 };
3358
3359 match max_upload_size_lock.set(upload_size) {
3360 Ok(_) => Ok(upload_size),
3361 Err(error) => {
3362 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3363 }
3364 }
3365 }
3366
/// The settings to use for decrypting events.
///
/// They are read from the base client.
#[cfg(feature = "e2e-encryption")]
pub fn decryption_settings(&self) -> &DecryptionSettings {
    let base_client = self.base_client();
    &base_client.decryption_settings
}
3372
/// Returns the [`SearchIndex`] for this [`Client`].
///
/// Only available with the `experimental-search` feature.
#[cfg(feature = "experimental-search")]
pub fn search_index(&self) -> &SearchIndex {
    let inner = &self.inner;
    &inner.search_index
}
3378
3379 /// Whether the client is configured to take thread subscriptions (MSC4306
3380 /// and MSC4308) into account, and the server enabled the experimental
3381 /// feature flag for it.
3382 ///
3383 /// This may cause filtering out of thread subscriptions, and loading the
3384 /// thread subscriptions via the sliding sync extension, when the room
3385 /// list service is being used.
3386 ///
3387 /// This is async and fallible as it may use the network to retrieve the
3388 /// server supported features, if they aren't cached already.
3389 pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3390 // Check if the client is configured to support thread subscriptions first.
3391 match self.base_client().threading_support {
3392 ThreadingSupport::Enabled { with_subscriptions: false }
3393 | ThreadingSupport::Disabled => return Ok(false),
3394 ThreadingSupport::Enabled { with_subscriptions: true } => {}
3395 }
3396
3397 // Now, let's check that the server supports it.
3398 let server_enabled = self
3399 .supported_versions()
3400 .await?
3401 .features
3402 .contains(&FeatureFlag::from("org.matrix.msc4306"));
3403
3404 Ok(server_enabled)
3405 }
3406
3407 /// Fetch thread subscriptions changes between `from` and up to `to`.
3408 ///
3409 /// The `limit` optional parameter can be used to limit the number of
3410 /// entries in a response. It can also be overridden by the server, if
3411 /// it's deemed too large.
3412 pub async fn fetch_thread_subscriptions(
3413 &self,
3414 from: Option<String>,
3415 to: Option<String>,
3416 limit: Option<UInt>,
3417 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3418 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3419 from,
3420 to,
3421 limit,
3422 });
3423 Ok(self.send(request).await?)
3424 }
3425
3426 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3427 self.inner.thread_subscription_catchup.get().unwrap()
3428 }
3429
3430 /// Pause the client for background suspension.
3431 ///
3432 /// This method:
3433 /// 1. Disables all send queues (prevents new message sends).
3434 /// 2. Pauses all database stores, waiting for in-flight operations and
3435 /// releasing all connections and file locks.
3436 ///
3437 /// Call [`Client::resume()`] when the app returns to the foreground.
3438 ///
3439 /// # iOS
3440 ///
3441 /// Call this before the app is suspended to avoid `0xdead10cc` kills.
3442 /// Typically called from
3443 /// [`applicationDidEnterBackground`](https://developer.apple.com/documentation/uikit/uiapplicationdelegate/applicationdidenterbackground(_:))
3444 /// or an equivalent SwiftUI lifecycle event, *after* stopping the
3445 /// `matrix_sdk_ui::sync_service::SyncService`.
3446 pub async fn pause(&self) -> Result<()> {
3447 info!("Client::pause — releasing database resources");
3448
3449 // Disable send queues so no new sends hit the stores.
3450 self.send_queue().set_enabled(false).await;
3451
3452 // Close all stores (waits for in-flight ops, closes connections).
3453 self.base_client().close_stores().await?;
3454
3455 info!("Client::pause — complete, all database connections released");
3456 Ok(())
3457 }
3458
3459 /// Resume the client after a [`Client::pause()`].
3460 ///
3461 /// Re-acquires store resources and re-enables send queues.
3462 ///
3463 /// If your app stopped the `matrix_sdk_ui::sync_service::SyncService`
3464 /// before pausing, restart it separately as appropriate for your app
3465 /// lifecycle.
3466 pub async fn resume(&self) -> Result<()> {
3467 info!("Client::resume — re-acquiring database resources");
3468
3469 // Reopen stores (creates new connection pools).
3470 self.base_client().reopen_stores().await?;
3471
3472 // Re-enable send queues.
3473 self.send_queue().set_enabled(true).await;
3474
3475 info!("Client::resume — complete");
3476 Ok(())
3477 }
3478
/// Perform database optimizations if any are available, i.e. vacuuming in
/// SQLite.
///
/// The state store, the event cache store and the media store are
/// optimized one after the other, in that order. The event cache store is
/// only optimized when its lock is obtained in a clean state.
///
/// **Warning:** this was added to check if SQLite fragmentation was the
/// source of performance issues, **DO NOT use in production**.
///
/// # Errors
///
/// Returns the first error hit while locking or optimizing a store; the
/// stores after it are not optimized.
#[doc(hidden)]
pub async fn optimize_stores(&self) -> Result<()> {
    trace!("Optimizing state store...");
    self.state_store().optimize().await?;

    trace!("Optimizing event cache store...");
    // `as_clean()` returning `None` skips the optimization instead of
    // failing the whole call.
    if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
        clean_lock.optimize().await?;
    }

    trace!("Optimizing media store...");
    self.media_store().lock().await?.optimize().await?;

    Ok(())
}
3499
3500 /// Returns the sizes of the existing stores, if known.
3501 pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3502 #[cfg(feature = "e2e-encryption")]
3503 let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3504 && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3505 {
3506 Some(store_size)
3507 } else {
3508 None
3509 };
3510 #[cfg(not(feature = "e2e-encryption"))]
3511 let crypto_store_size = None;
3512
3513 let state_store_size = self.state_store().get_size().await.ok().flatten();
3514
3515 let event_cache_store_size = if let Some(clean_lock) =
3516 self.event_cache_store().lock().await?.as_clean()
3517 && let Ok(Some(store_size)) = clean_lock.get_size().await
3518 {
3519 Some(store_size)
3520 } else {
3521 None
3522 };
3523
3524 let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3525
3526 Ok(StoreSizes {
3527 crypto_store: crypto_store_size,
3528 state_store: state_store_size,
3529 event_cache_store: event_cache_store_size,
3530 media_store: media_store_size,
3531 })
3532 }
3533
3534 /// Get a reference to the client's task monitor, for spawning background
3535 /// tasks.
3536 pub fn task_monitor(&self) -> &TaskMonitor {
3537 &self.inner.task_monitor
3538 }
3539
/// Subscribe to the notifications sent when a request to /keys/upload
/// fails with a duplicate key upload error.
///
/// Every call creates a new, independent receiver.
#[cfg(feature = "e2e-encryption")]
pub fn subscribe_to_duplicate_key_upload_errors(
    &self,
) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
    let sender = &self.inner.duplicate_key_upload_error_sender;
    sender.subscribe()
}
3548
/// Look up the record telling whether we are waiting for an [MSC4268] key
/// bundle for the given room.
///
/// The lookup is delegated to the base client; `None` means that no such
/// record was found.
///
/// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
#[cfg(feature = "e2e-encryption")]
pub async fn get_pending_key_bundle_details_for_room(
    &self,
    room_id: &RoomId,
) -> Result<Option<RoomPendingKeyBundleDetails>> {
    let details = self.base_client().get_pending_key_bundle_details_for_room(room_id).await?;
    Ok(details)
}
3560
3561 /// Returns the [`DmRoomDefinition`] this client uses to check if a room is
3562 /// a DM.
3563 pub fn dm_room_definition(&self) -> &DmRoomDefinition {
3564 &self.inner.base_client.dm_room_definition
3565 }
3566}
3567
/// Contains the disk size of the different stores, if known. It won't be
/// available for in-memory stores.
///
/// A field set to `None` means that the size of the matching store is not
/// known. [`StoreSizes::default()`] yields a value where no size is known,
/// and two values compare equal when all their fields are equal.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreSizes {
    /// The size of the CryptoStore.
    pub crypto_store: Option<usize>,
    /// The size of the StateStore.
    pub state_store: Option<usize>,
    /// The size of the EventCacheStore.
    pub event_cache_store: Option<usize>,
    /// The size of the MediaStore.
    pub media_store: Option<usize>,
}
3581
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper marking the given users as tracked by the crypto layer.
    ///
    /// # Panics
    ///
    /// Panics if the client has no Olm machine, or if the Olm machine fails
    /// to update its tracked users.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let machine_guard = self.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();
        machine.update_tracked_users(user_ids).await.unwrap();
    }
}
3595
/// A weak reference to the inner client, useful when trying to get a handle
/// on the owning client.
///
/// Holding a [`WeakClient`] does not keep the client alive: it has to be
/// upgraded with [`WeakClient::get`], which fails once the last strong
/// reference to the inner client is gone.
#[derive(Clone, Debug)]
pub(crate) struct WeakClient {
    /// Non-owning pointer to the data shared by all clones of a [`Client`].
    client: Weak<ClientInner>,
}
3602
3603impl WeakClient {
3604 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3605 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3606 Self { client: Arc::downgrade(client) }
3607 }
3608
3609 /// Construct a [`WeakClient`] from a [`Client`].
3610 pub fn from_client(client: &Client) -> Self {
3611 Self::from_inner(&client.inner)
3612 }
3613
3614 /// Attempts to get a [`Client`] from this [`WeakClient`].
3615 pub fn get(&self) -> Option<Client> {
3616 self.client.upgrade().map(|inner| Client { inner })
3617 }
3618
3619 /// Gets the number of strong (`Arc`) pointers still pointing to this
3620 /// client.
3621 #[allow(dead_code)]
3622 pub fn strong_count(&self) -> usize {
3623 self.client.strong_count()
3624 }
3625}
3626
/// Information about the state of a room before we joined it.
///
/// The default value carries no inviter.
#[derive(Debug, Clone, Default)]
struct PreJoinRoomInfo {
    /// The user who invited us to the room, if any.
    pub inviter: Option<RoomMember>,
}
3633
3634// The http mocking library is not supported for wasm32
3635#[cfg(all(test, not(target_family = "wasm")))]
3636pub(crate) mod tests {
3637 use std::{sync::Arc, time::Duration};
3638
3639 use assert_matches::assert_matches;
3640 use assert_matches2::assert_let;
3641 use eyeball::SharedObservable;
3642 use futures_util::{FutureExt, StreamExt, pin_mut};
3643 use js_int::{UInt, uint};
3644 use matrix_sdk_base::{
3645 RoomState,
3646 store::{MemoryStore, StoreConfig},
3647 ttl::TtlValue,
3648 };
3649 use matrix_sdk_test::{
3650 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3651 event_factory::EventFactory,
3652 };
3653 #[cfg(target_family = "wasm")]
3654 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3655
3656 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3657 use ruma::{
3658 RoomId, ServerName, UserId,
3659 api::{
3660 FeatureFlag, MatrixVersion,
3661 client::{room::create_room::v3::Request as CreateRoomRequest, rtc::RtcTransport},
3662 },
3663 assign,
3664 events::{
3665 ignored_user_list::IgnoredUserListEventContent,
3666 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3667 },
3668 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3669 };
3670 use serde_json::json;
3671 use stream_assert::{assert_next_matches, assert_pending};
3672 use tokio::{
3673 spawn,
3674 time::{sleep, timeout},
3675 };
3676 use url::Url;
3677
3678 use super::Client;
3679 use crate::{
3680 Error, TransmissionProgress,
3681 client::{WeakClient, caches::CachedValue, futures::SendMediaUploadRequest},
3682 config::{RequestConfig, SyncSettings},
3683 futures::SendRequest,
3684 media::MediaError,
3685 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3686 };
3687
3688 #[async_test]
3689 async fn test_account_data() {
3690 let server = MatrixMockServer::new().await;
3691 let client = server.client_builder().build().await;
3692
3693 let f = EventFactory::new();
3694 server
3695 .mock_sync()
3696 .ok_and_run(&client, |builder| {
3697 builder.add_global_account_data(
3698 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3699 );
3700 })
3701 .await;
3702
3703 let content = client
3704 .account()
3705 .account_data::<IgnoredUserListEventContent>()
3706 .await
3707 .unwrap()
3708 .unwrap()
3709 .deserialize()
3710 .unwrap();
3711
3712 assert_eq!(content.ignored_users.len(), 1);
3713 }
3714
3715 #[async_test]
3716 async fn test_successful_discovery() {
3717 // Imagine this is `matrix.org`.
3718 let server = MatrixMockServer::new().await;
3719 let server_url = server.uri();
3720
3721 // Imagine this is `matrix-client.matrix.org`.
3722 let homeserver = MatrixMockServer::new().await;
3723 let homeserver_url = homeserver.uri();
3724
3725 // Imagine Alice has the user ID `@alice:matrix.org`.
3726 let domain = server_url.strip_prefix("http://").unwrap();
3727 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3728
3729 // The `.well-known` is on the server (e.g. `matrix.org`).
3730 server
3731 .mock_well_known()
3732 .ok_with_homeserver_url(&homeserver_url)
3733 .mock_once()
3734 .named("well-known")
3735 .mount()
3736 .await;
3737
3738 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3739 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3740
3741 let client = Client::builder()
3742 .insecure_server_name_no_tls(alice.server_name())
3743 .build()
3744 .await
3745 .unwrap();
3746
3747 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3748 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3749 client.server_versions().await.unwrap();
3750 }
3751
3752 #[async_test]
3753 async fn test_discovery_broken_server() {
3754 let server = MatrixMockServer::new().await;
3755 let server_url = server.uri();
3756 let domain = server_url.strip_prefix("http://").unwrap();
3757 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3758
3759 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3760
3761 assert!(
3762 Client::builder()
3763 .insecure_server_name_no_tls(alice.server_name())
3764 .build()
3765 .await
3766 .is_err(),
3767 "Creating a client from a user ID should fail when the .well-known request fails."
3768 );
3769 }
3770
3771 #[async_test]
3772 async fn test_room_creation() {
3773 let server = MatrixMockServer::new().await;
3774 let client = server.client_builder().build().await;
3775
3776 let f = EventFactory::new().sender(user_id!("@example:localhost"));
3777 server
3778 .mock_sync()
3779 .ok_and_run(&client, |builder| {
3780 builder.add_joined_room(
3781 JoinedRoomBuilder::default()
3782 .add_state_event(
3783 f.member(user_id!("@example:localhost")).display_name("example"),
3784 )
3785 .add_state_event(f.default_power_levels()),
3786 );
3787 })
3788 .await;
3789
3790 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3791 assert_eq!(room.state(), RoomState::Joined);
3792 }
3793
3794 #[async_test]
3795 async fn test_retry_limit_http_requests() {
3796 let server = MatrixMockServer::new().await;
3797 let client = server
3798 .client_builder()
3799 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3800 .build()
3801 .await;
3802
3803 assert!(client.request_config().retry_limit.unwrap() == 4);
3804
3805 server.mock_who_am_i().error500().expect(4).mount().await;
3806
3807 client.whoami().await.unwrap_err();
3808 }
3809
3810 #[async_test]
3811 async fn test_retry_timeout_http_requests() {
3812 // Keep this timeout small so that the test doesn't take long
3813 let retry_timeout = Duration::from_secs(5);
3814 let server = MatrixMockServer::new().await;
3815 let client = server
3816 .client_builder()
3817 .on_builder(|builder| {
3818 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3819 })
3820 .build()
3821 .await;
3822
3823 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3824
3825 server.mock_login().error500().expect(2..).mount().await;
3826
3827 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3828 }
3829
3830 #[async_test]
3831 async fn test_short_retry_initial_http_requests() {
3832 let server = MatrixMockServer::new().await;
3833 let client = server
3834 .client_builder()
3835 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3836 .build()
3837 .await;
3838
3839 server.mock_login().error500().expect(3..).mount().await;
3840
3841 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3842 }
3843
3844 #[async_test]
3845 async fn test_no_retry_http_requests() {
3846 let server = MatrixMockServer::new().await;
3847 let client = server.client_builder().build().await;
3848
3849 server.mock_devices().error500().mock_once().mount().await;
3850
3851 client.devices().await.unwrap_err();
3852 }
3853
3854 #[async_test]
3855 async fn test_set_homeserver() {
3856 let client = MockClientBuilder::new(None).build().await;
3857 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3858
3859 let homeserver = Url::parse("http://example.com/").unwrap();
3860 client.set_homeserver(homeserver.clone());
3861 assert_eq!(client.homeserver(), homeserver);
3862 }
3863
3864 #[async_test]
3865 async fn test_search_user_request() {
3866 let server = MatrixMockServer::new().await;
3867 let client = server.client_builder().build().await;
3868
3869 server.mock_user_directory().ok().mock_once().mount().await;
3870
3871 let response = client.search_users("test", 50).await.unwrap();
3872 assert_eq!(response.results.len(), 1);
3873 let result = &response.results[0];
3874 assert_eq!(result.user_id.to_string(), "@test:example.me");
3875 assert_eq!(result.display_name.clone().unwrap(), "Test");
3876 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3877 assert!(!response.limited);
3878 }
3879
3880 #[async_test]
3881 async fn test_request_unstable_features() {
3882 let server = MatrixMockServer::new().await;
3883 let client = server.client_builder().no_server_versions().build().await;
3884
3885 server
3886 .mock_versions()
3887 .with_feature("org.matrix.e2e_cross_signing", true)
3888 .ok()
3889 .mock_once()
3890 .mount()
3891 .await;
3892
3893 let unstable_features = client.unstable_features().await.unwrap();
3894 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3895 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3896 }
3897
3898 #[async_test]
3899 async fn test_can_homeserver_push_encrypted_event_to_device() {
3900 let server = MatrixMockServer::new().await;
3901 let client = server.client_builder().no_server_versions().build().await;
3902
3903 server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3904
3905 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3906 assert!(msc4028_enabled);
3907 }
3908
3909 #[async_test]
3910 async fn test_recently_visited_rooms() {
3911 // Tracking recently visited rooms requires authentication
3912 let client = MockClientBuilder::new(None).unlogged().build().await;
3913 assert_matches!(
3914 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3915 Err(Error::AuthenticationRequired)
3916 );
3917
3918 let client = MockClientBuilder::new(None).build().await;
3919 let account = client.account();
3920
3921 // We should start off with an empty list
3922 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3923
3924 // Tracking a valid room id should add it to the list
3925 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3926 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3927 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3928
3929 // And the existing list shouldn't be changed
3930 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3931 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3932
3933 // Tracking the same room again shouldn't change the list
3934 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3935 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3936 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3937
3938 // Tracking a second room should add it to the front of the list
3939 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3940 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3941 assert_eq!(
3942 account.get_recently_visited_rooms().await.unwrap(),
3943 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3944 );
3945
3946 // Tracking the first room yet again should move it to the front of the list
3947 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3948 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3949 assert_eq!(
3950 account.get_recently_visited_rooms().await.unwrap(),
3951 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3952 );
3953
3954 // Tracking should be capped at 20
3955 for n in 0..20 {
3956 account
3957 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3958 .await
3959 .unwrap();
3960 }
3961
3962 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3963
3964 // And the initial rooms should've been pushed out
3965 let rooms = account.get_recently_visited_rooms().await.unwrap();
3966 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3967 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3968
3969 // And the last tracked room should be the first
3970 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3971 }
3972
/// Checks that subscribing to the event cache and getting a room event
/// cache does not keep the client alive once the last `Client` handle is
/// dropped, i.e. that no strong reference cycle is created.
#[async_test]
async fn test_client_no_cycle_with_event_cache() {
    let client = MockClientBuilder::new(None).build().await;

    // Wait for the init tasks to die.
    sleep(Duration::from_secs(1)).await;

    // At this point `client` is expected to be the only strong reference.
    let weak_client = WeakClient::from_client(&client);
    assert_eq!(weak_client.strong_count(), 1);

    {
        let room_id = room_id!("!room:example.org");

        // Have the client know the room.
        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::new(room_id))
            .build_sync_response();
        client.inner.base_client.receive_sync_response(response).await.unwrap();

        client.event_cache().subscribe().unwrap();

        // Both values are dropped at the end of this scope, before the
        // client itself is dropped below.
        let (_room_event_cache, _drop_handles) =
            client.get_room(room_id).unwrap().event_cache().await.unwrap();
    }

    drop(client);

    // Give a bit of time for background tasks to die.
    sleep(Duration::from_secs(1)).await;

    // The weak client must be the last reference to the client now.
    assert_eq!(weak_client.strong_count(), 0);
    let client = weak_client.get();
    // The message arguments are only evaluated when the assertion fails,
    // i.e. when `client` is `Some`, so the `unwrap()` cannot panic here.
    assert!(
        client.is_none(),
        "too many strong references to the client: {}",
        Arc::strong_count(&client.unwrap().inner)
    );
}
4012
/// Checks the caching of the supported versions: in memory, in the state
/// store shared by two successive clients, after an explicit reset, and
/// after a forced expiry of the cached value.
#[async_test]
async fn test_supported_versions_caching() {
    let server = MatrixMockServer::new().await;

    // Scoped mock: it has to be hit exactly once until it is dropped below,
    // whatever the number of clients and of calls made in the meantime.
    let versions_mock = server
        .mock_versions()
        .expect_default_access_token()
        .with_feature("org.matrix.e2e_cross_signing", true)
        .ok()
        .named("first versions mock")
        .expect(1)
        .mount_as_scoped()
        .await;

    // This state store is shared with the second client created below.
    let memory_store = Arc::new(MemoryStore::new());
    let client = server
        .client_builder()
        .no_server_versions()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    // First call: this is the one hitting the network.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

    // The result was cached.
    assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
    // This subsequent call hits the in-memory cache.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));

    drop(client);

    // A new client, built on top of the same state store.
    let client = server
        .client_builder()
        .no_server_versions()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    // These calls to the new client hit the cache persisted in the state
    // store (the shared `memory_store`), not the network.
    assert!(
        client
            .unstable_features()
            .await
            .unwrap()
            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
    );

    let supported = client.supported_versions().await.unwrap();
    assert!(supported.versions.contains(&MatrixVersion::V1_0));
    assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));

    // Then this call hits the in-memory cache.
    let supported = client.supported_versions().await.unwrap();
    assert!(supported.versions.contains(&MatrixVersion::V1_0));
    assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));

    // Dropping the scoped mock is where its `expect(1)` gets verified.
    drop(versions_mock);

    // Now, reset the cache, and observe the endpoint being called again.
    client.reset_supported_versions().await.unwrap();

    // Two requests are expected on this mock: one for the call right after
    // the reset, one for the refresh triggered by the forced expiry below.
    server.mock_versions().ok().expect(2).named("second versions mock").mount().await;

    // Hits network again.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    // Hits in-memory cache again.
    assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
    assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());

    // Force an expiry of the data.
    let supported_versions = client.supported_versions_cached().await.unwrap().unwrap();
    let mut ttl_value = TtlValue::new(supported_versions);
    ttl_value.expire();
    client.inner.caches.supported_versions.set_value(ttl_value);

    // Call the method to trigger a cache refresh background task.
    client.supported_versions_cached().await.unwrap().unwrap();

    // We wait for the task to finish, the endpoint should have been called again.
    sleep(Duration::from_secs(1)).await;
    assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
}
4105
/// Checks the caching of the `.well-known` data: in memory, in the state
/// store shared by two successive clients, after an explicit reset, and
/// after a forced expiry of the cached value.
#[async_test]
async fn test_well_known_caching() {
    let server = MatrixMockServer::new().await;
    let server_url = server.uri();
    let domain = server_url.strip_prefix("http://").unwrap();
    let server_name = <&ServerName>::try_from(domain).unwrap();
    // The RTC foci that every `rtc_foci()` call below is expected to return.
    let rtc_foci = vec![RtcTransport::livekit("https://livekit.example.com".to_owned())];

    // Scoped mock: its expectation is verified when it is dropped below.
    let well_known_mock = server
        .mock_well_known()
        .ok()
        .named("well known mock")
        .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
        .mount_as_scoped()
        .await;

    // This state store is shared with the second client created below.
    let memory_store = Arc::new(MemoryStore::new());
    let client = Client::builder()
        .insecure_server_name_no_tls(server_name)
        .store_config(
            StoreConfig::new(CrossProcessLockConfig::SingleProcess)
                .state_store(memory_store.clone()),
        )
        .build()
        .await
        .unwrap();

    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // This subsequent call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    drop(client);

    // A new client, built on top of the same state store.
    let client = server
        .client_builder()
        .no_server_versions()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    // This call to the new client hits the cache persisted in the state
    // store (the shared `memory_store`), not the network.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // Then this call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // Dropping the scoped mock is where its `expect(2)` gets verified.
    drop(well_known_mock);

    // Now, reset the cache, and observe the endpoint being called again.
    client.reset_well_known().await.unwrap();

    // NOTE(review): the two expected requests are presumably one for the
    // call right after the reset and one for the refresh triggered by the
    // forced expiry below; confirm.
    server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;

    // Hits network again.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
    // Hits in-memory cache again.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // Force an expiry of the data.
    let well_known = client.well_known().await;
    let mut ttl_value = TtlValue::new(well_known);
    ttl_value.expire();
    client.inner.caches.well_known.set_value(ttl_value);

    // Call the method again to trigger a cache refresh background task.
    client.well_known().await;

    // We wait for the task to finish, the endpoint should have been called again.
    // We need to wait a bit because the first requests using the server name of the
    // user will fail, only the requests using the homeserver URL will succeed.
    sleep(Duration::from_secs(5)).await;
    assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
}
4185
/// Checks that the absence of `.well-known` data is cached too: the
/// endpoint answers with an "unrecognized" error, the RTC foci are empty,
/// and the endpoint is only requested again after an explicit reset.
#[async_test]
async fn test_missing_well_known_caching() {
    let server = MatrixMockServer::new().await;
    // No RTC focus is expected, since the `.well-known` request fails.
    let rtc_foci: Vec<RtcTransport> = vec![];

    // Scoped mock: it has to be hit exactly once until it is dropped below.
    let well_known_mock = server
        .mock_well_known()
        .error_unrecognized()
        .named("first well-known mock")
        .expect(1)
        .mount_as_scoped()
        .await;

    // This state store is shared with the second client created below.
    let memory_store = Arc::new(MemoryStore::new());
    let client = server
        .client_builder()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // This subsequent call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    drop(client);

    // A new client, built on top of the same state store.
    let client = server
        .client_builder()
        .on_builder(|builder| {
            builder.store_config(
                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
                    .state_store(memory_store.clone()),
            )
        })
        .build()
        .await;

    // This call to the new client hits the cache persisted in the state
    // store (the shared `memory_store`), not the network.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // Then this call hits the in-memory cache.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);

    // Dropping the scoped mock is where its `expect(1)` gets verified.
    drop(well_known_mock);

    // Now, reset the cache, and observe the endpoint being called again once.
    client.reset_well_known().await.unwrap();

    server
        .mock_well_known()
        .error_unrecognized()
        .expect(1)
        .named("second well-known mock")
        .mount()
        .await;

    // Hits network again.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
    // Hits in-memory cache again.
    assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
}
4253
4254 #[async_test]
4255 async fn test_no_network_doesnt_cause_infinite_retries() {
4256 // We want infinite retries for transient errors.
4257 let client = MockClientBuilder::new(None)
4258 .on_builder(|builder| builder.request_config(RequestConfig::new()))
4259 .build()
4260 .await;
4261
4262 // We don't define a mock server on purpose here, so that the error is really a
4263 // network error.
4264 client.whoami().await.unwrap_err();
4265 }
4266
4267 #[async_test]
4268 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4269 let server = MatrixMockServer::new().await;
4270 let client = server.client_builder().build().await;
4271
4272 let room_id = room_id!("!room:example.org");
4273
4274 server
4275 .mock_sync()
4276 .ok_and_run(&client, |builder| {
4277 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4278 })
4279 .await;
4280
4281 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4282 assert_eq!(room.room_id(), room_id);
4283 }
4284
4285 #[async_test]
4286 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4287 let server = MatrixMockServer::new().await;
4288 let client = server.client_builder().build().await;
4289
4290 let room_id = room_id!("!room:example.org");
4291
4292 let client = Arc::new(client);
4293
4294 // Perform the /sync request with a delay so it starts after the
4295 // `await_room_remote_echo` call has happened
4296 spawn({
4297 let client = client.clone();
4298 async move {
4299 sleep(Duration::from_millis(100)).await;
4300
4301 server
4302 .mock_sync()
4303 .ok_and_run(&client, |builder| {
4304 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4305 })
4306 .await;
4307 }
4308 });
4309
4310 let room =
4311 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4312 assert_eq!(room.room_id(), room_id);
4313 }
4314
4315 #[async_test]
4316 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4317 let client = MockClientBuilder::new(None).build().await;
4318
4319 let room_id = room_id!("!room:example.org");
4320 // Room is not present so the client won't be able to find it. The call will
4321 // timeout.
4322 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4323 }
4324
4325 #[async_test]
4326 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4327 let server = MatrixMockServer::new().await;
4328 let client = server.client_builder().build().await;
4329
4330 server.mock_create_room().ok().mount().await;
4331
4332 // Create a room in the internal store
4333 let room = client
4334 .create_room(assign!(CreateRoomRequest::new(), {
4335 invite: vec![],
4336 is_direct: false,
4337 }))
4338 .await
4339 .unwrap();
4340
4341 // Room is locally present, but not synced, the call will timeout
4342 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4343 .await
4344 .unwrap_err();
4345 }
4346
4347 #[async_test]
4348 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4349 let server = MatrixMockServer::new().await;
4350 let client = server.client_builder().build().await;
4351
4352 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4353
4354 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4355 assert_matches!(ret, Ok(true));
4356 }
4357
4358 #[async_test]
4359 async fn test_is_room_alias_available_if_alias_is_resolved() {
4360 let server = MatrixMockServer::new().await;
4361 let client = server.client_builder().build().await;
4362
4363 server
4364 .mock_room_directory_resolve_alias()
4365 .ok("!some_room_id:matrix.org", Vec::new())
4366 .expect(1)
4367 .mount()
4368 .await;
4369
4370 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4371 assert_matches!(ret, Ok(false));
4372 }
4373
4374 #[async_test]
4375 async fn test_is_room_alias_available_if_error_found() {
4376 let server = MatrixMockServer::new().await;
4377 let client = server.client_builder().build().await;
4378
4379 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4380
4381 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4382 assert_matches!(ret, Err(_));
4383 }
4384
4385 #[async_test]
4386 async fn test_create_room_alias() {
4387 let server = MatrixMockServer::new().await;
4388 let client = server.client_builder().build().await;
4389
4390 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4391
4392 let ret = client
4393 .create_room_alias(
4394 room_alias_id!("#some_alias:matrix.org"),
4395 room_id!("!some_room:matrix.org"),
4396 )
4397 .await;
4398 assert_matches!(ret, Ok(()));
4399 }
4400
4401 #[async_test]
4402 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4403 let server = MatrixMockServer::new().await;
4404 let client = server.client_builder().build().await;
4405
4406 let room_id = room_id!("!a-room:matrix.org");
4407
4408 // Make sure the summary endpoint is called once
4409 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4410
4411 // We create a locally cached invited room
4412 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4413
4414 // And we get a preview, the server endpoint was reached
4415 let preview = client
4416 .get_room_preview(room_id.into(), Vec::new())
4417 .await
4418 .expect("Room preview should be retrieved");
4419
4420 assert_eq!(invited_room.room_id(), preview.room_id);
4421 }
4422
4423 #[async_test]
4424 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4425 let server = MatrixMockServer::new().await;
4426 let client = server.client_builder().build().await;
4427
4428 let room_id = room_id!("!a-room:matrix.org");
4429
4430 // Make sure the summary endpoint is called once
4431 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4432
4433 // We create a locally cached left room
4434 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4435
4436 // And we get a preview, the server endpoint was reached
4437 let preview = client
4438 .get_room_preview(room_id.into(), Vec::new())
4439 .await
4440 .expect("Room preview should be retrieved");
4441
4442 assert_eq!(left_room.room_id(), preview.room_id);
4443 }
4444
4445 #[async_test]
4446 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4447 let server = MatrixMockServer::new().await;
4448 let client = server.client_builder().build().await;
4449
4450 let room_id = room_id!("!a-room:matrix.org");
4451
4452 // Make sure the summary endpoint is called once
4453 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4454
4455 // We create a locally cached knocked room
4456 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4457
4458 // And we get a preview, the server endpoint was reached
4459 let preview = client
4460 .get_room_preview(room_id.into(), Vec::new())
4461 .await
4462 .expect("Room preview should be retrieved");
4463
4464 assert_eq!(knocked_room.room_id(), preview.room_id);
4465 }
4466
4467 #[async_test]
4468 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4469 let server = MatrixMockServer::new().await;
4470 let client = server.client_builder().build().await;
4471
4472 let room_id = room_id!("!a-room:matrix.org");
4473
4474 // Make sure the summary endpoint is not called
4475 server.mock_room_summary().ok(room_id).never().mount().await;
4476
4477 // We create a locally cached joined room
4478 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4479
4480 // And we get a preview, no server endpoint was reached
4481 let preview = client
4482 .get_room_preview(room_id.into(), Vec::new())
4483 .await
4484 .expect("Room preview should be retrieved");
4485
4486 assert_eq!(joined_room.room_id(), preview.room_id);
4487 }
4488
4489 #[async_test]
4490 async fn test_media_preview_config() {
4491 let server = MatrixMockServer::new().await;
4492 let client = server.client_builder().build().await;
4493
4494 server
4495 .mock_sync()
4496 .ok_and_run(&client, |builder| {
4497 builder.add_custom_global_account_data(json!({
4498 "content": {
4499 "media_previews": "private",
4500 "invite_avatars": "off"
4501 },
4502 "type": "m.media_preview_config"
4503 }));
4504 })
4505 .await;
4506
4507 let (initial_value, stream) =
4508 client.account().observe_media_preview_config().await.unwrap();
4509
4510 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4511 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4512 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4513 pin_mut!(stream);
4514 assert_pending!(stream);
4515
4516 server
4517 .mock_sync()
4518 .ok_and_run(&client, |builder| {
4519 builder.add_custom_global_account_data(json!({
4520 "content": {
4521 "media_previews": "off",
4522 "invite_avatars": "on"
4523 },
4524 "type": "m.media_preview_config"
4525 }));
4526 })
4527 .await;
4528
4529 assert_next_matches!(
4530 stream,
4531 MediaPreviewConfigEventContent {
4532 media_previews: Some(MediaPreviews::Off),
4533 invite_avatars: Some(InviteAvatars::On),
4534 ..
4535 }
4536 );
4537 assert_pending!(stream);
4538 }
4539
4540 #[async_test]
4541 async fn test_unstable_media_preview_config() {
4542 let server = MatrixMockServer::new().await;
4543 let client = server.client_builder().build().await;
4544
4545 server
4546 .mock_sync()
4547 .ok_and_run(&client, |builder| {
4548 builder.add_custom_global_account_data(json!({
4549 "content": {
4550 "media_previews": "private",
4551 "invite_avatars": "off"
4552 },
4553 "type": "io.element.msc4278.media_preview_config"
4554 }));
4555 })
4556 .await;
4557
4558 let (initial_value, stream) =
4559 client.account().observe_media_preview_config().await.unwrap();
4560
4561 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4562 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4563 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4564 pin_mut!(stream);
4565 assert_pending!(stream);
4566
4567 server
4568 .mock_sync()
4569 .ok_and_run(&client, |builder| {
4570 builder.add_custom_global_account_data(json!({
4571 "content": {
4572 "media_previews": "off",
4573 "invite_avatars": "on"
4574 },
4575 "type": "io.element.msc4278.media_preview_config"
4576 }));
4577 })
4578 .await;
4579
4580 assert_next_matches!(
4581 stream,
4582 MediaPreviewConfigEventContent {
4583 media_previews: Some(MediaPreviews::Off),
4584 invite_avatars: Some(InviteAvatars::On),
4585 ..
4586 }
4587 );
4588 assert_pending!(stream);
4589 }
4590
4591 #[async_test]
4592 async fn test_media_preview_config_not_found() {
4593 let server = MatrixMockServer::new().await;
4594 let client = server.client_builder().build().await;
4595
4596 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4597
4598 assert!(initial_value.is_none());
4599 }
4600
4601 #[async_test]
4602 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4603 // The default Matrix version we use is 1.11 or higher, so authenticated media
4604 // is supported.
4605 let server = MatrixMockServer::new().await;
4606 let client = server.client_builder().build().await;
4607
4608 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4609
4610 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4611 client.load_or_fetch_max_upload_size().await.unwrap();
4612
4613 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4614 }
4615
4616 #[async_test]
4617 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4618 // The server must advertise support for the stable feature for authenticated
4619 // media support, so we mock the `GET /versions` response.
4620 let server = MatrixMockServer::new().await;
4621 let client = server.client_builder().no_server_versions().build().await;
4622
4623 server
4624 .mock_versions()
4625 .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4626 .with_feature("org.matrix.msc3916.stable", true)
4627 .ok()
4628 .named("versions")
4629 .expect(1)
4630 .mount()
4631 .await;
4632
4633 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4634
4635 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4636 client.load_or_fetch_max_upload_size().await.unwrap();
4637
4638 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4639 }
4640
4641 #[async_test]
4642 async fn test_load_or_fetch_max_upload_size_no_auth() {
4643 // The server must not support Matrix 1.11 or higher for unauthenticated
4644 // media requests, so we mock the `GET /versions` response.
4645 let server = MatrixMockServer::new().await;
4646 let client = server.client_builder().no_server_versions().build().await;
4647
4648 server
4649 .mock_versions()
4650 .with_versions(vec!["v1.1"])
4651 .ok()
4652 .named("versions")
4653 .expect(1)
4654 .mount()
4655 .await;
4656
4657 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4658
4659 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4660 client.load_or_fetch_max_upload_size().await.unwrap();
4661
4662 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4663 }
4664
4665 #[async_test]
4666 async fn test_uploading_a_too_large_media_file() {
4667 let server = MatrixMockServer::new().await;
4668 let client = server.client_builder().build().await;
4669
4670 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4671 client.load_or_fetch_max_upload_size().await.unwrap();
4672 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4673
4674 let data = vec![1, 2];
4675 let upload_request =
4676 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4677 let request = SendRequest {
4678 client: client.clone(),
4679 request: upload_request,
4680 config: None,
4681 send_progress: SharedObservable::new(TransmissionProgress::default()),
4682 };
4683 let media_request = SendMediaUploadRequest::new(request);
4684
4685 let error = media_request.await.err();
4686 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4687 assert_eq!(max, uint!(1));
4688 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4689 }
4690
4691 #[async_test]
4692 async fn test_dont_ignore_timeout_on_first_sync() {
4693 let server = MatrixMockServer::new().await;
4694 let client = server.client_builder().build().await;
4695
4696 server
4697 .mock_sync()
4698 .timeout(Some(Duration::from_secs(30)))
4699 .ok(|_| {})
4700 .mock_once()
4701 .named("sync_with_timeout")
4702 .mount()
4703 .await;
4704
4705 // Call the endpoint once to check the timeout.
4706 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4707
4708 timeout(Duration::from_secs(1), async {
4709 stream.next().await.unwrap().unwrap();
4710 })
4711 .await
4712 .unwrap();
4713 }
4714
4715 #[async_test]
4716 async fn test_ignore_timeout_on_first_sync() {
4717 let server = MatrixMockServer::new().await;
4718 let client = server.client_builder().build().await;
4719
4720 server
4721 .mock_sync()
4722 .timeout(None)
4723 .ok(|_| {})
4724 .mock_once()
4725 .named("sync_no_timeout")
4726 .mount()
4727 .await;
4728 server
4729 .mock_sync()
4730 .timeout(Some(Duration::from_secs(30)))
4731 .ok(|_| {})
4732 .mock_once()
4733 .named("sync_with_timeout")
4734 .mount()
4735 .await;
4736
4737 // Call each version of the endpoint once to check the timeouts.
4738 let mut stream = Box::pin(
4739 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4740 );
4741
4742 timeout(Duration::from_secs(1), async {
4743 stream.next().await.unwrap().unwrap();
4744 stream.next().await.unwrap().unwrap();
4745 })
4746 .await
4747 .unwrap();
4748 }
4749
4750 #[async_test]
4751 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4752 let server = MatrixMockServer::new().await;
4753 let client = server.client_builder().build().await;
4754 // This is the user ID that is inside MemberAdditional.
4755 // Note the confusing username, so we can share
4756 // GlobalAccountDataTestEvent::Direct with the invited test.
4757 let user_id = user_id!("@invited:localhost");
4758
4759 // When we receive a sync response saying "invited" is invited to a DM
4760 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4761 let response = SyncResponseBuilder::default()
4762 .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4763 .add_global_account_data(
4764 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4765 )
4766 .build_sync_response();
4767 client.base_client().receive_sync_response(response).await.unwrap();
4768
4769 // Then get_dm_room finds this room
4770 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4771 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4772 }
4773
4774 #[async_test]
4775 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4776 let server = MatrixMockServer::new().await;
4777 let client = server.client_builder().build().await;
4778 // This is the user ID that is inside MemberInvite
4779 let user_id = user_id!("@invited:localhost");
4780
4781 // When we receive a sync response saying "invited" is invited to a DM
4782 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4783 let response = SyncResponseBuilder::default()
4784 .add_joined_room(
4785 JoinedRoomBuilder::default()
4786 .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4787 )
4788 .add_global_account_data(
4789 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4790 )
4791 .build_sync_response();
4792 client.base_client().receive_sync_response(response).await.unwrap();
4793
4794 // Then get_dm_room finds this room
4795 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4796 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4797 }
4798
4799 #[async_test]
4800 async fn test_get_dm_room_still_finds_left_room() {
4801 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4802 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4803
4804 let server = MatrixMockServer::new().await;
4805 let client = server.client_builder().build().await;
4806 // This is the user ID that is inside MemberAdditional.
4807 // Note the confusing username, so we can share
4808 // GlobalAccountDataTestEvent::Direct with the invited test.
4809 let user_id = user_id!("@invited:localhost");
4810
4811 // When we receive a sync response saying "invited" has left a DM
4812 let f = EventFactory::new().sender(user_id);
4813 let response = SyncResponseBuilder::default()
4814 .add_joined_room(
4815 JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4816 )
4817 .add_global_account_data(
4818 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4819 )
4820 .build_sync_response();
4821 client.base_client().receive_sync_response(response).await.unwrap();
4822
4823 // Then get_dm_room finds this room
4824 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4825 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4826 }
4827
4828 #[async_test]
4829 async fn test_device_exists() {
4830 let server = MatrixMockServer::new().await;
4831 let client = server.client_builder().build().await;
4832
4833 server.mock_get_device().ok().expect(1).mount().await;
4834
4835 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4836 }
4837
4838 #[async_test]
4839 async fn test_device_exists_404() {
4840 let server = MatrixMockServer::new().await;
4841 let client = server.client_builder().build().await;
4842
4843 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4844 }
4845
4846 #[async_test]
4847 async fn test_device_exists_500() {
4848 let server = MatrixMockServer::new().await;
4849 let client = server.client_builder().build().await;
4850
4851 server.mock_get_device().error500().expect(1).mount().await;
4852
4853 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4854 }
4855
4856 #[async_test]
4857 async fn test_fetching_well_known_with_homeserver_url() {
4858 let server = MatrixMockServer::new().await;
4859 let client = server.client_builder().build().await;
4860 server.mock_well_known().ok().mount().await;
4861
4862 assert_matches!(client.fetch_client_well_known().await, Some(_));
4863 }
4864
4865 #[async_test]
4866 async fn test_fetching_well_known_with_server_name() {
4867 let server = MatrixMockServer::new().await;
4868 let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4869
4870 server.mock_well_known().ok().mount().await;
4871
4872 let client = MockClientBuilder::new(None)
4873 .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4874 .build()
4875 .await;
4876
4877 assert_matches!(client.fetch_client_well_known().await, Some(_));
4878 }
4879
4880 #[async_test]
4881 async fn test_fetching_well_known_with_domain_part_of_user_id() {
4882 let server = MatrixMockServer::new().await;
4883 server.mock_well_known().ok().mount().await;
4884
4885 let user_id =
4886 UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4887 let client = MockClientBuilder::new(None)
4888 .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4889 .build()
4890 .await;
4891
4892 assert_matches!(client.fetch_client_well_known().await, Some(_));
4893 }
4894}