matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{BTreeMap, BTreeSet, btree_map},
19 fmt::{self, Debug},
20 future::{Future, ready},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::{StreamExt, join};
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32 DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35 BaseClient, DmRoomDefinition, RoomInfoNotableUpdate, RoomState, RoomStateFilter,
36 SendOutsideWasm, SessionMeta, StateStoreDataKey, StateStoreDataValue, StoreError,
37 SyncOutsideWasm, ThreadingSupport,
38 event_cache::store::EventCacheStoreLock,
39 media::store::MediaStoreLock,
40 store::{DynStateStore, RoomLoadSettings, SupportedVersionsResponse, WellKnownResponse},
41 sync::{Notification, RoomUpdates},
42 task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl::TtlValue};
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50 api::{
51 FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52 client::{
53 account::whoami,
54 alias::{create_alias, delete_alias, get_alias},
55 authenticated_media,
56 device::{self, delete_devices, get_devices, update_device},
57 directory::{get_public_rooms, get_public_rooms_filtered},
58 discovery::{
59 discover_homeserver::{self, RtcFocusInfo},
60 get_supported_versions,
61 },
62 filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
63 knock::knock_room,
64 media,
65 membership::{join_room_by_id, join_room_by_id_or_alias},
66 room::create_room,
67 session::login::v3::DiscoveryInfo,
68 sync::sync_events,
69 threads::get_thread_subscriptions_changes,
70 uiaa,
71 user_directory::search_users,
72 },
73 error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
74 path_builder::PathBuilder,
75 },
76 assign,
77 events::{beacon_info::OriginalSyncBeaconInfoEvent, direct::DirectUserIdentifier},
78 push::Ruleset,
79 time::Instant,
80};
81use serde::de::DeserializeOwned;
82use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
83use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
84use url::Url;
85
86use self::{
87 caches::{Cache, CachedValue, ClientCaches},
88 futures::SendRequest,
89};
90use crate::{
91 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
92 Room, SessionTokens, TransmissionProgress,
93 authentication::{
94 AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
95 oauth::OAuth,
96 },
97 client::{
98 homeserver_capabilities::HomeserverCapabilities,
99 thread_subscriptions::ThreadSubscriptionCatchup,
100 },
101 config::{RequestConfig, SyncToken},
102 deduplicating_handler::DeduplicatingHandler,
103 error::HttpResult,
104 event_cache::EventCache,
105 event_handler::{
106 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
107 EventHandlerStore, ObservableEventHandler, SyncEvent,
108 },
109 http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
110 latest_events::LatestEvents,
111 live_locations_observer::BeaconInfoUpdate,
112 media::MediaError,
113 notification_settings::NotificationSettings,
114 room::RoomMember,
115 room_preview::RoomPreview,
116 send_queue::{SendQueue, SendQueueData},
117 sliding_sync::Version as SlidingSyncVersion,
118 sync::{RoomUpdate, SyncResponse},
119};
120#[cfg(feature = "e2e-encryption")]
121use crate::{
122 cross_process_lock::CrossProcessLock,
123 encryption::{
124 DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
125 VerificationState,
126 },
127};
128
129mod builder;
130pub(crate) mod caches;
131pub(crate) mod futures;
132pub(crate) mod homeserver_capabilities;
133pub(crate) mod thread_subscriptions;
134
135pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
136#[cfg(feature = "experimental-search")]
137use crate::search_index::SearchIndex;
138
139#[cfg(not(target_family = "wasm"))]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
141#[cfg(target_family = "wasm")]
142type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
143
144#[cfg(not(target_family = "wasm"))]
145type NotificationHandlerFn =
146 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
147#[cfg(target_family = "wasm")]
148type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
149
150/// Enum controlling if a loop running callbacks should continue or abort.
151///
152/// This is mainly used in the [`sync_with_callback`] method, the return value
153/// of the provided callback controls if the sync loop should be exited.
154///
155/// [`sync_with_callback`]: #method.sync_with_callback
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum LoopCtrl {
158 /// Continue running the loop.
159 Continue,
160 /// Break out of the loop.
161 Break,
162}
163
164/// Represents changes that can occur to a `Client`s `Session`.
165#[derive(Debug, Clone, PartialEq)]
166pub enum SessionChange {
167 /// The session's token is no longer valid.
168 UnknownToken(UnknownTokenErrorData),
169 /// The session's tokens have been refreshed.
170 TokensRefreshed,
171}
172
173/// Information about the server vendor obtained from the federation API.
174#[derive(Debug, Clone, PartialEq, Eq)]
175#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
176pub struct ServerVendorInfo {
177 /// The server name.
178 pub server_name: String,
179 /// The server version.
180 pub version: String,
181}
182
183/// An async/await enabled Matrix client.
184///
185/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
186#[derive(Clone)]
187pub struct Client {
188 pub(crate) inner: Arc<ClientInner>,
189}
190
191#[derive(Default)]
192pub(crate) struct ClientLocks {
193 /// Lock ensuring that only a single room may be marked as a DM at once.
194 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
195 /// explanation.
196 pub(crate) mark_as_dm_lock: Mutex<()>,
197
198 /// Lock ensuring that only a single secret store is getting opened at the
199 /// same time.
200 ///
201 /// This is important so we don't accidentally create multiple different new
202 /// default secret storage keys.
203 #[cfg(feature = "e2e-encryption")]
204 pub(crate) open_secret_store_lock: Mutex<()>,
205
206 /// Lock ensuring that we're only storing a single secret at a time.
207 ///
208 /// Take a look at the [`SecretStore::put_secret`] method for a more
209 /// detailed explanation.
210 ///
211 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
212 #[cfg(feature = "e2e-encryption")]
213 pub(crate) store_secret_lock: Mutex<()>,
214
215 /// Lock ensuring that only one method at a time might modify our backup.
216 #[cfg(feature = "e2e-encryption")]
217 pub(crate) backup_modify_lock: Mutex<()>,
218
219 /// Lock ensuring that we're going to attempt to upload backups for a single
220 /// requester.
221 #[cfg(feature = "e2e-encryption")]
222 pub(crate) backup_upload_lock: Mutex<()>,
223
224 /// Handler making sure we only have one group session sharing request in
225 /// flight per room.
226 #[cfg(feature = "e2e-encryption")]
227 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
228
229 /// Lock making sure we're only doing one key claim request at a time.
230 #[cfg(feature = "e2e-encryption")]
231 pub(crate) key_claim_lock: Mutex<()>,
232
233 /// Handler to ensure that only one members request is running at a time,
234 /// given a room.
235 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
236
237 /// Handler to ensure that only one encryption state request is running at a
238 /// time, given a room.
239 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
240
241 /// Deduplicating handler for sending read receipts. The string is an
242 /// internal implementation detail, see [`Self::send_single_receipt`].
243 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
244
245 #[cfg(feature = "e2e-encryption")]
246 pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
247
248 /// Latest "generation" of data known by the crypto store.
249 ///
250 /// This is a counter that only increments, set in the database (and can
251 /// wrap). It's incremented whenever some process acquires a lock for the
252 /// first time. *This assumes the crypto store lock is being held, to
253 /// avoid data races on writing to this value in the store*.
254 ///
255 /// The current process will maintain this value in local memory and in the
256 /// DB over time. Observing a different value than the one read in
257 /// memory, when reading from the store indicates that somebody else has
258 /// written into the database under our feet.
259 ///
260 /// TODO: this should live in the `OlmMachine`, since it's information
261 /// related to the lock. As of today (2023-07-28), we blow up the entire
262 /// olm machine when there's a generation mismatch. So storing the
263 /// generation in the olm machine would make the client think there's
264 /// *always* a mismatch, and that's why we need to store the generation
265 /// outside the `OlmMachine`.
266 #[cfg(feature = "e2e-encryption")]
267 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
268}
269
270pub(crate) struct ClientInner {
271 /// All the data related to authentication and authorization.
272 pub(crate) auth_ctx: Arc<AuthCtx>,
273
274 /// The URL of the server.
275 ///
276 /// Not to be confused with the `Self::homeserver`. `server` is usually
277 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
278 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
279 /// homeserver (at the time of writing — 2024-08-28).
280 ///
281 /// This value is optional depending on how the `Client` has been built.
282 /// If it's been built from a homeserver URL directly, we don't know the
283 /// server. However, if the `Client` has been built from a server URL or
284 /// name, then the homeserver has been discovered, and we know both.
285 server: Option<Url>,
286
287 /// The URL of the homeserver to connect to.
288 ///
289 /// This is the URL for the client-server Matrix API.
290 homeserver: StdRwLock<Url>,
291
292 /// The sliding sync version.
293 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
294
295 /// The underlying HTTP client.
296 pub(crate) http_client: HttpClient,
297
298 /// User session data.
299 pub(super) base_client: BaseClient,
300
301 /// Collection of in-memory caches for the [`Client`].
302 pub(crate) caches: ClientCaches,
303
304 /// Collection of locks individual client methods might want to use, either
305 /// to ensure that only a single call to a method happens at once or to
306 /// deduplicate multiple calls to a method.
307 pub(crate) locks: ClientLocks,
308
309 /// The cross-process lock configuration.
310 ///
311 /// The SDK provides cross-process store locks (see
312 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
313 /// [`CrossProcessLockConfig::MultiProcess`] is used.
314 ///
315 /// If multiple `Client`s are running in different processes, this
316 /// value MUST be different for each `Client`.
317 cross_process_lock_config: CrossProcessLockConfig,
318
319 /// A mapping of the times at which the current user sent typing notices,
320 /// keyed by room.
321 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
322
323 /// Event handlers. See `add_event_handler`.
324 pub(crate) event_handlers: EventHandlerStore,
325
326 /// Notification handlers. See `register_notification_handler`.
327 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
328
329 /// The sender-side of channels used to receive room updates.
330 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
331
332 /// The sender-side of a channel used to observe all the room updates of a
333 /// sync response.
334 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
335
336 /// Whether the client should update its homeserver URL with the discovery
337 /// information present in the login response.
338 respect_login_well_known: bool,
339
340 /// An event that can be listened on to wait for a successful sync. The
341 /// event will only be fired if a sync loop is running. Can be used for
342 /// synchronization, e.g. if we send out a request to create a room, we can
343 /// wait for the sync to get the data to fetch a room object from the state
344 /// store.
345 pub(crate) sync_beat: event_listener::Event,
346
347 /// A central cache for events, inactive first.
348 ///
349 /// It becomes active when [`EventCache::subscribe`] is called.
350 pub(crate) event_cache: OnceCell<EventCache>,
351
352 /// End-to-end encryption related state.
353 #[cfg(feature = "e2e-encryption")]
354 pub(crate) e2ee: EncryptionData,
355
356 /// The verification state of our own device.
357 #[cfg(feature = "e2e-encryption")]
358 pub(crate) verification_state: SharedObservable<VerificationState>,
359
360 /// Whether to enable the experimental support for sending and receiving
361 /// encrypted room history on invite, per [MSC4268].
362 ///
363 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
364 #[cfg(feature = "e2e-encryption")]
365 pub(crate) enable_share_history_on_invite: bool,
366
367 /// Data related to the [`SendQueue`].
368 ///
369 /// [`SendQueue`]: crate::send_queue::SendQueue
370 pub(crate) send_queue_data: Arc<SendQueueData>,
371
372 /// The `max_upload_size` value of the homeserver, it contains the max
373 /// request size you can send.
374 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
375
376 /// The entry point to get the [`LatestEvent`] of rooms and threads.
377 ///
378 /// [`LatestEvent`]: crate::latest_event::LatestEvent
379 latest_events: OnceCell<LatestEvents>,
380
381 /// Service handling the catching up of thread subscriptions in the
382 /// background.
383 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
384
385 #[cfg(feature = "experimental-search")]
386 /// Handler for [`RoomIndex`]'s of each room
387 search_index: SearchIndex,
388
389 /// A monitor for background tasks spawned by the client.
390 pub(crate) task_monitor: TaskMonitor,
391
392 /// A sender to notify subscribers about duplicate key upload errors
393 /// triggered by requests to /keys/upload.
394 #[cfg(feature = "e2e-encryption")]
395 pub(crate) duplicate_key_upload_error_sender:
396 broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
397}
398
399impl ClientInner {
400 /// Create a new `ClientInner`.
401 ///
402 /// All the fields passed as parameters here are those that must be cloned
403 /// upon instantiation of a sub-client, e.g. a client specialized for
404 /// notifications.
405 #[allow(clippy::too_many_arguments)]
406 async fn new(
407 auth_ctx: Arc<AuthCtx>,
408 server: Option<Url>,
409 homeserver: Url,
410 sliding_sync_version: SlidingSyncVersion,
411 http_client: HttpClient,
412 base_client: BaseClient,
413 supported_versions: CachedValue<TtlValue<SupportedVersions>>,
414 well_known: CachedValue<TtlValue<Option<WellKnownResponse>>>,
415 respect_login_well_known: bool,
416 event_cache: OnceCell<EventCache>,
417 send_queue: Arc<SendQueueData>,
418 latest_events: OnceCell<LatestEvents>,
419 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
420 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
421 cross_process_lock_config: CrossProcessLockConfig,
422 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
423 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
424 ) -> Arc<Self> {
425 let caches = ClientCaches {
426 supported_versions: Cache::with_value(supported_versions),
427 well_known: Cache::with_value(well_known),
428 server_metadata: Cache::new(),
429 homeserver_capabilities: Cache::new(),
430 };
431
432 let client = Self {
433 server,
434 homeserver: StdRwLock::new(homeserver),
435 auth_ctx,
436 sliding_sync_version: StdRwLock::new(sliding_sync_version),
437 http_client,
438 base_client,
439 caches,
440 locks: Default::default(),
441 cross_process_lock_config,
442 typing_notice_times: Default::default(),
443 event_handlers: Default::default(),
444 notification_handlers: Default::default(),
445 room_update_channels: Default::default(),
446 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
447 // ballast for all observers to catch up.
448 room_updates_sender: broadcast::Sender::new(32),
449 respect_login_well_known,
450 sync_beat: event_listener::Event::new(),
451 event_cache,
452 send_queue_data: send_queue,
453 latest_events,
454 #[cfg(feature = "e2e-encryption")]
455 e2ee: EncryptionData::new(encryption_settings),
456 #[cfg(feature = "e2e-encryption")]
457 verification_state: SharedObservable::new(VerificationState::Unknown),
458 #[cfg(feature = "e2e-encryption")]
459 enable_share_history_on_invite,
460 server_max_upload_size: Mutex::new(OnceCell::new()),
461 #[cfg(feature = "experimental-search")]
462 search_index: search_index_handler,
463 thread_subscription_catchup,
464 task_monitor: TaskMonitor::new(),
465 #[cfg(feature = "e2e-encryption")]
466 duplicate_key_upload_error_sender: broadcast::channel(1).0,
467 };
468
469 #[allow(clippy::let_and_return)]
470 let client = Arc::new(client);
471
472 #[cfg(feature = "e2e-encryption")]
473 client.e2ee.initialize_tasks(&client);
474
475 let init_event_cache = client.event_cache.get_or_init(|| async {
476 EventCache::new(&client, client.base_client.event_cache_store().clone())
477 });
478
479 let init_thread_subscription_catchup =
480 client.thread_subscription_catchup.get_or_init(|| async {
481 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
482 });
483
484 let _ = join!(init_event_cache, init_thread_subscription_catchup);
485
486 client
487 }
488}
489
490#[cfg(not(tarpaulin_include))]
491impl Debug for Client {
492 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
493 write!(fmt, "Client")
494 }
495}
496
497impl Client {
498 /// Create a new [`Client`] that will use the given homeserver.
499 ///
500 /// # Arguments
501 ///
502 /// * `homeserver_url` - The homeserver that the client should connect to.
503 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
504 Self::builder().homeserver_url(homeserver_url).build().await
505 }
506
507 /// Returns a subscriber that publishes an event every time the ignore user
508 /// list changes.
509 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
510 self.inner.base_client.subscribe_to_ignore_user_list_changes()
511 }
512
513 /// Create a new [`ClientBuilder`].
514 pub fn builder() -> ClientBuilder {
515 ClientBuilder::new()
516 }
517
518 pub(crate) fn base_client(&self) -> &BaseClient {
519 &self.inner.base_client
520 }
521
522 /// The underlying HTTP client.
523 pub fn http_client(&self) -> &reqwest::Client {
524 &self.inner.http_client.inner
525 }
526
527 pub(crate) fn locks(&self) -> &ClientLocks {
528 &self.inner.locks
529 }
530
531 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
532 &self.inner.auth_ctx
533 }
534
535 /// The cross-process store lock configuration used by this [`Client`].
536 ///
537 /// The SDK provides cross-process store locks (see
538 /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
539 /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
540 /// the value used for all cross-process store locks used by this
541 /// `Client`.
542 pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
543 &self.inner.cross_process_lock_config
544 }
545
546 /// Change the homeserver URL used by this client.
547 ///
548 /// # Arguments
549 ///
550 /// * `homeserver_url` - The new URL to use.
551 fn set_homeserver(&self, homeserver_url: Url) {
552 *self.inner.homeserver.write().unwrap() = homeserver_url;
553 }
554
555 /// Change to a different homeserver and re-resolve well-known.
556 #[cfg(feature = "e2e-encryption")]
557 pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
558 &self,
559 homeserver_url: Url,
560 ) -> Result<()> {
561 self.set_homeserver(homeserver_url);
562 self.reset_well_known().await?;
563 if let Some(well_known) = self.well_known().await {
564 self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
565 }
566 Ok(())
567 }
568
569 /// Retrieves a helper component to access the [`HomeserverCapabilities`]
570 /// supported or disabled by the homeserver.
571 pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
572 HomeserverCapabilities::new(self.clone())
573 }
574
575 /// Get the server vendor information from the federation API.
576 ///
577 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
578 /// both the server's software name and version.
579 ///
580 /// # Examples
581 ///
582 /// ```no_run
583 /// # use matrix_sdk::Client;
584 /// # use url::Url;
585 /// # async {
586 /// # let homeserver = Url::parse("http://example.com")?;
587 /// let client = Client::new(homeserver).await?;
588 ///
589 /// let server_info = client.server_vendor_info(None).await?;
590 /// println!(
591 /// "Server: {}, Version: {}",
592 /// server_info.server_name, server_info.version
593 /// );
594 /// # anyhow::Ok(()) };
595 /// ```
596 #[cfg(feature = "federation-api")]
597 pub async fn server_vendor_info(
598 &self,
599 request_config: Option<RequestConfig>,
600 ) -> HttpResult<ServerVendorInfo> {
601 use ruma::api::federation::discovery::get_server_version;
602
603 let res = self
604 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
605 .await?;
606
607 // Extract server info, using defaults if fields are missing.
608 let server = res.server.unwrap_or_default();
609 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
610 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
611
612 Ok(ServerVendorInfo { server_name: server_name_str, version })
613 }
614
615 /// Get a copy of the default request config.
616 ///
617 /// The default request config is what's used when sending requests if no
618 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
619 /// function with such a parameter.
620 ///
621 /// If the default request config was not customized through
622 /// [`ClientBuilder`] when creating this `Client`, the returned value will
623 /// be equivalent to [`RequestConfig::default()`].
624 pub fn request_config(&self) -> RequestConfig {
625 self.inner.http_client.request_config
626 }
627
628 /// Check whether the client has been activated.
629 ///
630 /// A client is considered active when:
631 ///
632 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
633 /// is logged in,
634 /// 2. Has loaded cached data from storage,
635 /// 3. If encryption is enabled, it also initialized or restored its
636 /// `OlmMachine`.
637 pub fn is_active(&self) -> bool {
638 self.inner.base_client.is_active()
639 }
640
641 /// The server used by the client.
642 ///
643 /// See `Self::server` to learn more.
644 pub fn server(&self) -> Option<&Url> {
645 self.inner.server.as_ref()
646 }
647
648 /// The homeserver of the client.
649 pub fn homeserver(&self) -> Url {
650 self.inner.homeserver.read().unwrap().clone()
651 }
652
653 /// Get the sliding sync version.
654 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
655 self.inner.sliding_sync_version.read().unwrap().clone()
656 }
657
658 /// Override the sliding sync version.
659 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
660 let mut lock = self.inner.sliding_sync_version.write().unwrap();
661 *lock = version;
662 }
663
664 /// Get the Matrix user session meta information.
665 ///
666 /// If the client is currently logged in, this will return a
667 /// [`SessionMeta`] object which contains the user ID and device ID.
668 /// Otherwise it returns `None`.
669 pub fn session_meta(&self) -> Option<&SessionMeta> {
670 self.base_client().session_meta()
671 }
672
673 /// Returns a receiver that gets events for each room info update. To watch
674 /// for new events, use `receiver.resubscribe()`.
675 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
676 self.base_client().room_info_notable_update_receiver()
677 }
678
679 /// Performs a search for users.
680 /// The search is performed case-insensitively on user IDs and display names
681 ///
682 /// # Arguments
683 ///
684 /// * `search_term` - The search term for the search
685 /// * `limit` - The maximum number of results to return. Defaults to 10.
686 ///
687 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
688 pub async fn search_users(
689 &self,
690 search_term: &str,
691 limit: u64,
692 ) -> HttpResult<search_users::v3::Response> {
693 let mut request = search_users::v3::Request::new(search_term.to_owned());
694
695 if let Some(limit) = UInt::new(limit) {
696 request.limit = limit;
697 }
698
699 self.send(request).await
700 }
701
702 /// Get the user id of the current owner of the client.
703 pub fn user_id(&self) -> Option<&UserId> {
704 self.session_meta().map(|s| s.user_id.as_ref())
705 }
706
707 /// Get the device ID that identifies the current session.
708 pub fn device_id(&self) -> Option<&DeviceId> {
709 self.session_meta().map(|s| s.device_id.as_ref())
710 }
711
712 /// Get the current access token for this session.
713 ///
714 /// Will be `None` if the client has not been logged in.
715 pub fn access_token(&self) -> Option<String> {
716 self.auth_ctx().access_token()
717 }
718
719 /// Get the current tokens for this session.
720 ///
721 /// To be notified of changes in the session tokens, use
722 /// [`Client::subscribe_to_session_changes()`] or
723 /// [`Client::set_session_callbacks()`].
724 ///
725 /// Returns `None` if the client has not been logged in.
726 pub fn session_tokens(&self) -> Option<SessionTokens> {
727 self.auth_ctx().session_tokens()
728 }
729
730 /// Access the authentication API used to log in this client.
731 ///
732 /// Will be `None` if the client has not been logged in.
733 pub fn auth_api(&self) -> Option<AuthApi> {
734 match self.auth_ctx().auth_data.get()? {
735 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
736 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
737 }
738 }
739
740 /// Get the whole session info of this client.
741 ///
742 /// Will be `None` if the client has not been logged in.
743 ///
744 /// Can be used with [`Client::restore_session`] to restore a previously
745 /// logged-in session.
746 pub fn session(&self) -> Option<AuthSession> {
747 match self.auth_api()? {
748 AuthApi::Matrix(api) => api.session().map(Into::into),
749 AuthApi::OAuth(api) => api.full_session().map(Into::into),
750 }
751 }
752
753 /// Get a reference to the state store.
754 pub fn state_store(&self) -> &DynStateStore {
755 self.base_client().state_store()
756 }
757
758 /// Get a reference to the event cache store.
759 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
760 self.base_client().event_cache_store()
761 }
762
763 /// Get a reference to the media store.
764 pub fn media_store(&self) -> &MediaStoreLock {
765 self.base_client().media_store()
766 }
767
768 /// Access the native Matrix authentication API with this client.
769 pub fn matrix_auth(&self) -> MatrixAuth {
770 MatrixAuth::new(self.clone())
771 }
772
773 /// Get the account of the current owner of the client.
774 pub fn account(&self) -> Account {
775 Account::new(self.clone())
776 }
777
778 /// Get the encryption manager of the client.
779 #[cfg(feature = "e2e-encryption")]
780 pub fn encryption(&self) -> Encryption {
781 Encryption::new(self.clone())
782 }
783
784 /// Get the media manager of the client.
785 pub fn media(&self) -> Media {
786 Media::new(self.clone())
787 }
788
789 /// Get the pusher manager of the client.
790 pub fn pusher(&self) -> Pusher {
791 Pusher::new(self.clone())
792 }
793
794 /// Access the OAuth 2.0 API of the client.
795 pub fn oauth(&self) -> OAuth {
796 OAuth::new(self.clone())
797 }
798
799 /// Register a handler for a specific event type.
800 ///
801 /// The handler is a function or closure with one or more arguments. The
802 /// first argument is the event itself. All additional arguments are
803 /// "context" arguments: They have to implement [`EventHandlerContext`].
804 /// This trait is named that way because most of the types implementing it
805 /// give additional context about an event: The room it was in, its raw form
806 /// and other similar things. As two exceptions to this,
807 /// [`Client`] and [`EventHandlerHandle`] also implement the
808 /// `EventHandlerContext` trait so you don't have to clone your client
809 /// into the event handler manually and a handler can decide to remove
810 /// itself.
811 ///
812 /// Some context arguments are not universally applicable. A context
813 /// argument that isn't available for the given event type will result in
814 /// the event handler being skipped and an error being logged. The following
815 /// context argument types are only available for a subset of event types:
816 ///
817 /// * [`Room`] is only available for room-specific events, i.e. not for
818 /// events like global account data events or presence events.
819 ///
820 /// You can provide custom context via
821 /// [`add_event_handler_context`](Client::add_event_handler_context) and
822 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
823 /// into the event handler.
824 ///
825 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
826 ///
827 /// # Examples
828 ///
829 /// ```no_run
830 /// use matrix_sdk::{
831 /// deserialized_responses::EncryptionInfo,
832 /// event_handler::Ctx,
833 /// ruma::{
834 /// events::{
835 /// macros::EventContent,
836 /// push_rules::PushRulesEvent,
837 /// room::{
838 /// message::SyncRoomMessageEvent,
839 /// topic::SyncRoomTopicEvent,
840 /// member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
841 /// },
842 /// },
843 /// push::Action,
844 /// Int, MilliSecondsSinceUnixEpoch,
845 /// },
846 /// Client, Room,
847 /// };
848 /// use serde::{Deserialize, Serialize};
849 ///
850 /// # async fn example(client: Client) {
851 /// client.add_event_handler(
852 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
853 /// // Common usage: Room event plus room and client.
854 /// },
855 /// );
856 /// client.add_event_handler(
857 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
858 /// async move {
859 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
860 /// // unencrypted events and events that were decrypted by the SDK.
861 /// }
862 /// },
863 /// );
864 /// client.add_event_handler(
865 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
866 /// async move {
867 /// // A `Vec<Action>` parameter allows you to know which push actions
868 /// // are applicable for an event. For example, an event with
869 /// // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
870 /// // should be highlighted in the timeline.
871 /// }
872 /// },
873 /// );
874 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
875 /// // You can omit any or all arguments after the first.
876 /// });
877 ///
878 /// // Registering a temporary event handler:
879 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
880 /// /* Event handler */
881 /// });
882 /// client.remove_event_handler(handle);
883 ///
884 /// // Registering custom event handler context:
885 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
886 /// struct MyContext {
887 /// number: usize,
888 /// }
889 /// client.add_event_handler_context(MyContext { number: 5 });
890 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
891 /// // Use the context
892 /// });
893 ///
894 /// // This will handle membership events in joined rooms. Invites are special, see below.
895 /// client.add_event_handler(
896 /// |ev: SyncRoomMemberEvent| async move {},
897 /// );
898 ///
899 /// // To handle state events in invited rooms (including invite membership events),
900 /// // `StrippedRoomMemberEvent` should be used.
901 /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
902 /// client.add_event_handler(
903 /// |ev: StrippedRoomMemberEvent| async move {},
904 /// );
905 ///
906 /// // Custom events work exactly the same way, you just need to declare
907 /// // the content struct and use the EventContent derive macro on it.
908 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
909 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
910 /// struct TokenEventContent {
911 /// token: String,
912 /// #[serde(rename = "exp")]
913 /// expires_at: MilliSecondsSinceUnixEpoch,
914 /// }
915 ///
916 /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
917 /// todo!("Display the token");
918 /// });
919 ///
920 /// // Event handler closures can also capture local variables.
921 /// // Make sure they are cheap to clone though, because they will be cloned
922 /// // every time the closure is called.
923 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
924 ///
925 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
926 /// println!("Calling the handler with identifier {data}");
927 /// });
928 /// # }
929 /// ```
930 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
931 where
932 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
933 H: EventHandler<Ev, Ctx>,
934 {
935 self.add_event_handler_impl(handler, None)
936 }
937
938 /// Register a handler for a specific room, and event type.
939 ///
940 /// This method works the same way as
941 /// [`add_event_handler`][Self::add_event_handler], except that the handler
942 /// will only be called for events in the room with the specified ID. See
943 /// that method for more details on event handler functions.
944 ///
945 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
946 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
947 /// your use case.
948 pub fn add_room_event_handler<Ev, Ctx, H>(
949 &self,
950 room_id: &RoomId,
951 handler: H,
952 ) -> EventHandlerHandle
953 where
954 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
955 H: EventHandler<Ev, Ctx>,
956 {
957 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
958 }
959
960 /// Observe a specific event type.
961 ///
962 /// `Ev` represents the kind of event that will be observed. `Ctx`
963 /// represents the context that will come with the event. It relies on the
964 /// same mechanism as [`Client::add_event_handler`]. The main difference is
965 /// that it returns an [`ObservableEventHandler`] and doesn't require a
966 /// user-defined closure. It is possible to subscribe to the
967 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
968 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
969 /// Ctx)`.
970 ///
971 /// Be careful that only the most recent value can be observed. Subscribers
972 /// are notified when a new value is sent, but there is no guarantee
973 /// that they will see all values.
974 ///
975 /// # Example
976 ///
977 /// Let's see a classical usage:
978 ///
979 /// ```
980 /// use futures_util::StreamExt as _;
981 /// use matrix_sdk::{
982 /// Client, Room,
983 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
984 /// };
985 ///
986 /// # async fn example(client: Client) -> Option<()> {
987 /// let observer =
988 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
989 ///
990 /// let mut subscriber = observer.subscribe();
991 ///
992 /// let (event, (room, push_actions)) = subscriber.next().await?;
993 /// # Some(())
994 /// # }
995 /// ```
996 ///
997 /// Now let's see how to get several contexts that can be useful for you:
998 ///
999 /// ```
1000 /// use matrix_sdk::{
1001 /// Client, Room,
1002 /// deserialized_responses::EncryptionInfo,
1003 /// ruma::{
1004 /// events::room::{
1005 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1006 /// },
1007 /// push::Action,
1008 /// },
1009 /// };
1010 ///
1011 /// # async fn example(client: Client) {
1012 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1013 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1014 ///
1015 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1016 /// // to distinguish between unencrypted events and events that were decrypted
1017 /// // by the SDK.
1018 /// let _ = client
1019 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1020 /// );
1021 ///
1022 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1023 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1024 /// // should be highlighted in the timeline.
1025 /// let _ =
1026 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1027 ///
1028 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1029 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1030 /// # }
1031 /// ```
1032 ///
1033 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1034 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1035 where
1036 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1037 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1038 {
1039 self.observe_room_events_impl(None)
1040 }
1041
1042 /// Observe a specific room, and event type.
1043 ///
1044 /// This method works the same way as [`Client::observe_events`], except
1045 /// that the observability will only be applied for events in the room with
1046 /// the specified ID. See that method for more details.
1047 ///
1048 /// Be careful that only the most recent value can be observed. Subscribers
1049 /// are notified when a new value is sent, but there is no guarantee
1050 /// that they will see all values.
1051 pub fn observe_room_events<Ev, Ctx>(
1052 &self,
1053 room_id: &RoomId,
1054 ) -> ObservableEventHandler<(Ev, Ctx)>
1055 where
1056 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1057 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1058 {
1059 self.observe_room_events_impl(Some(room_id.to_owned()))
1060 }
1061
1062 /// Shared implementation for `Client::observe_events` and
1063 /// `Client::observe_room_events`.
1064 fn observe_room_events_impl<Ev, Ctx>(
1065 &self,
1066 room_id: Option<OwnedRoomId>,
1067 ) -> ObservableEventHandler<(Ev, Ctx)>
1068 where
1069 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1070 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1071 {
1072 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1073 // new value.
1074 let shared_observable = SharedObservable::new(None);
1075
1076 ObservableEventHandler::new(
1077 shared_observable.clone(),
1078 self.event_handler_drop_guard(self.add_event_handler_impl(
1079 move |event: Ev, context: Ctx| {
1080 shared_observable.set(Some((event, context)));
1081
1082 ready(())
1083 },
1084 room_id,
1085 )),
1086 )
1087 }
1088
1089 /// Subscribe to future `beacon_info` updates for the current user across
1090 /// all rooms.
1091 ///
1092 /// This stream is push-only: it emits only future updates observed during
1093 /// sync processing and does not replay existing state.
1094 pub fn observe_own_beacon_info_updates(
1095 &self,
1096 ) -> Result<impl Stream<Item = BeaconInfoUpdate> + use<>> {
1097 let observer = self.observe_events::<OriginalSyncBeaconInfoEvent, Room>();
1098 let mut stream = observer.subscribe();
1099 let own_user_id = self.user_id().ok_or(Error::AuthenticationRequired)?.to_owned();
1100 Ok(async_stream::stream! {
1101 let _observer = observer;
1102
1103 while let Some((event, room)) = stream.next().await {
1104 if event.state_key != own_user_id {
1105 continue;
1106 }
1107 yield BeaconInfoUpdate {
1108 room_id: room.room_id().to_owned(),
1109 event_id: event.event_id,
1110 content: event.content,
1111 };
1112 }
1113 })
1114 }
1115
1116 /// Remove the event handler associated with the handle.
1117 ///
1118 /// Note that you **must not** call `remove_event_handler` from the
1119 /// non-async part of an event handler, that is:
1120 ///
1121 /// ```ignore
1122 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1123 /// // âš this will cause a deadlock âš
1124 /// client.remove_event_handler(handle);
1125 ///
1126 /// async move {
1127 /// // removing the event handler here is fine
1128 /// client.remove_event_handler(handle);
1129 /// }
1130 /// })
1131 /// ```
1132 ///
1133 /// Note also that handlers that remove themselves will still execute with
1134 /// events received in the same sync cycle.
1135 ///
1136 /// # Arguments
1137 ///
1138 /// `handle` - The [`EventHandlerHandle`] that is returned when
1139 /// registering the event handler with [`Client::add_event_handler`].
1140 ///
1141 /// # Examples
1142 ///
1143 /// ```no_run
1144 /// # use url::Url;
1145 /// # use tokio::sync::mpsc;
1146 /// #
1147 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1148 /// #
1149 /// use matrix_sdk::{
1150 /// Client, event_handler::EventHandlerHandle,
1151 /// ruma::events::room::member::SyncRoomMemberEvent,
1152 /// };
1153 /// #
1154 /// # futures_executor::block_on(async {
1155 /// # let client = matrix_sdk::Client::builder()
1156 /// # .homeserver_url(homeserver)
1157 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1158 /// # .build()
1159 /// # .await
1160 /// # .unwrap();
1161 ///
1162 /// client.add_event_handler(
1163 /// |ev: SyncRoomMemberEvent,
1164 /// client: Client,
1165 /// handle: EventHandlerHandle| async move {
1166 /// // Common usage: Check arriving Event is the expected one
1167 /// println!("Expected RoomMemberEvent received!");
1168 /// client.remove_event_handler(handle);
1169 /// },
1170 /// );
1171 /// # });
1172 /// ```
1173 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1174 self.inner.event_handlers.remove(handle);
1175 }
1176
1177 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1178 /// the given handle.
1179 ///
1180 /// When the returned value is dropped, the event handler will be removed.
1181 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1182 EventHandlerDropGuard::new(handle, self.clone())
1183 }
1184
1185 /// Add an arbitrary value for use as event handler context.
1186 ///
1187 /// The value can be obtained in an event handler by adding an argument of
1188 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1189 ///
1190 /// If a value of the same type has been added before, it will be
1191 /// overwritten.
1192 ///
1193 /// # Examples
1194 ///
1195 /// ```no_run
1196 /// use matrix_sdk::{
1197 /// Room, event_handler::Ctx,
1198 /// ruma::events::room::message::SyncRoomMessageEvent,
1199 /// };
1200 /// # #[derive(Clone)]
1201 /// # struct SomeType;
1202 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1203 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1204 /// # futures_executor::block_on(async {
1205 /// # let client = matrix_sdk::Client::builder()
1206 /// # .homeserver_url(homeserver)
1207 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1208 /// # .build()
1209 /// # .await
1210 /// # .unwrap();
1211 ///
1212 /// // Handle used to send messages to the UI part of the app
1213 /// let my_gui_handle: SomeType = obtain_gui_handle();
1214 ///
1215 /// client.add_event_handler_context(my_gui_handle.clone());
1216 /// client.add_event_handler(
1217 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1218 /// async move {
1219 /// // gui_handle.send(DisplayMessage { message: ev });
1220 /// }
1221 /// },
1222 /// );
1223 /// # });
1224 /// ```
1225 pub fn add_event_handler_context<T>(&self, ctx: T)
1226 where
1227 T: Clone + Send + Sync + 'static,
1228 {
1229 self.inner.event_handlers.add_context(ctx);
1230 }
1231
1232 /// Register a handler for a notification.
1233 ///
1234 /// Similar to [`Client::add_event_handler`], but only allows functions
1235 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1236 /// [`Client`] for now.
1237 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1238 where
1239 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1240 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1241 {
1242 self.inner.notification_handlers.write().await.push(Box::new(
1243 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1244 ));
1245
1246 self
1247 }
1248
1249 /// Subscribe to all updates for the room with the given ID.
1250 ///
1251 /// The returned receiver will receive a new message for each sync response
1252 /// that contains updates for that room.
1253 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1254 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1255 btree_map::Entry::Vacant(entry) => {
1256 let (tx, rx) = broadcast::channel(8);
1257 entry.insert(tx);
1258 rx
1259 }
1260 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1261 }
1262 }
1263
1264 /// Subscribe to all updates to all rooms, whenever any has been received in
1265 /// a sync response.
1266 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1267 self.inner.room_updates_sender.subscribe()
1268 }
1269
1270 pub(crate) async fn notification_handlers(
1271 &self,
1272 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1273 self.inner.notification_handlers.read().await
1274 }
1275
1276 /// Get all the rooms the client knows about.
1277 ///
1278 /// This will return the list of joined, invited, and left rooms.
1279 pub fn rooms(&self) -> Vec<Room> {
1280 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1281 }
1282
1283 /// Get all the rooms the client knows about, filtered by room state.
1284 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1285 self.base_client()
1286 .rooms_filtered(filter)
1287 .into_iter()
1288 .map(|room| Room::new(self.clone(), room))
1289 .collect()
1290 }
1291
1292 /// Get a stream of all the rooms, in addition to the existing rooms.
1293 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1294 let (rooms, stream) = self.base_client().rooms_stream();
1295
1296 let map_room = |room| Room::new(self.clone(), room);
1297
1298 (
1299 rooms.into_iter().map(map_room).collect(),
1300 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1301 )
1302 }
1303
1304 /// Returns the joined rooms this client knows about.
1305 pub fn joined_rooms(&self) -> Vec<Room> {
1306 self.rooms_filtered(RoomStateFilter::JOINED)
1307 }
1308
1309 /// Returns the invited rooms this client knows about.
1310 pub fn invited_rooms(&self) -> Vec<Room> {
1311 self.rooms_filtered(RoomStateFilter::INVITED)
1312 }
1313
1314 /// Returns the left rooms this client knows about.
1315 pub fn left_rooms(&self) -> Vec<Room> {
1316 self.rooms_filtered(RoomStateFilter::LEFT)
1317 }
1318
1319 /// Returns the joined space rooms this client knows about.
1320 pub fn joined_space_rooms(&self) -> Vec<Room> {
1321 self.base_client()
1322 .rooms_filtered(RoomStateFilter::JOINED)
1323 .into_iter()
1324 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1325 .collect()
1326 }
1327
1328 /// Get a room with the given room id.
1329 ///
1330 /// # Arguments
1331 ///
1332 /// `room_id` - The unique id of the room that should be fetched.
1333 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1334 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1335 }
1336
1337 /// Gets the preview of a room, whether the current user has joined it or
1338 /// not.
1339 pub async fn get_room_preview(
1340 &self,
1341 room_or_alias_id: &RoomOrAliasId,
1342 via: Vec<OwnedServerName>,
1343 ) -> Result<RoomPreview> {
1344 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1345 Ok(room_id) => room_id.to_owned(),
1346 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1347 };
1348
1349 if let Some(room) = self.get_room(&room_id) {
1350 // The cached data can only be trusted if the room state is joined or
1351 // banned: for invite and knock rooms, no updates will be received
1352 // for the rooms after the invite/knock action took place so we may
1353 // have very out to date data for important fields such as
1354 // `join_rule`. For left rooms, the homeserver should return the latest info.
1355 match room.state() {
1356 RoomState::Joined | RoomState::Banned => {
1357 return Ok(RoomPreview::from_known_room(&room).await);
1358 }
1359 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1360 }
1361 }
1362
1363 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1364 }
1365
1366 /// Resolve a room alias to a room id and a list of servers which know
1367 /// about it.
1368 ///
1369 /// # Arguments
1370 ///
1371 /// `room_alias` - The room alias to be resolved.
1372 pub async fn resolve_room_alias(
1373 &self,
1374 room_alias: &RoomAliasId,
1375 ) -> HttpResult<get_alias::v3::Response> {
1376 let request = get_alias::v3::Request::new(room_alias.to_owned());
1377 self.send(request).await
1378 }
1379
1380 /// Checks if a room alias is not in use yet.
1381 ///
1382 /// Returns:
1383 /// - `Ok(true)` if the room alias is available.
1384 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1385 /// status code).
1386 /// - An `Err` otherwise.
1387 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1388 match self.resolve_room_alias(alias).await {
1389 // The room alias was resolved, so it's already in use.
1390 Ok(_) => Ok(false),
1391 Err(error) => {
1392 match error.client_api_error_kind() {
1393 // The room alias wasn't found, so it's available.
1394 Some(ErrorKind::NotFound) => Ok(true),
1395 _ => Err(error),
1396 }
1397 }
1398 }
1399 }
1400
1401 /// Adds a new room alias associated with a room to the room directory.
1402 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1403 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1404 self.send(request).await?;
1405 Ok(())
1406 }
1407
1408 /// Removes a room alias from the room directory.
1409 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1410 let request = delete_alias::v3::Request::new(alias.to_owned());
1411 self.send(request).await?;
1412 Ok(())
1413 }
1414
1415 /// Update the homeserver from the login response well-known if needed.
1416 ///
1417 /// # Arguments
1418 ///
1419 /// * `login_well_known` - The `well_known` field from a successful login
1420 /// response.
1421 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1422 if self.inner.respect_login_well_known
1423 && let Some(well_known) = login_well_known
1424 && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1425 {
1426 self.set_homeserver(homeserver);
1427 }
1428 }
1429
1430 /// Similar to [`Client::restore_session_with`], with
1431 /// [`RoomLoadSettings::default()`].
1432 ///
1433 /// # Panics
1434 ///
1435 /// Panics if a session was already restored or logged in.
1436 #[instrument(skip_all)]
1437 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1438 self.restore_session_with(session, RoomLoadSettings::default()).await
1439 }
1440
1441 /// Restore a session previously logged-in using one of the available
1442 /// authentication APIs. The number of rooms to restore is controlled by
1443 /// [`RoomLoadSettings`].
1444 ///
1445 /// See the documentation of the corresponding authentication API's
1446 /// `restore_session` method for more information.
1447 ///
1448 /// # Panics
1449 ///
1450 /// Panics if a session was already restored or logged in.
1451 #[instrument(skip_all)]
1452 pub async fn restore_session_with(
1453 &self,
1454 session: impl Into<AuthSession>,
1455 room_load_settings: RoomLoadSettings,
1456 ) -> Result<()> {
1457 let session = session.into();
1458 match session {
1459 AuthSession::Matrix(session) => {
1460 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1461 }
1462 AuthSession::OAuth(session) => {
1463 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1464 }
1465 }
1466 }
1467
1468 /// Refresh the access token using the authentication API used to log into
1469 /// this session.
1470 ///
1471 /// See the documentation of the authentication API's `refresh_access_token`
1472 /// method for more information.
1473 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1474 let Some(auth_api) = self.auth_api() else {
1475 return Err(RefreshTokenError::RefreshTokenRequired);
1476 };
1477
1478 match auth_api {
1479 AuthApi::Matrix(api) => {
1480 trace!("Token refresh: Using the homeserver.");
1481 Box::pin(api.refresh_access_token()).await?;
1482 }
1483 AuthApi::OAuth(api) => {
1484 trace!("Token refresh: Using OAuth 2.0.");
1485 Box::pin(api.refresh_access_token()).await?;
1486 }
1487 }
1488
1489 Ok(())
1490 }
1491
1492 /// Log out the current session using the proper authentication API.
1493 ///
1494 /// # Errors
1495 ///
1496 /// Returns an error if the session is not authenticated or if an error
1497 /// occurred while making the request to the server.
1498 pub async fn logout(&self) -> Result<(), Error> {
1499 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1500 match auth_api {
1501 AuthApi::Matrix(matrix_auth) => {
1502 matrix_auth.logout().await?;
1503 Ok(())
1504 }
1505 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1506 }
1507 }
1508
1509 /// Get or upload a sync filter.
1510 ///
1511 /// This method will either get a filter ID from the store or upload the
1512 /// filter definition to the homeserver and return the new filter ID.
1513 ///
1514 /// # Arguments
1515 ///
1516 /// * `filter_name` - The unique name of the filter, this name will be used
1517 /// locally to store and identify the filter ID returned by the server.
1518 ///
1519 /// * `definition` - The filter definition that should be uploaded to the
1520 /// server if no filter ID can be found in the store.
1521 ///
1522 /// # Examples
1523 ///
1524 /// ```no_run
1525 /// # use matrix_sdk::{
1526 /// # Client, config::SyncSettings,
1527 /// # ruma::api::client::{
1528 /// # filter::{
1529 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1530 /// # },
1531 /// # sync::sync_events::v3::Filter,
1532 /// # }
1533 /// # };
1534 /// # use url::Url;
1535 /// # async {
1536 /// # let homeserver = Url::parse("http://example.com").unwrap();
1537 /// # let client = Client::new(homeserver).await.unwrap();
1538 /// let mut filter = FilterDefinition::default();
1539 ///
1540 /// // Let's enable member lazy loading.
1541 /// filter.room.state.lazy_load_options =
1542 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1543 ///
1544 /// let filter_id = client
1545 /// .get_or_upload_filter("sync", filter)
1546 /// .await
1547 /// .unwrap();
1548 ///
1549 /// let sync_settings = SyncSettings::new()
1550 /// .filter(Filter::FilterId(filter_id));
1551 ///
1552 /// let response = client.sync_once(sync_settings).await.unwrap();
1553 /// # };
1554 #[instrument(skip(self, definition))]
1555 pub async fn get_or_upload_filter(
1556 &self,
1557 filter_name: &str,
1558 definition: FilterDefinition,
1559 ) -> Result<String> {
1560 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1561 debug!("Found filter locally");
1562 Ok(filter)
1563 } else {
1564 debug!("Didn't find filter locally");
1565 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1566 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1567 let response = self.send(request).await?;
1568
1569 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1570
1571 Ok(response.filter_id)
1572 }
1573 }
1574
1575 /// Prepare to join a room by ID, by getting the current details about it
1576 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1577 let room = self.get_room(room_id)?;
1578
1579 let inviter = match room.invite_details().await {
1580 Ok(details) => details.inviter,
1581 Err(Error::WrongRoomState(_)) => None,
1582 Err(e) => {
1583 warn!("Error fetching invite details for room: {e:?}");
1584 None
1585 }
1586 };
1587
1588 Some(PreJoinRoomInfo { inviter })
1589 }
1590
1591 /// Finish joining a room.
1592 ///
1593 /// If the room was an invite that should be marked as a DM, will include it
1594 /// in the DM event after creating the joined room.
1595 ///
1596 /// If encrypted history sharing is enabled, will check to see if we have a
1597 /// key bundle, and import it if so.
1598 ///
1599 /// # Arguments
1600 ///
1601 /// * `room_id` - The `RoomId` of the room that was joined.
1602 /// * `pre_join_room_info` - Information about the room before we joined.
1603 async fn finish_join_room(
1604 &self,
1605 room_id: &RoomId,
1606 pre_join_room_info: Option<PreJoinRoomInfo>,
1607 ) -> Result<Room> {
1608 info!(?room_id, ?pre_join_room_info, "Completing room join");
1609 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1610 room.state() == RoomState::Invited
1611 && room.is_direct().await.unwrap_or_else(|e| {
1612 warn!(%room_id, "is_direct() failed: {e}");
1613 false
1614 })
1615 } else {
1616 false
1617 };
1618
1619 let base_room = self
1620 .base_client()
1621 .room_joined(
1622 room_id,
1623 pre_join_room_info
1624 .as_ref()
1625 .and_then(|info| info.inviter.as_ref())
1626 .map(|i| i.user_id().to_owned()),
1627 )
1628 .await?;
1629 let room = Room::new(self.clone(), base_room);
1630
1631 if mark_as_dm {
1632 room.set_is_direct(true).await?;
1633 }
1634
1635 // If we joined following an invite, check if we had previously received a key
1636 // bundle from the inviter, and import it if so.
1637 //
1638 // It's important that we only do this once `BaseClient::room_joined` has
1639 // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1640 // race.
1641 #[cfg(feature = "e2e-encryption")]
1642 if self.inner.enable_share_history_on_invite
1643 && let Some(inviter) =
1644 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1645 {
1646 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1647 .await?;
1648 }
1649
1650 // Suppress "unused variable" and "unused field" lints
1651 #[cfg(not(feature = "e2e-encryption"))]
1652 let _ = pre_join_room_info.map(|i| i.inviter);
1653
1654 Ok(room)
1655 }
1656
1657 /// Join a room by `RoomId`.
1658 ///
1659 /// Returns the `Room` in the joined state.
1660 ///
1661 /// # Arguments
1662 ///
1663 /// * `room_id` - The `RoomId` of the room to be joined.
1664 #[instrument(skip(self))]
1665 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1666 // See who invited us to this room, if anyone. Note we have to do this before
1667 // making the `/join` request, otherwise we could race against the sync.
1668 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1669
1670 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1671 let response = self.send(request).await?;
1672 self.finish_join_room(&response.room_id, pre_join_info).await
1673 }
1674
1675 /// Join a room by `RoomOrAliasId`.
1676 ///
1677 /// Returns the `Room` in the joined state.
1678 ///
1679 /// # Arguments
1680 ///
1681 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1682 /// alias looks like `#name:example.com`.
1683 /// * `server_names` - The server names to be used for resolving the alias,
1684 /// if needs be.
1685 #[instrument(skip(self))]
1686 pub async fn join_room_by_id_or_alias(
1687 &self,
1688 alias: &RoomOrAliasId,
1689 server_names: &[OwnedServerName],
1690 ) -> Result<Room> {
1691 let pre_join_info = {
1692 match alias.try_into() {
1693 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1694 Err(_) => {
1695 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1696 // responding to an invitation to the room, and therefore don't need to handle
1697 // things that happen as a result of invites.
1698 None
1699 }
1700 }
1701 };
1702 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1703 via: server_names.to_owned(),
1704 });
1705 let response = self.send(request).await?;
1706 self.finish_join_room(&response.room_id, pre_join_info).await
1707 }
1708
1709 /// Search the homeserver's directory of public rooms.
1710 ///
1711 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1712 /// a `get_public_rooms::Response`.
1713 ///
1714 /// # Arguments
1715 ///
1716 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1717 ///
1718 /// * `since` - Pagination token from a previous request.
1719 ///
1720 /// * `server` - The name of the server, if `None` the requested server is
1721 /// used.
1722 ///
1723 /// # Examples
1724 /// ```no_run
1725 /// use matrix_sdk::Client;
1726 /// # use url::Url;
1727 /// # let homeserver = Url::parse("http://example.com").unwrap();
1728 /// # let limit = Some(10);
1729 /// # let since = Some("since token");
1730 /// # let server = Some("servername.com".try_into().unwrap());
1731 /// # async {
1732 /// let mut client = Client::new(homeserver).await.unwrap();
1733 ///
1734 /// client.public_rooms(limit, since, server).await;
1735 /// # };
1736 /// ```
1737 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1738 pub async fn public_rooms(
1739 &self,
1740 limit: Option<u32>,
1741 since: Option<&str>,
1742 server: Option<&ServerName>,
1743 ) -> HttpResult<get_public_rooms::v3::Response> {
1744 let limit = limit.map(UInt::from);
1745
1746 let request = assign!(get_public_rooms::v3::Request::new(), {
1747 limit,
1748 since: since.map(ToOwned::to_owned),
1749 server: server.map(ToOwned::to_owned),
1750 });
1751 self.send(request).await
1752 }
1753
1754 /// Create a room with the given parameters.
1755 ///
1756 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1757 /// created room.
1758 ///
1759 /// If you want to create a direct message with one specific user, you can
1760 /// use [`create_dm`][Self::create_dm], which is more convenient than
1761 /// assembling the [`create_room::v3::Request`] yourself.
1762 ///
1763 /// If the `is_direct` field of the request is set to `true` and at least
1764 /// one user is invited, the room will be automatically added to the direct
1765 /// rooms in the account data.
1766 ///
1767 /// # Examples
1768 ///
1769 /// ```no_run
1770 /// use matrix_sdk::{
1771 /// Client,
1772 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1773 /// };
1774 /// # use url::Url;
1775 /// #
1776 /// # async {
1777 /// # let homeserver = Url::parse("http://example.com").unwrap();
1778 /// let request = CreateRoomRequest::new();
1779 /// let client = Client::new(homeserver).await.unwrap();
1780 /// assert!(client.create_room(request).await.is_ok());
1781 /// # };
1782 /// ```
1783 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1784 let invite = request.invite.clone();
1785 let is_direct_room = request.is_direct;
1786 let response = self.send(request).await?;
1787
1788 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1789
1790 let joined_room = Room::new(self.clone(), base_room);
1791
1792 if is_direct_room
1793 && !invite.is_empty()
1794 && let Err(error) =
1795 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1796 {
1797 // FIXME: Retry in the background
1798 error!("Failed to mark room as DM: {error}");
1799 }
1800
1801 Ok(joined_room)
1802 }
1803
1804 /// Create a DM room.
1805 ///
1806 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1807 /// given user being invited, the room marked `is_direct` and both the
1808 /// creator and invitee getting the default maximum power level.
1809 ///
1810 /// If the `e2e-encryption` feature is enabled, the room will also be
1811 /// encrypted.
1812 ///
1813 /// # Arguments
1814 ///
1815 /// * `user_id` - The ID of the user to create a DM for.
1816 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1817 #[cfg(feature = "e2e-encryption")]
1818 let initial_state = vec![
1819 InitialStateEvent::with_empty_state_key(
1820 RoomEncryptionEventContent::with_recommended_defaults(),
1821 )
1822 .to_raw_any(),
1823 ];
1824
1825 #[cfg(not(feature = "e2e-encryption"))]
1826 let initial_state = vec![];
1827
1828 let request = assign!(create_room::v3::Request::new(), {
1829 invite: vec![user_id.to_owned()],
1830 is_direct: true,
1831 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1832 initial_state,
1833 });
1834
1835 self.create_room(request).await
1836 }
1837
1838 /// Get the first existing DM room with the given user, if any.
1839 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1840 self.get_dm_rooms(user_id).next()
1841 }
1842
1843 /// Get an iterator with the existing DM rooms for the given user.
1844 pub fn get_dm_rooms(&self, user_id: &UserId) -> impl Iterator<Item = Room> {
1845 let rooms = self.joined_rooms();
1846
1847 let dm_definition = &self.base_client().dm_room_definition;
1848
1849 // Find the room we share with the `user_id` and only with `user_id`
1850 let rooms = rooms.into_iter().filter(move |r| {
1851 let targets = r.direct_targets();
1852 let targets_match =
1853 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id));
1854 match dm_definition {
1855 DmRoomDefinition::MatrixSpec => targets_match,
1856 DmRoomDefinition::TwoMembers => {
1857 let service_members_count =
1858 r.service_members().map(|s| s.len()).unwrap_or_default() as u64;
1859 let active_non_service_members =
1860 r.active_members_count().saturating_sub(service_members_count);
1861 targets_match && active_non_service_members <= 2
1862 }
1863 }
1864 });
1865
1866 trace!(?user_id, ?rooms, "Found DM rooms with user");
1867 rooms
1868 }
1869
1870 /// Search the homeserver's directory for public rooms with a filter.
1871 ///
1872 /// # Arguments
1873 ///
1874 /// * `room_search` - The easiest way to create this request is using the
1875 /// `get_public_rooms_filtered::Request` itself.
1876 ///
1877 /// # Examples
1878 ///
1879 /// ```no_run
1880 /// # use url::Url;
1881 /// # use matrix_sdk::Client;
1882 /// # async {
1883 /// # let homeserver = Url::parse("http://example.com")?;
1884 /// use matrix_sdk::ruma::{
1885 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1886 /// };
1887 /// # let mut client = Client::new(homeserver).await?;
1888 ///
1889 /// let mut filter = Filter::new();
1890 /// filter.generic_search_term = Some("rust".to_owned());
1891 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1892 /// request.filter = filter;
1893 ///
1894 /// let response = client.public_rooms_filtered(request).await?;
1895 ///
1896 /// for room in response.chunk {
1897 /// println!("Found room {room:?}");
1898 /// }
1899 /// # anyhow::Ok(()) };
1900 /// ```
1901 pub async fn public_rooms_filtered(
1902 &self,
1903 request: get_public_rooms_filtered::v3::Request,
1904 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1905 self.send(request).await
1906 }
1907
1908 /// Send an arbitrary request to the server, without updating client state.
1909 ///
1910 /// **Warning:** Because this method *does not* update the client state, it
1911 /// is important to make sure that you account for this yourself, and
1912 /// use wrapper methods where available. This method should *only* be
1913 /// used if a wrapper method for the endpoint you'd like to use is not
1914 /// available.
1915 ///
1916 /// # Arguments
1917 ///
1918 /// * `request` - A filled out and valid request for the endpoint to be hit
1919 ///
1920 /// * `timeout` - An optional request timeout setting, this overrides the
1921 /// default request setting if one was set.
1922 ///
1923 /// # Examples
1924 ///
1925 /// ```no_run
1926 /// # use matrix_sdk::{Client, config::SyncSettings};
1927 /// # use url::Url;
1928 /// # async {
1929 /// # let homeserver = Url::parse("http://localhost:8080")?;
1930 /// # let mut client = Client::new(homeserver).await?;
1931 /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
1932 ///
1933 /// // First construct the request you want to make
1934 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1935 /// // for all available Endpoints
1936 /// let user_id = owned_user_id!("@example:localhost");
1937 /// let request = profile::get_profile::v3::Request::new(user_id);
1938 ///
1939 /// // Start the request using Client::send()
1940 /// let response = client.send(request).await?;
1941 ///
1942 /// // Check the corresponding Response struct to find out what types are
1943 /// // returned
1944 /// # anyhow::Ok(()) };
1945 /// ```
1946 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1947 where
1948 Request: OutgoingRequest + Clone + Debug,
1949 Request::Authentication: SupportedAuthScheme,
1950 Request::PathBuilder: SupportedPathBuilder,
1951 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1952 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1953 {
1954 SendRequest {
1955 client: self.clone(),
1956 request,
1957 config: None,
1958 send_progress: Default::default(),
1959 }
1960 }
1961
1962 pub(crate) async fn send_inner<Request>(
1963 &self,
1964 request: Request,
1965 config: Option<RequestConfig>,
1966 send_progress: SharedObservable<TransmissionProgress>,
1967 ) -> HttpResult<Request::IncomingResponse>
1968 where
1969 Request: OutgoingRequest + Debug,
1970 Request::Authentication: SupportedAuthScheme,
1971 Request::PathBuilder: SupportedPathBuilder,
1972 for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1973 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1974 {
1975 let homeserver = self.homeserver().to_string();
1976 let access_token = self.access_token();
1977 let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1978
1979 let path_builder_input =
1980 Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1981
1982 let result = self
1983 .inner
1984 .http_client
1985 .send(
1986 request,
1987 config,
1988 homeserver,
1989 access_token.as_deref(),
1990 path_builder_input,
1991 send_progress,
1992 )
1993 .await;
1994
1995 if let Err(Some(ErrorKind::UnknownToken { .. })) =
1996 result.as_ref().map_err(HttpError::client_api_error_kind)
1997 && let Some(access_token) = &access_token
1998 {
1999 // Mark the access token as expired.
2000 self.auth_ctx().set_access_token_expired(access_token);
2001 }
2002
2003 result
2004 }
2005
2006 fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
2007 _ = self
2008 .inner
2009 .auth_ctx
2010 .session_change_sender
2011 .send(SessionChange::UnknownToken(unknown_token_data.clone()));
2012 }
2013
2014 /// Fetches server versions from network; no caching.
2015 pub async fn fetch_server_versions(
2016 &self,
2017 request_config: Option<RequestConfig>,
2018 ) -> HttpResult<get_supported_versions::Response> {
2019 // Since this was called by the user, try to refresh the access token if
2020 // necessary.
2021 self.fetch_server_versions_inner(false, request_config).await
2022 }
2023
2024 /// Fetches server versions from network; no caching.
2025 ///
2026 /// If the access token is expired and `failsafe` is `false`, this will
2027 /// attempt to refresh the access token, otherwise this will try to make an
2028 /// unauthenticated request instead.
2029 pub(crate) async fn fetch_server_versions_inner(
2030 &self,
2031 failsafe: bool,
2032 request_config: Option<RequestConfig>,
2033 ) -> HttpResult<get_supported_versions::Response> {
2034 if !failsafe {
2035 // `Client::send()` handles refreshing access tokens.
2036 return self
2037 .send(get_supported_versions::Request::new())
2038 .with_request_config(request_config)
2039 .await;
2040 }
2041
2042 let homeserver = self.homeserver().to_string();
2043
2044 // If we have a fresh access token, try with it first.
2045 if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2046 && self.auth_ctx().has_valid_access_token()
2047 && let Some(access_token) = self.access_token()
2048 {
2049 let result = self
2050 .inner
2051 .http_client
2052 .send(
2053 get_supported_versions::Request::new(),
2054 request_config,
2055 homeserver.clone(),
2056 Some(&access_token),
2057 (),
2058 Default::default(),
2059 )
2060 .await;
2061
2062 if let Err(Some(ErrorKind::UnknownToken { .. })) =
2063 result.as_ref().map_err(HttpError::client_api_error_kind)
2064 {
2065 // If the access token is actually expired, mark it as expired and fallback to
2066 // the unauthenticated request below.
2067 self.auth_ctx().set_access_token_expired(&access_token);
2068 } else {
2069 // If the request succeeded or it's an other error, just stop now.
2070 return result;
2071 }
2072 }
2073
2074 // Try without authentication.
2075 self.inner
2076 .http_client
2077 .send(
2078 get_supported_versions::Request::new(),
2079 request_config,
2080 homeserver.clone(),
2081 None,
2082 (),
2083 Default::default(),
2084 )
2085 .await
2086 }
2087
2088 /// Fetches client well_known from network; no caching.
2089 ///
2090 /// 1. If the [`Client::server`] value is available, we use it to fetch the
2091 /// well-known contents.
2092 /// 2. If it's not, we try extracting the server name from the
2093 /// [`Client::user_id`] and building the server URL from it.
2094 /// 3. If we couldn't get the well-known contents with either the explicit
2095 /// server name or the implicit extracted one, we try the homeserver URL
2096 /// as a last resort.
2097 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2098 let homeserver = self.homeserver();
2099 let scheme = homeserver.scheme();
2100
2101 // Use the server name, either an explicit one or an implicit one taken from
2102 // the user id: sometimes we'll have only the homeserver url available and no
2103 // server name, but the server name can be extracted from the current user id.
2104 let server_url = self
2105 .server()
2106 .map(|server| server.to_string())
2107 // If the server name wasn't available, extract it from the user id and build a URL:
2108 // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2109 // will be the same for the public server url, lacking a better candidate.
2110 .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2111
2112 // If the server name is available, first try using it
2113 let response = if let Some(server_url) = server_url {
2114 // First try using the server name
2115 self.fetch_client_well_known_with_url(server_url).await
2116 } else {
2117 None
2118 };
2119
2120 // If we didn't get a well-known value yet, try with the homeserver url instead:
2121 if response.is_none() {
2122 // Sometimes people configure their well-known directly on the homeserver so use
2123 // this as a fallback when the server name is unknown.
2124 warn!(
2125 "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2126 );
2127 self.fetch_client_well_known_with_url(homeserver.to_string()).await
2128 } else {
2129 response
2130 }
2131 }
2132
2133 async fn fetch_client_well_known_with_url(
2134 &self,
2135 url: String,
2136 ) -> Option<discover_homeserver::Response> {
2137 let well_known = self
2138 .inner
2139 .http_client
2140 .send(
2141 discover_homeserver::Request::new(),
2142 Some(RequestConfig::short_retry()),
2143 url,
2144 None,
2145 (),
2146 Default::default(),
2147 )
2148 .await;
2149
2150 match well_known {
2151 Ok(well_known) => Some(well_known),
2152 Err(http_error) => {
2153 // It is perfectly valid to not have a well-known file.
2154 // Maybe we should check for a specific error code to be sure?
2155 warn!("Failed to fetch client well-known: {http_error}");
2156 None
2157 }
2158 }
2159 }
2160
2161 /// Load supported versions from storage, or fetch them from network and
2162 /// cache them.
2163 ///
2164 /// If `failsafe` is true, this will try to minimize side effects to avoid
2165 /// possible deadlocks.
2166 async fn fetch_supported_versions(
2167 &self,
2168 failsafe: bool,
2169 ) -> HttpResult<SupportedVersionsResponse> {
2170 let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2171 let supported_versions = SupportedVersionsResponse {
2172 versions: server_versions.versions,
2173 unstable_features: server_versions.unstable_features,
2174 };
2175
2176 Ok(supported_versions)
2177 }
2178
2179 /// Get the Matrix versions and features supported by the homeserver by
2180 /// fetching them from the server or the cache.
2181 ///
2182 /// This is equivalent to calling both [`Client::server_versions()`] and
2183 /// [`Client::unstable_features()`]. To always fetch the result from the
2184 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2185 /// and then `.as_supported_versions()` on the response.
2186 ///
2187 /// # Examples
2188 ///
2189 /// ```no_run
2190 /// use ruma::api::{FeatureFlag, MatrixVersion};
2191 /// # use matrix_sdk::{Client, config::SyncSettings};
2192 /// # use url::Url;
2193 /// # async {
2194 /// # let homeserver = Url::parse("http://localhost:8080")?;
2195 /// # let mut client = Client::new(homeserver).await?;
2196 ///
2197 /// let supported = client.supported_versions().await?;
2198 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2199 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2200 ///
2201 /// let msc_x_feature = FeatureFlag::from("msc_x");
2202 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2203 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2204 /// # anyhow::Ok(()) };
2205 /// ```
2206 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2207 self.supported_versions_inner(false).await
2208 }
2209
2210 /// Get the Matrix versions and features supported by the homeserver by
2211 /// fetching them from the server or the cache.
2212 ///
2213 /// If `failsafe` is true, this will try to minimize side effects to avoid
2214 /// possible deadlocks.
2215 pub(crate) async fn supported_versions_inner(
2216 &self,
2217 failsafe: bool,
2218 ) -> HttpResult<SupportedVersions> {
2219 match self.supported_versions_cached_inner(failsafe).await {
2220 Ok(Some(value)) => {
2221 return Ok(value);
2222 }
2223 Ok(None) => {
2224 // The cache is empty, make a request.
2225 }
2226 Err(error) => {
2227 warn!("error when loading cached supported versions: {error}");
2228 // Fallthrough to make a request.
2229 }
2230 }
2231
2232 self.refresh_supported_versions_cache(failsafe).await
2233 }
2234
2235 /// Refresh the Matrix versions and features supported by the homeserver in
2236 /// the cache.
2237 ///
2238 /// If `failsafe` is true, this will try to minimize side effects to avoid
2239 /// possible deadlocks.
2240 async fn refresh_supported_versions_cache(
2241 &self,
2242 failsafe: bool,
2243 ) -> HttpResult<SupportedVersions> {
2244 let cached_supported_versions = &self.inner.caches.supported_versions;
2245
2246 let mut supported_versions_guard = match cached_supported_versions.refresh_lock.try_lock() {
2247 Ok(guard) => guard,
2248 Err(_) => {
2249 // There is already a refresh in progress, wait for it to finish.
2250 let guard = cached_supported_versions.refresh_lock.lock().await;
2251
2252 if let Err(error) = guard.as_ref() {
2253 // There was an error in the previous refresh, return it.
2254 return Err(HttpError::Cached(error.clone()));
2255 }
2256
2257 // Reuse the data if it was cached and it hasn't expired.
2258 if let CachedValue::Cached(value) = cached_supported_versions.value()
2259 && !value.has_expired()
2260 {
2261 return Ok(value.into_data());
2262 }
2263
2264 // The data wasn't cached or has expired, we need to make another request.
2265 guard
2266 }
2267 };
2268
2269 let response = match self.fetch_supported_versions(failsafe).await {
2270 Ok(response) => {
2271 *supported_versions_guard = Ok(());
2272 TtlValue::new(response)
2273 }
2274 Err(error) => {
2275 let error = Arc::new(error);
2276 *supported_versions_guard = Err(error.clone());
2277 return Err(HttpError::Cached(error));
2278 }
2279 };
2280
2281 let supported_versions = response.as_ref().map(|response| response.supported_versions());
2282
2283 // Only cache the result if the request was authenticated.
2284 if self.auth_ctx().has_valid_access_token() {
2285 if let Err(err) = self
2286 .state_store()
2287 .set_kv_data(
2288 StateStoreDataKey::SupportedVersions,
2289 StateStoreDataValue::SupportedVersions(response),
2290 )
2291 .await
2292 {
2293 warn!("error when caching supported versions: {err}");
2294 }
2295
2296 cached_supported_versions.set_value(supported_versions.clone());
2297 }
2298
2299 Ok(supported_versions.into_data())
2300 }
2301
2302 /// Get the Matrix versions and features supported by the homeserver by
2303 /// fetching them from the cache.
2304 ///
2305 /// For a version of this function that fetches the supported versions and
2306 /// features from the homeserver if the [`SupportedVersions`] aren't
2307 /// found in the cache, take a look at the [`Client::supported_versions()`]
2308 /// method.
2309 ///
2310 /// If the data in the cache has expired, this will trigger a background
2311 /// task to refresh it.
2312 ///
2313 /// # Examples
2314 ///
2315 /// ```no_run
2316 /// use ruma::api::{FeatureFlag, MatrixVersion};
2317 /// # use matrix_sdk::{Client, config::SyncSettings};
2318 /// # use url::Url;
2319 /// # async {
2320 /// # let homeserver = Url::parse("http://localhost:8080")?;
2321 /// # let mut client = Client::new(homeserver).await?;
2322 ///
2323 /// let supported =
2324 /// if let Some(supported) = client.supported_versions_cached().await? {
2325 /// supported
2326 /// } else {
2327 /// client.fetch_server_versions(None).await?.as_supported_versions()
2328 /// };
2329 ///
2330 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2331 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2332 ///
2333 /// let msc_x_feature = FeatureFlag::from("msc_x");
2334 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2335 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2336 /// # anyhow::Ok(()) };
2337 /// ```
2338 pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2339 self.supported_versions_cached_inner(false).await
2340 }
2341
2342 async fn supported_versions_cached_inner(
2343 &self,
2344 failsafe: bool,
2345 ) -> Result<Option<SupportedVersions>, StoreError> {
2346 let supported_versions_cache = &self.inner.caches.supported_versions;
2347
2348 let value = if let CachedValue::Cached(cached) = supported_versions_cache.value() {
2349 cached
2350 } else if let Some(stored) = self
2351 .state_store()
2352 .get_kv_data(StateStoreDataKey::SupportedVersions)
2353 .await?
2354 .and_then(|value| value.into_supported_versions())
2355 {
2356 let stored = stored.map(|response| response.supported_versions());
2357
2358 // Copy the data from the store in the in-memory cache.
2359 supported_versions_cache.set_value(stored.clone());
2360
2361 stored
2362 } else {
2363 return Ok(None);
2364 };
2365
2366 // Spawn a task to refresh the cache if it has expired and we have a valid
2367 // access token.
2368 if value.has_expired() && self.auth_ctx().has_valid_access_token() {
2369 debug!("spawning task to refresh supported versions cache");
2370
2371 let client = self.clone();
2372 self.task_monitor().spawn_finite_task("refresh supported versions cache", async move {
2373 if let Err(error) = client.refresh_supported_versions_cache(failsafe).await {
2374 warn!("failed to refresh supported versions cache: {error}");
2375 }
2376 });
2377 }
2378
2379 Ok(Some(value.into_data()))
2380 }
2381
2382 /// Get the Matrix versions supported by the homeserver by fetching them
2383 /// from the server or the cache.
2384 ///
2385 /// # Examples
2386 ///
2387 /// ```no_run
2388 /// use ruma::api::MatrixVersion;
2389 /// # use matrix_sdk::{Client, config::SyncSettings};
2390 /// # use url::Url;
2391 /// # async {
2392 /// # let homeserver = Url::parse("http://localhost:8080")?;
2393 /// # let mut client = Client::new(homeserver).await?;
2394 ///
2395 /// let server_versions = client.server_versions().await?;
2396 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2397 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2398 /// # anyhow::Ok(()) };
2399 /// ```
2400 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2401 Ok(self.supported_versions().await?.versions)
2402 }
2403
2404 /// Get the unstable features supported by the homeserver by fetching them
2405 /// from the server or the cache.
2406 ///
2407 /// # Examples
2408 ///
2409 /// ```no_run
2410 /// use matrix_sdk::ruma::api::FeatureFlag;
2411 /// # use matrix_sdk::{Client, config::SyncSettings};
2412 /// # use url::Url;
2413 /// # async {
2414 /// # let homeserver = Url::parse("http://localhost:8080")?;
2415 /// # let mut client = Client::new(homeserver).await?;
2416 ///
2417 /// let msc_x_feature = FeatureFlag::from("msc_x");
2418 /// let unstable_features = client.unstable_features().await?;
2419 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2420 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2421 /// # anyhow::Ok(()) };
2422 /// ```
2423 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2424 Ok(self.supported_versions().await?.features)
2425 }
2426
2427 /// Empty the supported versions and unstable features cache.
2428 ///
2429 /// Since the SDK caches the supported versions, it's possible to have a
2430 /// stale entry in the cache. This functions makes it possible to force
2431 /// reset it.
2432 pub async fn reset_supported_versions(&self) -> Result<()> {
2433 // Empty the in-memory cache.
2434 self.inner.caches.supported_versions.reset();
2435
2436 // Empty the store cache.
2437 Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2438 }
2439
2440 /// Get the well-known file of the homeserver from the cache.
2441 ///
2442 /// If the data in the cache has expired, this will trigger a background
2443 /// task to refresh it.
2444 async fn well_known_cached(
2445 &self,
2446 ) -> Result<CachedValue<Option<WellKnownResponse>>, StoreError> {
2447 let well_known_cache = &self.inner.caches.well_known;
2448
2449 let value = if let CachedValue::Cached(cached) = well_known_cache.value() {
2450 cached
2451 } else if let Some(stored) = self
2452 .state_store()
2453 .get_kv_data(StateStoreDataKey::WellKnown)
2454 .await?
2455 .and_then(|value| value.into_well_known())
2456 {
2457 // Copy the data from the store into the in-memory cache.
2458 well_known_cache.set_value(stored.clone());
2459
2460 stored
2461 } else {
2462 return Ok(CachedValue::NotSet);
2463 };
2464
2465 // Spawn a task to refresh the cache if it has expired.
2466 if value.has_expired() {
2467 debug!("spawning task to refresh well-known cache");
2468
2469 let client = self.clone();
2470 self.task_monitor().spawn_finite_task("refresh well-known cache", async move {
2471 client.refresh_well_known_cache().await;
2472 });
2473 }
2474
2475 Ok(CachedValue::Cached(value.into_data()))
2476 }
2477
2478 /// Refresh the well-known file of the homeserver in the cache.
2479 async fn refresh_well_known_cache(&self) -> Option<WellKnownResponse> {
2480 let well_known_cache = &self.inner.caches.well_known;
2481
2482 let _well_known_guard = match well_known_cache.refresh_lock.try_lock() {
2483 Ok(guard) => guard,
2484 Err(_) => {
2485 // There is already a refresh in progress, wait for it to finish.
2486 let guard = well_known_cache.refresh_lock.lock().await;
2487
2488 // A refresh can't fail because we ignore failures, so there shouldn't be an
2489 // error in the refresh lock.
2490
2491 // Reuse the data if it was cached and it hasn't expired.
2492 if let CachedValue::Cached(value) = well_known_cache.value()
2493 && !value.has_expired()
2494 {
2495 return value.into_data();
2496 }
2497
2498 // The data wasn't cached or has expired, we need to make another request.
2499 guard
2500 }
2501 };
2502
2503 let well_known = TtlValue::new(self.fetch_client_well_known().await.map(Into::into));
2504
2505 if let Err(err) = self
2506 .state_store()
2507 .set_kv_data(
2508 StateStoreDataKey::WellKnown,
2509 StateStoreDataValue::WellKnown(well_known.clone()),
2510 )
2511 .await
2512 {
2513 warn!("error when caching well-known: {err}");
2514 }
2515
2516 well_known_cache.set_value(well_known.clone());
2517
2518 well_known.into_data()
2519 }
2520
2521 /// Get the well-known file of the homeserver by fetching it from the server
2522 /// or the cache.
2523 async fn well_known(&self) -> Option<WellKnownResponse> {
2524 match self.well_known_cached().await {
2525 Ok(CachedValue::Cached(value)) => {
2526 return value;
2527 }
2528 Ok(CachedValue::NotSet) => {
2529 // The cache is empty, make a request.
2530 }
2531 Err(error) => {
2532 warn!("error when loading cached well-known: {error}");
2533 // Fallthrough to make a request.
2534 }
2535 }
2536
2537 self.refresh_well_known_cache().await
2538 }
2539
2540 /// Get information about the homeserver's advertised RTC foci by fetching
2541 /// the well-known file from the server or the cache.
2542 ///
2543 /// # Examples
2544 /// ```no_run
2545 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2546 /// # use url::Url;
2547 /// # async {
2548 /// # let homeserver = Url::parse("http://localhost:8080")?;
2549 /// # let mut client = Client::new(homeserver).await?;
2550 /// let rtc_foci = client.rtc_foci().await?;
2551 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2552 /// RtcFocusInfo::LiveKit(info) => Some(info),
2553 /// _ => None,
2554 /// });
2555 /// if let Some(info) = default_livekit_focus_info {
2556 /// println!("Default LiveKit service URL: {}", info.service_url);
2557 /// }
2558 /// # anyhow::Ok(()) };
2559 /// ```
2560 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2561 let well_known = self.well_known().await;
2562
2563 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2564 }
2565
2566 /// Empty the well-known cache.
2567 ///
2568 /// Since the SDK caches the well-known, it's possible to have a stale entry
2569 /// in the cache. This functions makes it possible to force reset it.
2570 pub async fn reset_well_known(&self) -> Result<()> {
2571 // Empty the in-memory caches.
2572 self.inner.caches.well_known.reset();
2573
2574 // Empty the store cache.
2575 Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2576 }
2577
2578 /// Check whether MSC 4028 is enabled on the homeserver.
2579 ///
2580 /// # Examples
2581 ///
2582 /// ```no_run
2583 /// # use matrix_sdk::{Client, config::SyncSettings};
2584 /// # use url::Url;
2585 /// # async {
2586 /// # let homeserver = Url::parse("http://localhost:8080")?;
2587 /// # let mut client = Client::new(homeserver).await?;
2588 /// let msc4028_enabled =
2589 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2590 /// # anyhow::Ok(()) };
2591 /// ```
2592 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2593 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2594 }
2595
2596 /// Get information of all our own devices.
2597 ///
2598 /// # Examples
2599 ///
2600 /// ```no_run
2601 /// # use matrix_sdk::{Client, config::SyncSettings};
2602 /// # use url::Url;
2603 /// # async {
2604 /// # let homeserver = Url::parse("http://localhost:8080")?;
2605 /// # let mut client = Client::new(homeserver).await?;
2606 /// let response = client.devices().await?;
2607 ///
2608 /// for device in response.devices {
2609 /// println!(
2610 /// "Device: {} {}",
2611 /// device.device_id,
2612 /// device.display_name.as_deref().unwrap_or("")
2613 /// );
2614 /// }
2615 /// # anyhow::Ok(()) };
2616 /// ```
2617 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2618 let request = get_devices::v3::Request::new();
2619
2620 self.send(request).await
2621 }
2622
2623 /// Delete the given devices from the server.
2624 ///
2625 /// # Arguments
2626 ///
2627 /// * `devices` - The list of devices that should be deleted from the
2628 /// server.
2629 ///
2630 /// * `auth_data` - This request requires user interactive auth, the first
2631 /// request needs to set this to `None` and will always fail with an
2632 /// `UiaaResponse`. The response will contain information for the
2633 /// interactive auth and the same request needs to be made but this time
2634 /// with some `auth_data` provided.
2635 ///
2636 /// ```no_run
2637 /// # use matrix_sdk::{
2638 /// # ruma::{api::client::uiaa, owned_device_id},
2639 /// # Client, Error, config::SyncSettings,
2640 /// # };
2641 /// # use serde_json::json;
2642 /// # use url::Url;
2643 /// # use std::collections::BTreeMap;
2644 /// # async {
2645 /// # let homeserver = Url::parse("http://localhost:8080")?;
2646 /// # let mut client = Client::new(homeserver).await?;
2647 /// let devices = &[owned_device_id!("DEVICEID")];
2648 ///
2649 /// if let Err(e) = client.delete_devices(devices, None).await {
2650 /// if let Some(info) = e.as_uiaa_response() {
2651 /// let mut password = uiaa::Password::new(
2652 /// uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2653 /// "wordpass".to_owned(),
2654 /// );
2655 /// password.session = info.session.clone();
2656 ///
2657 /// client
2658 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2659 /// .await?;
2660 /// }
2661 /// }
2662 /// # anyhow::Ok(()) };
2663 pub async fn delete_devices(
2664 &self,
2665 devices: &[OwnedDeviceId],
2666 auth_data: Option<uiaa::AuthData>,
2667 ) -> HttpResult<delete_devices::v3::Response> {
2668 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2669 request.auth = auth_data;
2670
2671 self.send(request).await
2672 }
2673
2674 /// Change the display name of a device owned by the current user.
2675 ///
2676 /// Returns a `update_device::Response` which specifies the result
2677 /// of the operation.
2678 ///
2679 /// # Arguments
2680 ///
2681 /// * `device_id` - The ID of the device to change the display name of.
2682 /// * `display_name` - The new display name to set.
2683 pub async fn rename_device(
2684 &self,
2685 device_id: &DeviceId,
2686 display_name: &str,
2687 ) -> HttpResult<update_device::v3::Response> {
2688 let mut request = update_device::v3::Request::new(device_id.to_owned());
2689 request.display_name = Some(display_name.to_owned());
2690
2691 self.send(request).await
2692 }
2693
2694 /// Check whether a device with a specific ID exists on the server.
2695 ///
2696 /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2697 /// with 404 and the underlying error otherwise.
2698 ///
2699 /// # Arguments
2700 ///
2701 /// * `device_id` - The ID of the device to query.
2702 pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2703 let request = device::get_device::v3::Request::new(device_id);
2704 match self.send(request).await {
2705 Ok(_) => Ok(true),
2706 Err(err) => {
2707 if let Some(error) = err.as_client_api_error()
2708 && error.status_code == 404
2709 {
2710 Ok(false)
2711 } else {
2712 Err(err.into())
2713 }
2714 }
2715 }
2716 }
2717
2718 /// Synchronize the client's state with the latest state on the server.
2719 ///
2720 /// ## Syncing Events
2721 ///
2722 /// Messages or any other type of event need to be periodically fetched from
2723 /// the server, this is achieved by sending a `/sync` request to the server.
2724 ///
2725 /// The first sync is sent out without a [`token`]. The response of the
2726 /// first sync will contain a [`next_batch`] field which should then be
2727 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2728 /// don't receive the same events multiple times.
2729 ///
2730 /// ## Long Polling
2731 ///
2732 /// A sync should in the usual case always be in flight. The
2733 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2734 /// long the server will wait for new events before it will respond.
2735 /// The server will respond immediately if some new events arrive before the
2736 /// timeout has expired. If no changes arrive and the timeout expires an
2737 /// empty sync response will be sent to the client.
2738 ///
2739 /// This method of sending a request that may not receive a response
2740 /// immediately is called long polling.
2741 ///
2742 /// ## Filtering Events
2743 ///
2744 /// The number or type of messages and events that the client should receive
2745 /// from the server can be altered using a [`Filter`].
2746 ///
2747 /// Filters can be non-trivial and, since they will be sent with every sync
2748 /// request, they may take up a bunch of unnecessary bandwidth.
2749 ///
2750 /// Luckily filters can be uploaded to the server and reused using an unique
2751 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2752 /// method.
2753 ///
2754 /// # Arguments
2755 ///
2756 /// * `sync_settings` - Settings for the sync call, this allows us to set
2757 /// various options to configure the sync:
2758 /// * [`filter`] - To configure which events we receive and which get
2759 /// [filtered] by the server
2760 /// * [`timeout`] - To configure our [long polling] setup.
2761 /// * [`token`] - To tell the server which events we already received
2762 /// and where we wish to continue syncing.
2763 /// * [`full_state`] - To tell the server that we wish to receive all
2764 /// state events, regardless of our configured [`token`].
2765 /// * [`set_presence`] - To tell the server to set the presence and to
2766 /// which state.
2767 ///
2768 /// # Examples
2769 ///
2770 /// ```no_run
2771 /// # use url::Url;
2772 /// # async {
2773 /// # let homeserver = Url::parse("http://localhost:8080")?;
2774 /// # let username = "";
2775 /// # let password = "";
2776 /// use matrix_sdk::{
2777 /// Client, config::SyncSettings,
2778 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2779 /// };
2780 ///
2781 /// let client = Client::new(homeserver).await?;
2782 /// client.matrix_auth().login_username(username, password).send().await?;
2783 ///
2784 /// // Sync once so we receive the client state and old messages.
2785 /// client.sync_once(SyncSettings::default()).await?;
2786 ///
2787 /// // Register our handler so we start responding once we receive a new
2788 /// // event.
2789 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2790 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2791 /// });
2792 ///
2793 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2794 /// // from our `sync_once()` call automatically.
2795 /// client.sync(SyncSettings::default()).await;
2796 /// # anyhow::Ok(()) };
2797 /// ```
2798 ///
2799 /// [`sync`]: #method.sync
2800 /// [`SyncSettings`]: crate::config::SyncSettings
2801 /// [`token`]: crate::config::SyncSettings#method.token
2802 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2803 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2804 /// [`set_presence`]: ruma::presence::PresenceState
2805 /// [`filter`]: crate::config::SyncSettings#method.filter
2806 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2807 /// [`next_batch`]: SyncResponse#structfield.next_batch
2808 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2809 /// [long polling]: #long-polling
2810 /// [filtered]: #filtering-events
2811 #[instrument(skip(self))]
2812 pub async fn sync_once(
2813 &self,
2814 sync_settings: crate::config::SyncSettings,
2815 ) -> Result<SyncResponse> {
2816 // The sync might not return for quite a while due to the timeout.
2817 // We'll see if there's anything crypto related to send out before we
2818 // sync, i.e. if we closed our client after a sync but before the
2819 // crypto requests were sent out.
2820 //
2821 // This will mostly be a no-op.
2822 #[cfg(feature = "e2e-encryption")]
2823 if let Err(e) = self.send_outgoing_requests().await {
2824 error!(error = ?e, "Error while sending outgoing E2EE requests");
2825 }
2826
2827 let token = match sync_settings.token {
2828 SyncToken::Specific(token) => Some(token),
2829 SyncToken::NoToken => None,
2830 SyncToken::ReusePrevious => self.sync_token().await,
2831 };
2832
2833 let request = assign!(sync_events::v3::Request::new(), {
2834 filter: sync_settings.filter.map(|f| *f),
2835 since: token,
2836 full_state: sync_settings.full_state,
2837 set_presence: sync_settings.set_presence,
2838 timeout: sync_settings.timeout,
2839 use_state_after: true,
2840 });
2841 let mut request_config = self.request_config();
2842 if let Some(timeout) = sync_settings.timeout {
2843 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2844 request_config.timeout = Some(base_timeout + timeout);
2845 }
2846
2847 let response = self.send(request).with_request_config(request_config).await?;
2848 let next_batch = response.next_batch.clone();
2849 let response = self.process_sync(response).await?;
2850
2851 #[cfg(feature = "e2e-encryption")]
2852 if let Err(e) = self.send_outgoing_requests().await {
2853 error!(error = ?e, "Error while sending outgoing E2EE requests");
2854 }
2855
2856 self.inner.sync_beat.notify(usize::MAX);
2857
2858 Ok(SyncResponse::new(next_batch, response))
2859 }
2860
2861 /// Repeatedly synchronize the client state with the server.
2862 ///
2863 /// This method will only return on error, if cancellation is needed
2864 /// the method should be wrapped in a cancelable task or the
2865 /// [`Client::sync_with_callback`] method can be used or
2866 /// [`Client::sync_with_result_callback`] if you want to handle error
2867 /// cases in the loop, too.
2868 ///
2869 /// This method will internally call [`Client::sync_once`] in a loop.
2870 ///
2871 /// This method can be used with the [`Client::add_event_handler`]
2872 /// method to react to individual events. If you instead wish to handle
2873 /// events in a bulk manner the [`Client::sync_with_callback`],
2874 /// [`Client::sync_with_result_callback`] and
2875 /// [`Client::sync_stream`] methods can be used instead. Those methods
2876 /// repeatedly return the whole sync response.
2877 ///
2878 /// # Arguments
2879 ///
2880 /// * `sync_settings` - Settings for the sync call. *Note* that those
2881 /// settings will be only used for the first sync call. See the argument
2882 /// docs for [`Client::sync_once`] for more info.
2883 ///
2884 /// # Return
2885 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2886 /// up to the user of the API to check the error and decide whether the sync
2887 /// should continue or not.
2888 ///
2889 /// # Examples
2890 ///
2891 /// ```no_run
2892 /// # use url::Url;
2893 /// # async {
2894 /// # let homeserver = Url::parse("http://localhost:8080")?;
2895 /// # let username = "";
2896 /// # let password = "";
2897 /// use matrix_sdk::{
2898 /// Client, config::SyncSettings,
2899 /// ruma::events::room::message::OriginalSyncRoomMessageEvent,
2900 /// };
2901 ///
2902 /// let client = Client::new(homeserver).await?;
2903 /// client.matrix_auth().login_username(&username, &password).send().await?;
2904 ///
2905 /// // Register our handler so we start responding once we receive a new
2906 /// // event.
2907 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2908 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2909 /// });
2910 ///
2911 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2912 /// // automatically.
2913 /// client.sync(SyncSettings::default()).await?;
2914 /// # anyhow::Ok(()) };
2915 /// ```
2916 ///
2917 /// [argument docs]: #method.sync_once
2918 /// [`sync_with_callback`]: #method.sync_with_callback
2919 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2920 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2921 }
2922
2923 /// Repeatedly call sync to synchronize the client state with the server.
2924 ///
2925 /// # Arguments
2926 ///
2927 /// * `sync_settings` - Settings for the sync call. *Note* that those
2928 /// settings will be only used for the first sync call. See the argument
2929 /// docs for [`Client::sync_once`] for more info.
2930 ///
2931 /// * `callback` - A callback that will be called every time a successful
2932 /// response has been fetched from the server. The callback must return a
2933 /// boolean which signalizes if the method should stop syncing. If the
2934 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2935 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2936 ///
2937 /// # Return
2938 /// The sync runs until an error occurs or the
2939 /// callback indicates that the Loop should stop. If the callback asked for
2940 /// a regular stop, the result will be `Ok(())` otherwise the
2941 /// `Err(Error)` is returned.
2942 ///
2943 /// # Examples
2944 ///
2945 /// The following example demonstrates how to sync forever while sending all
2946 /// the interesting events through a mpsc channel to another thread e.g. a
2947 /// UI thread.
2948 ///
2949 /// ```no_run
2950 /// # use std::time::Duration;
2951 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2952 /// # use url::Url;
2953 /// # async {
2954 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2955 /// # let mut client = Client::new(homeserver).await.unwrap();
2956 ///
2957 /// use tokio::sync::mpsc::channel;
2958 ///
2959 /// let (tx, rx) = channel(100);
2960 ///
2961 /// let sync_channel = &tx;
2962 /// let sync_settings = SyncSettings::new()
2963 /// .timeout(Duration::from_secs(30));
2964 ///
2965 /// client
2966 /// .sync_with_callback(sync_settings, |response| async move {
2967 /// let channel = sync_channel;
2968 /// for (room_id, room) in response.rooms.joined {
2969 /// for event in room.timeline.events {
2970 /// channel.send(event).await.unwrap();
2971 /// }
2972 /// }
2973 ///
2974 /// LoopCtrl::Continue
2975 /// })
2976 /// .await;
2977 /// };
2978 /// ```
2979 #[instrument(skip_all)]
2980 pub async fn sync_with_callback<C>(
2981 &self,
2982 sync_settings: crate::config::SyncSettings,
2983 callback: impl Fn(SyncResponse) -> C,
2984 ) -> Result<(), Error>
2985 where
2986 C: Future<Output = LoopCtrl>,
2987 {
2988 self.sync_with_result_callback(sync_settings, |result| async {
2989 Ok(callback(result?).await)
2990 })
2991 .await
2992 }
2993
2994 /// Repeatedly call sync to synchronize the client state with the server.
2995 ///
2996 /// # Arguments
2997 ///
2998 /// * `sync_settings` - Settings for the sync call. *Note* that those
2999 /// settings will be only used for the first sync call. See the argument
3000 /// docs for [`Client::sync_once`] for more info.
3001 ///
3002 /// * `callback` - A callback that will be called every time after a
3003 /// response has been received, failure or not. The callback returns a
3004 /// `Result<LoopCtrl, Error>`, too. When returning
3005 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
3006 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
3007 /// function returns `Ok(())`. In case the callback can't handle the
3008 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
3009 /// which results in the sync ending and the `Err(Error)` being returned.
3010 ///
3011 /// # Return
3012 /// The sync runs until an error occurs that the callback can't handle or
3013 /// the callback indicates that the Loop should stop. If the callback
3014 /// asked for a regular stop, the result will be `Ok(())` otherwise the
3015 /// `Err(Error)` is returned.
3016 ///
3017 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
3018 /// this, and are handled first without sending the result to the
3019 /// callback. Only after they have exceeded is the `Result` handed to
3020 /// the callback.
3021 ///
3022 /// # Examples
3023 ///
3024 /// The following example demonstrates how to sync forever while sending all
3025 /// the interesting events through a mpsc channel to another thread e.g. a
3026 /// UI thread.
3027 ///
3028 /// ```no_run
3029 /// # use std::time::Duration;
3030 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
3031 /// # use url::Url;
3032 /// # async {
3033 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
3034 /// # let mut client = Client::new(homeserver).await.unwrap();
3035 /// #
3036 /// use tokio::sync::mpsc::channel;
3037 ///
3038 /// let (tx, rx) = channel(100);
3039 ///
3040 /// let sync_channel = &tx;
3041 /// let sync_settings = SyncSettings::new()
3042 /// .timeout(Duration::from_secs(30));
3043 ///
3044 /// client
3045 /// .sync_with_result_callback(sync_settings, |response| async move {
3046 /// let channel = sync_channel;
3047 /// let sync_response = response?;
3048 /// for (room_id, room) in sync_response.rooms.joined {
3049 /// for event in room.timeline.events {
3050 /// channel.send(event).await.unwrap();
3051 /// }
3052 /// }
3053 ///
3054 /// Ok(LoopCtrl::Continue)
3055 /// })
3056 /// .await;
3057 /// };
3058 /// ```
3059 #[instrument(skip(self, callback))]
3060 pub async fn sync_with_result_callback<C>(
3061 &self,
3062 sync_settings: crate::config::SyncSettings,
3063 callback: impl Fn(Result<SyncResponse, Error>) -> C,
3064 ) -> Result<(), Error>
3065 where
3066 C: Future<Output = Result<LoopCtrl, Error>>,
3067 {
3068 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
3069
3070 while let Some(result) = sync_stream.next().await {
3071 trace!("Running callback");
3072 if callback(result).await? == LoopCtrl::Break {
3073 trace!("Callback told us to stop");
3074 break;
3075 }
3076 trace!("Done running callback");
3077 }
3078
3079 Ok(())
3080 }
3081
3082 //// Repeatedly synchronize the client state with the server.
3083 ///
3084 /// This method will internally call [`Client::sync_once`] in a loop and is
3085 /// equivalent to the [`Client::sync`] method but the responses are provided
3086 /// as an async stream.
3087 ///
3088 /// # Arguments
3089 ///
3090 /// * `sync_settings` - Settings for the sync call. *Note* that those
3091 /// settings will be only used for the first sync call. See the argument
3092 /// docs for [`Client::sync_once`] for more info.
3093 ///
3094 /// # Examples
3095 ///
3096 /// ```no_run
3097 /// # use url::Url;
3098 /// # async {
3099 /// # let homeserver = Url::parse("http://localhost:8080")?;
3100 /// # let username = "";
3101 /// # let password = "";
3102 /// use futures_util::StreamExt;
3103 /// use matrix_sdk::{Client, config::SyncSettings};
3104 ///
3105 /// let client = Client::new(homeserver).await?;
3106 /// client.matrix_auth().login_username(&username, &password).send().await?;
3107 ///
3108 /// let mut sync_stream =
3109 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
3110 ///
3111 /// while let Some(Ok(response)) = sync_stream.next().await {
3112 /// for room in response.rooms.joined.values() {
3113 /// for e in &room.timeline.events {
3114 /// if let Ok(event) = e.raw().deserialize() {
3115 /// println!("Received event {:?}", event);
3116 /// }
3117 /// }
3118 /// }
3119 /// }
3120 ///
3121 /// # anyhow::Ok(()) };
3122 /// ```
3123 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3124 #[instrument(skip(self))]
3125 pub async fn sync_stream(
3126 &self,
3127 mut sync_settings: crate::config::SyncSettings,
3128 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3129 let mut is_first_sync = true;
3130 let mut timeout = None;
3131 let mut last_sync_time: Option<Instant> = None;
3132
3133 let parent_span = Span::current();
3134
3135 async_stream::stream!({
3136 loop {
3137 trace!("Syncing");
3138
3139 if sync_settings.ignore_timeout_on_first_sync {
3140 if is_first_sync {
3141 timeout = sync_settings.timeout.take();
3142 } else if sync_settings.timeout.is_none() && timeout.is_some() {
3143 sync_settings.timeout = timeout.take();
3144 }
3145
3146 is_first_sync = false;
3147 }
3148
3149 yield self
3150 .sync_loop_helper(&mut sync_settings)
3151 .instrument(parent_span.clone())
3152 .await;
3153
3154 Client::delay_sync(&mut last_sync_time).await
3155 }
3156 })
3157 }
3158
3159 /// Get the current, if any, sync token of the client.
3160 /// This will be None if the client didn't sync at least once.
3161 pub(crate) async fn sync_token(&self) -> Option<String> {
3162 self.inner.base_client.sync_token().await
3163 }
3164
3165 /// Gets information about the owner of a given access token.
3166 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3167 let request = whoami::v3::Request::new();
3168 self.send(request).await
3169 }
3170
3171 /// Subscribes a new receiver to client SessionChange broadcasts.
3172 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3173 let broadcast = &self.auth_ctx().session_change_sender;
3174 broadcast.subscribe()
3175 }
3176
3177 /// Sets the save/restore session callbacks.
3178 ///
3179 /// This is another mechanism to get synchronous updates to session tokens,
3180 /// while [`Self::subscribe_to_session_changes`] provides an async update.
3181 pub fn set_session_callbacks(
3182 &self,
3183 reload_session_callback: Box<ReloadSessionCallback>,
3184 save_session_callback: Box<SaveSessionCallback>,
3185 ) -> Result<()> {
3186 self.inner
3187 .auth_ctx
3188 .reload_session_callback
3189 .set(reload_session_callback)
3190 .map_err(|_| Error::MultipleSessionCallbacks)?;
3191
3192 self.inner
3193 .auth_ctx
3194 .save_session_callback
3195 .set(save_session_callback)
3196 .map_err(|_| Error::MultipleSessionCallbacks)?;
3197
3198 Ok(())
3199 }
3200
3201 /// Get the notification settings of the current owner of the client.
3202 pub async fn notification_settings(&self) -> NotificationSettings {
3203 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3204 NotificationSettings::new(self.clone(), ruleset)
3205 }
3206
3207 /// Create a new specialized `Client` that can process notifications.
3208 ///
3209 /// See [`CrossProcessLock::new`] to learn more about
3210 /// `cross_process_lock_config`.
3211 ///
3212 /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3213 pub async fn notification_client(
3214 &self,
3215 cross_process_lock_config: CrossProcessLockConfig,
3216 ) -> Result<Client> {
3217 let client = Client {
3218 inner: ClientInner::new(
3219 self.inner.auth_ctx.clone(),
3220 self.server().cloned(),
3221 self.homeserver(),
3222 self.sliding_sync_version(),
3223 self.inner.http_client.clone(),
3224 self.inner
3225 .base_client
3226 .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3227 .await?,
3228 self.inner.caches.supported_versions.value(),
3229 self.inner.caches.well_known.value(),
3230 self.inner.respect_login_well_known,
3231 self.inner.event_cache.clone(),
3232 self.inner.send_queue_data.clone(),
3233 self.inner.latest_events.clone(),
3234 #[cfg(feature = "e2e-encryption")]
3235 self.inner.e2ee.encryption_settings,
3236 #[cfg(feature = "e2e-encryption")]
3237 self.inner.enable_share_history_on_invite,
3238 cross_process_lock_config,
3239 #[cfg(feature = "experimental-search")]
3240 self.inner.search_index.clone(),
3241 self.inner.thread_subscription_catchup.clone(),
3242 )
3243 .await,
3244 };
3245
3246 Ok(client)
3247 }
3248
3249 /// The [`EventCache`] instance for this [`Client`].
3250 pub fn event_cache(&self) -> &EventCache {
3251 // SAFETY: always initialized in the `Client` ctor.
3252 self.inner.event_cache.get().unwrap()
3253 }
3254
3255 /// The [`LatestEvents`] instance for this [`Client`].
3256 pub async fn latest_events(&self) -> &LatestEvents {
3257 self.inner
3258 .latest_events
3259 .get_or_init(|| async {
3260 LatestEvents::new(
3261 WeakClient::from_client(self),
3262 self.event_cache().clone(),
3263 SendQueue::new(self.clone()),
3264 self.room_info_notable_update_receiver(),
3265 )
3266 })
3267 .await
3268 }
3269
3270 /// Waits until an at least partially synced room is received, and returns
3271 /// it.
3272 ///
3273 /// **Note: this function will loop endlessly until either it finds the room
3274 /// or an externally set timeout happens.**
3275 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3276 loop {
3277 if let Some(room) = self.get_room(room_id) {
3278 if room.is_state_partially_or_fully_synced() {
3279 debug!("Found just created room!");
3280 return room;
3281 }
3282 debug!("Room wasn't partially synced, waiting for sync beat to try again");
3283 } else {
3284 debug!("Room wasn't found, waiting for sync beat to try again");
3285 }
3286 self.inner.sync_beat.listen().await;
3287 }
3288 }
3289
3290 /// Knock on a room given its `room_id_or_alias` to ask for permission to
3291 /// join it.
3292 pub async fn knock(
3293 &self,
3294 room_id_or_alias: OwnedRoomOrAliasId,
3295 reason: Option<String>,
3296 server_names: Vec<OwnedServerName>,
3297 ) -> Result<Room> {
3298 let request =
3299 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3300 let response = self.send(request).await?;
3301 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3302 Ok(Room::new(self.clone(), base_room))
3303 }
3304
3305 /// Checks whether the provided `user_id` belongs to an ignored user.
3306 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3307 self.base_client().is_user_ignored(user_id).await
3308 }
3309
3310 /// Gets the `max_upload_size` value from the homeserver, getting either a
3311 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3312 /// missing.
3313 ///
3314 /// Check the spec for more info:
3315 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3316 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3317 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3318 if let Some(data) = max_upload_size_lock.get() {
3319 return Ok(data.to_owned());
3320 }
3321
3322 // Use the authenticated endpoint when the server supports it.
3323 let supported_versions = self.supported_versions().await?;
3324 let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3325 .is_supported(&supported_versions);
3326
3327 let upload_size = if use_auth {
3328 self.send(authenticated_media::get_media_config::v1::Request::default())
3329 .await?
3330 .upload_size
3331 } else {
3332 #[allow(deprecated)]
3333 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3334 };
3335
3336 match max_upload_size_lock.set(upload_size) {
3337 Ok(_) => Ok(upload_size),
3338 Err(error) => {
3339 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3340 }
3341 }
3342 }
3343
3344 /// The settings to use for decrypting events.
3345 #[cfg(feature = "e2e-encryption")]
3346 pub fn decryption_settings(&self) -> &DecryptionSettings {
3347 &self.base_client().decryption_settings
3348 }
3349
3350 /// Returns the [`SearchIndex`] for this [`Client`].
3351 #[cfg(feature = "experimental-search")]
3352 pub fn search_index(&self) -> &SearchIndex {
3353 &self.inner.search_index
3354 }
3355
3356 /// Whether the client is configured to take thread subscriptions (MSC4306
3357 /// and MSC4308) into account, and the server enabled the experimental
3358 /// feature flag for it.
3359 ///
3360 /// This may cause filtering out of thread subscriptions, and loading the
3361 /// thread subscriptions via the sliding sync extension, when the room
3362 /// list service is being used.
3363 ///
3364 /// This is async and fallible as it may use the network to retrieve the
3365 /// server supported features, if they aren't cached already.
3366 pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3367 // Check if the client is configured to support thread subscriptions first.
3368 match self.base_client().threading_support {
3369 ThreadingSupport::Enabled { with_subscriptions: false }
3370 | ThreadingSupport::Disabled => return Ok(false),
3371 ThreadingSupport::Enabled { with_subscriptions: true } => {}
3372 }
3373
3374 // Now, let's check that the server supports it.
3375 let server_enabled = self
3376 .supported_versions()
3377 .await?
3378 .features
3379 .contains(&FeatureFlag::from("org.matrix.msc4306"));
3380
3381 Ok(server_enabled)
3382 }
3383
3384 /// Fetch thread subscriptions changes between `from` and up to `to`.
3385 ///
3386 /// The `limit` optional parameter can be used to limit the number of
3387 /// entries in a response. It can also be overridden by the server, if
3388 /// it's deemed too large.
3389 pub async fn fetch_thread_subscriptions(
3390 &self,
3391 from: Option<String>,
3392 to: Option<String>,
3393 limit: Option<UInt>,
3394 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3395 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3396 from,
3397 to,
3398 limit,
3399 });
3400 Ok(self.send(request).await?)
3401 }
3402
3403 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3404 self.inner.thread_subscription_catchup.get().unwrap()
3405 }
3406
3407 /// Perform database optimizations if any are available, i.e. vacuuming in
3408 /// SQLite.
3409 ///
3410 /// **Warning:** this was added to check if SQLite fragmentation was the
3411 /// source of performance issues, **DO NOT use in production**.
3412 #[doc(hidden)]
3413 pub async fn optimize_stores(&self) -> Result<()> {
3414 trace!("Optimizing state store...");
3415 self.state_store().optimize().await?;
3416
3417 trace!("Optimizing event cache store...");
3418 if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3419 clean_lock.optimize().await?;
3420 }
3421
3422 trace!("Optimizing media store...");
3423 self.media_store().lock().await?.optimize().await?;
3424
3425 Ok(())
3426 }
3427
3428 /// Returns the sizes of the existing stores, if known.
3429 pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3430 #[cfg(feature = "e2e-encryption")]
3431 let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3432 && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3433 {
3434 Some(store_size)
3435 } else {
3436 None
3437 };
3438 #[cfg(not(feature = "e2e-encryption"))]
3439 let crypto_store_size = None;
3440
3441 let state_store_size = self.state_store().get_size().await.ok().flatten();
3442
3443 let event_cache_store_size = if let Some(clean_lock) =
3444 self.event_cache_store().lock().await?.as_clean()
3445 && let Ok(Some(store_size)) = clean_lock.get_size().await
3446 {
3447 Some(store_size)
3448 } else {
3449 None
3450 };
3451
3452 let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3453
3454 Ok(StoreSizes {
3455 crypto_store: crypto_store_size,
3456 state_store: state_store_size,
3457 event_cache_store: event_cache_store_size,
3458 media_store: media_store_size,
3459 })
3460 }
3461
3462 /// Get a reference to the client's task monitor, for spawning background
3463 /// tasks.
3464 pub fn task_monitor(&self) -> &TaskMonitor {
3465 &self.inner.task_monitor
3466 }
3467
3468 /// Add a subscriber for duplicate key upload error notifications triggered
3469 /// by requests to /keys/upload.
3470 #[cfg(feature = "e2e-encryption")]
3471 pub fn subscribe_to_duplicate_key_upload_errors(
3472 &self,
3473 ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3474 self.inner.duplicate_key_upload_error_sender.subscribe()
3475 }
3476
3477 /// Check the record of whether we are waiting for an [MSC4268] key bundle
3478 /// for the given room.
3479 ///
3480 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3481 #[cfg(feature = "e2e-encryption")]
3482 pub async fn get_pending_key_bundle_details_for_room(
3483 &self,
3484 room_id: &RoomId,
3485 ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3486 Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3487 }
3488
3489 /// Returns the [`DmRoomDefinition`] this client uses to check if a room is
3490 /// a DM.
3491 pub fn dm_room_definition(&self) -> &DmRoomDefinition {
3492 &self.inner.base_client.dm_room_definition
3493 }
3494}
3495
3496/// Contains the disk size of the different stores, if known. It won't be
3497/// available for in-memory stores.
3498#[derive(Debug, Clone)]
3499pub struct StoreSizes {
3500 /// The size of the CryptoStore.
3501 pub crypto_store: Option<usize>,
3502 /// The size of the StateStore.
3503 pub state_store: Option<usize>,
3504 /// The size of the EventCacheStore.
3505 pub event_cache_store: Option<usize>,
3506 /// The size of the MediaStore.
3507 pub media_store: Option<usize>,
3508}
3509
3510#[cfg(any(feature = "testing", test))]
3511impl Client {
3512 /// Test helper to mark users as tracked by the crypto layer.
3513 #[cfg(feature = "e2e-encryption")]
3514 pub async fn update_tracked_users_for_testing(
3515 &self,
3516 user_ids: impl IntoIterator<Item = &UserId>,
3517 ) {
3518 let olm = self.olm_machine().await;
3519 let olm = olm.as_ref().unwrap();
3520 olm.update_tracked_users(user_ids).await.unwrap();
3521 }
3522}
3523
3524/// A weak reference to the inner client, useful when trying to get a handle
3525/// on the owning client.
3526#[derive(Clone, Debug)]
3527pub(crate) struct WeakClient {
3528 client: Weak<ClientInner>,
3529}
3530
3531impl WeakClient {
3532 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3533 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3534 Self { client: Arc::downgrade(client) }
3535 }
3536
3537 /// Construct a [`WeakClient`] from a [`Client`].
3538 pub fn from_client(client: &Client) -> Self {
3539 Self::from_inner(&client.inner)
3540 }
3541
3542 /// Attempts to get a [`Client`] from this [`WeakClient`].
3543 pub fn get(&self) -> Option<Client> {
3544 self.client.upgrade().map(|inner| Client { inner })
3545 }
3546
3547 /// Gets the number of strong (`Arc`) pointers still pointing to this
3548 /// client.
3549 #[allow(dead_code)]
3550 pub fn strong_count(&self) -> usize {
3551 self.client.strong_count()
3552 }
3553}
3554
3555/// Information about the state of a room before we joined it.
3556#[derive(Debug, Clone, Default)]
3557struct PreJoinRoomInfo {
3558 /// The user who invited us to the room, if any.
3559 pub inviter: Option<RoomMember>,
3560}
3561
3562// The http mocking library is not supported for wasm32
3563#[cfg(all(test, not(target_family = "wasm")))]
3564pub(crate) mod tests {
3565 use std::{sync::Arc, time::Duration};
3566
3567 use assert_matches::assert_matches;
3568 use assert_matches2::assert_let;
3569 use eyeball::SharedObservable;
3570 use futures_util::{FutureExt, StreamExt, pin_mut};
3571 use js_int::{UInt, uint};
3572 use matrix_sdk_base::{
3573 RoomState,
3574 store::{MemoryStore, StoreConfig},
3575 ttl::TtlValue,
3576 };
3577 use matrix_sdk_test::{
3578 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3579 event_factory::EventFactory,
3580 };
3581 #[cfg(target_family = "wasm")]
3582 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3583
3584 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3585 use ruma::{
3586 RoomId, ServerName, UserId,
3587 api::{
3588 FeatureFlag, MatrixVersion,
3589 client::{
3590 discovery::discover_homeserver::RtcFocusInfo,
3591 room::create_room::v3::Request as CreateRoomRequest,
3592 },
3593 },
3594 assign,
3595 events::{
3596 ignored_user_list::IgnoredUserListEventContent,
3597 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3598 },
3599 owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3600 };
3601 use serde_json::json;
3602 use stream_assert::{assert_next_matches, assert_pending};
3603 use tokio::{
3604 spawn,
3605 time::{sleep, timeout},
3606 };
3607 use url::Url;
3608
3609 use super::Client;
3610 use crate::{
3611 Error, TransmissionProgress,
3612 client::{WeakClient, caches::CachedValue, futures::SendMediaUploadRequest},
3613 config::{RequestConfig, SyncSettings},
3614 futures::SendRequest,
3615 media::MediaError,
3616 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3617 };
3618
3619 #[async_test]
3620 async fn test_account_data() {
3621 let server = MatrixMockServer::new().await;
3622 let client = server.client_builder().build().await;
3623
3624 let f = EventFactory::new();
3625 server
3626 .mock_sync()
3627 .ok_and_run(&client, |builder| {
3628 builder.add_global_account_data(
3629 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3630 );
3631 })
3632 .await;
3633
3634 let content = client
3635 .account()
3636 .account_data::<IgnoredUserListEventContent>()
3637 .await
3638 .unwrap()
3639 .unwrap()
3640 .deserialize()
3641 .unwrap();
3642
3643 assert_eq!(content.ignored_users.len(), 1);
3644 }
3645
3646 #[async_test]
3647 async fn test_successful_discovery() {
3648 // Imagine this is `matrix.org`.
3649 let server = MatrixMockServer::new().await;
3650 let server_url = server.uri();
3651
3652 // Imagine this is `matrix-client.matrix.org`.
3653 let homeserver = MatrixMockServer::new().await;
3654 let homeserver_url = homeserver.uri();
3655
3656 // Imagine Alice has the user ID `@alice:matrix.org`.
3657 let domain = server_url.strip_prefix("http://").unwrap();
3658 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3659
3660 // The `.well-known` is on the server (e.g. `matrix.org`).
3661 server
3662 .mock_well_known()
3663 .ok_with_homeserver_url(&homeserver_url)
3664 .mock_once()
3665 .named("well-known")
3666 .mount()
3667 .await;
3668
3669 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3670 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3671
3672 let client = Client::builder()
3673 .insecure_server_name_no_tls(alice.server_name())
3674 .build()
3675 .await
3676 .unwrap();
3677
3678 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3679 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3680 client.server_versions().await.unwrap();
3681 }
3682
3683 #[async_test]
3684 async fn test_discovery_broken_server() {
3685 let server = MatrixMockServer::new().await;
3686 let server_url = server.uri();
3687 let domain = server_url.strip_prefix("http://").unwrap();
3688 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3689
3690 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3691
3692 assert!(
3693 Client::builder()
3694 .insecure_server_name_no_tls(alice.server_name())
3695 .build()
3696 .await
3697 .is_err(),
3698 "Creating a client from a user ID should fail when the .well-known request fails."
3699 );
3700 }
3701
3702 #[async_test]
3703 async fn test_room_creation() {
3704 let server = MatrixMockServer::new().await;
3705 let client = server.client_builder().build().await;
3706
3707 let f = EventFactory::new().sender(user_id!("@example:localhost"));
3708 server
3709 .mock_sync()
3710 .ok_and_run(&client, |builder| {
3711 builder.add_joined_room(
3712 JoinedRoomBuilder::default()
3713 .add_state_event(
3714 f.member(user_id!("@example:localhost")).display_name("example"),
3715 )
3716 .add_state_event(f.default_power_levels()),
3717 );
3718 })
3719 .await;
3720
3721 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3722 assert_eq!(room.state(), RoomState::Joined);
3723 }
3724
3725 #[async_test]
3726 async fn test_retry_limit_http_requests() {
3727 let server = MatrixMockServer::new().await;
3728 let client = server
3729 .client_builder()
3730 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3731 .build()
3732 .await;
3733
3734 assert!(client.request_config().retry_limit.unwrap() == 4);
3735
3736 server.mock_who_am_i().error500().expect(4).mount().await;
3737
3738 client.whoami().await.unwrap_err();
3739 }
3740
3741 #[async_test]
3742 async fn test_retry_timeout_http_requests() {
3743 // Keep this timeout small so that the test doesn't take long
3744 let retry_timeout = Duration::from_secs(5);
3745 let server = MatrixMockServer::new().await;
3746 let client = server
3747 .client_builder()
3748 .on_builder(|builder| {
3749 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3750 })
3751 .build()
3752 .await;
3753
3754 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3755
3756 server.mock_login().error500().expect(2..).mount().await;
3757
3758 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3759 }
3760
3761 #[async_test]
3762 async fn test_short_retry_initial_http_requests() {
3763 let server = MatrixMockServer::new().await;
3764 let client = server
3765 .client_builder()
3766 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3767 .build()
3768 .await;
3769
3770 server.mock_login().error500().expect(3..).mount().await;
3771
3772 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3773 }
3774
3775 #[async_test]
3776 async fn test_no_retry_http_requests() {
3777 let server = MatrixMockServer::new().await;
3778 let client = server.client_builder().build().await;
3779
3780 server.mock_devices().error500().mock_once().mount().await;
3781
3782 client.devices().await.unwrap_err();
3783 }
3784
3785 #[async_test]
3786 async fn test_set_homeserver() {
3787 let client = MockClientBuilder::new(None).build().await;
3788 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3789
3790 let homeserver = Url::parse("http://example.com/").unwrap();
3791 client.set_homeserver(homeserver.clone());
3792 assert_eq!(client.homeserver(), homeserver);
3793 }
3794
3795 #[async_test]
3796 async fn test_search_user_request() {
3797 let server = MatrixMockServer::new().await;
3798 let client = server.client_builder().build().await;
3799
3800 server.mock_user_directory().ok().mock_once().mount().await;
3801
3802 let response = client.search_users("test", 50).await.unwrap();
3803 assert_eq!(response.results.len(), 1);
3804 let result = &response.results[0];
3805 assert_eq!(result.user_id.to_string(), "@test:example.me");
3806 assert_eq!(result.display_name.clone().unwrap(), "Test");
3807 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3808 assert!(!response.limited);
3809 }
3810
3811 #[async_test]
3812 async fn test_request_unstable_features() {
3813 let server = MatrixMockServer::new().await;
3814 let client = server.client_builder().no_server_versions().build().await;
3815
3816 server
3817 .mock_versions()
3818 .with_feature("org.matrix.e2e_cross_signing", true)
3819 .ok()
3820 .mock_once()
3821 .mount()
3822 .await;
3823
3824 let unstable_features = client.unstable_features().await.unwrap();
3825 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3826 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3827 }
3828
3829 #[async_test]
3830 async fn test_can_homeserver_push_encrypted_event_to_device() {
3831 let server = MatrixMockServer::new().await;
3832 let client = server.client_builder().no_server_versions().build().await;
3833
3834 server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3835
3836 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3837 assert!(msc4028_enabled);
3838 }
3839
3840 #[async_test]
3841 async fn test_recently_visited_rooms() {
3842 // Tracking recently visited rooms requires authentication
3843 let client = MockClientBuilder::new(None).unlogged().build().await;
3844 assert_matches!(
3845 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3846 Err(Error::AuthenticationRequired)
3847 );
3848
3849 let client = MockClientBuilder::new(None).build().await;
3850 let account = client.account();
3851
3852 // We should start off with an empty list
3853 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3854
3855 // Tracking a valid room id should add it to the list
3856 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3857 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3858 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3859
3860 // And the existing list shouldn't be changed
3861 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3862 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3863
3864 // Tracking the same room again shouldn't change the list
3865 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3866 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3867 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3868
3869 // Tracking a second room should add it to the front of the list
3870 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3871 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3872 assert_eq!(
3873 account.get_recently_visited_rooms().await.unwrap(),
3874 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3875 );
3876
3877 // Tracking the first room yet again should move it to the front of the list
3878 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3879 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3880 assert_eq!(
3881 account.get_recently_visited_rooms().await.unwrap(),
3882 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3883 );
3884
3885 // Tracking should be capped at 20
3886 for n in 0..20 {
3887 account
3888 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3889 .await
3890 .unwrap();
3891 }
3892
3893 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3894
3895 // And the initial rooms should've been pushed out
3896 let rooms = account.get_recently_visited_rooms().await.unwrap();
3897 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3898 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3899
3900 // And the last tracked room should be the first
3901 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3902 }
3903
3904 #[async_test]
3905 async fn test_client_no_cycle_with_event_cache() {
3906 let client = MockClientBuilder::new(None).build().await;
3907
3908 // Wait for the init tasks to die.
3909 sleep(Duration::from_secs(1)).await;
3910
3911 let weak_client = WeakClient::from_client(&client);
3912 assert_eq!(weak_client.strong_count(), 1);
3913
3914 {
3915 let room_id = room_id!("!room:example.org");
3916
3917 // Have the client know the room.
3918 let response = SyncResponseBuilder::default()
3919 .add_joined_room(JoinedRoomBuilder::new(room_id))
3920 .build_sync_response();
3921 client.inner.base_client.receive_sync_response(response).await.unwrap();
3922
3923 client.event_cache().subscribe().unwrap();
3924
3925 let (_room_event_cache, _drop_handles) =
3926 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3927 }
3928
3929 drop(client);
3930
3931 // Give a bit of time for background tasks to die.
3932 sleep(Duration::from_secs(1)).await;
3933
3934 // The weak client must be the last reference to the client now.
3935 assert_eq!(weak_client.strong_count(), 0);
3936 let client = weak_client.get();
3937 assert!(
3938 client.is_none(),
3939 "too many strong references to the client: {}",
3940 Arc::strong_count(&client.unwrap().inner)
3941 );
3942 }
3943
3944 #[async_test]
3945 async fn test_supported_versions_caching() {
3946 let server = MatrixMockServer::new().await;
3947
3948 let versions_mock = server
3949 .mock_versions()
3950 .expect_default_access_token()
3951 .with_feature("org.matrix.e2e_cross_signing", true)
3952 .ok()
3953 .named("first versions mock")
3954 .expect(1)
3955 .mount_as_scoped()
3956 .await;
3957
3958 let memory_store = Arc::new(MemoryStore::new());
3959 let client = server
3960 .client_builder()
3961 .no_server_versions()
3962 .on_builder(|builder| {
3963 builder.store_config(
3964 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3965 .state_store(memory_store.clone()),
3966 )
3967 })
3968 .build()
3969 .await;
3970
3971 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3972
3973 // The result was cached.
3974 assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3975 // This subsequent call hits the in-memory cache.
3976 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3977
3978 drop(client);
3979
3980 let client = server
3981 .client_builder()
3982 .no_server_versions()
3983 .on_builder(|builder| {
3984 builder.store_config(
3985 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3986 .state_store(memory_store.clone()),
3987 )
3988 })
3989 .build()
3990 .await;
3991
3992 // These calls to the new client hit the on-disk cache.
3993 assert!(
3994 client
3995 .unstable_features()
3996 .await
3997 .unwrap()
3998 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3999 );
4000
4001 let supported = client.supported_versions().await.unwrap();
4002 assert!(supported.versions.contains(&MatrixVersion::V1_0));
4003 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4004
4005 // Then this call hits the in-memory cache.
4006 let supported = client.supported_versions().await.unwrap();
4007 assert!(supported.versions.contains(&MatrixVersion::V1_0));
4008 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
4009
4010 drop(versions_mock);
4011
4012 // Now, reset the cache, and observe the endpoint being called again once.
4013 client.reset_supported_versions().await.unwrap();
4014
4015 server.mock_versions().ok().expect(2).named("second versions mock").mount().await;
4016
4017 // Hits network again.
4018 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4019 // Hits in-memory cache again.
4020 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
4021 assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4022
4023 // Force an expiry of the data.
4024 let supported_versions = client.supported_versions_cached().await.unwrap().unwrap();
4025 let mut ttl_value = TtlValue::new(supported_versions);
4026 ttl_value.expire();
4027 client.inner.caches.supported_versions.set_value(ttl_value);
4028
4029 // Call the method to trigger a cache refresh background task.
4030 client.supported_versions_cached().await.unwrap().unwrap();
4031
4032 // We wait for the task to finish, the endpoint should have been called again.
4033 sleep(Duration::from_secs(1)).await;
4034 assert_matches!(client.inner.caches.supported_versions.value(), CachedValue::Cached(value) if !value.has_expired());
4035 }
4036
4037 #[async_test]
4038 async fn test_well_known_caching() {
4039 let server = MatrixMockServer::new().await;
4040 let server_url = server.uri();
4041 let domain = server_url.strip_prefix("http://").unwrap();
4042 let server_name = <&ServerName>::try_from(domain).unwrap();
4043 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
4044
4045 let well_known_mock = server
4046 .mock_well_known()
4047 .ok()
4048 .named("well known mock")
4049 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
4050 .mount_as_scoped()
4051 .await;
4052
4053 let memory_store = Arc::new(MemoryStore::new());
4054 let client = Client::builder()
4055 .insecure_server_name_no_tls(server_name)
4056 .store_config(
4057 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4058 .state_store(memory_store.clone()),
4059 )
4060 .build()
4061 .await
4062 .unwrap();
4063
4064 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4065
4066 // This subsequent call hits the in-memory cache.
4067 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4068
4069 drop(client);
4070
4071 let client = server
4072 .client_builder()
4073 .no_server_versions()
4074 .on_builder(|builder| {
4075 builder.store_config(
4076 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4077 .state_store(memory_store.clone()),
4078 )
4079 })
4080 .build()
4081 .await;
4082
4083 // This call to the new client hits the on-disk cache.
4084 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4085
4086 // Then this call hits the in-memory cache.
4087 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4088
4089 drop(well_known_mock);
4090
4091 // Now, reset the cache, and observe the endpoints being called again once.
4092 client.reset_well_known().await.unwrap();
4093
4094 server.mock_well_known().ok().named("second well known mock").expect(2).mount().await;
4095
4096 // Hits network again.
4097 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4098 // Hits in-memory cache again.
4099 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4100
4101 // Force an expiry of the data.
4102 let well_known = client.well_known().await;
4103 let mut ttl_value = TtlValue::new(well_known);
4104 ttl_value.expire();
4105 client.inner.caches.well_known.set_value(ttl_value);
4106
4107 // Call the method again to trigger a cache refresh background task.
4108 client.well_known().await;
4109
4110 // We wait for the task to finish, the endpoint should have been called again.
4111 // We need to wait a bit because the first requests using the server name of the
4112 // user will fail, only the requests using the homeserver URL will succeed.
4113 sleep(Duration::from_secs(5)).await;
4114 assert_matches!(client.inner.caches.well_known.value(), CachedValue::Cached(value) if !value.has_expired());
4115 }
4116
4117 #[async_test]
4118 async fn test_missing_well_known_caching() {
4119 let server = MatrixMockServer::new().await;
4120 let rtc_foci: Vec<RtcFocusInfo> = vec![];
4121
4122 let well_known_mock = server
4123 .mock_well_known()
4124 .error_unrecognized()
4125 .named("first well-known mock")
4126 .expect(1)
4127 .mount_as_scoped()
4128 .await;
4129
4130 let memory_store = Arc::new(MemoryStore::new());
4131 let client = server
4132 .client_builder()
4133 .on_builder(|builder| {
4134 builder.store_config(
4135 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4136 .state_store(memory_store.clone()),
4137 )
4138 })
4139 .build()
4140 .await;
4141
4142 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4143
4144 // This subsequent call hits the in-memory cache.
4145 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4146
4147 drop(client);
4148
4149 let client = server
4150 .client_builder()
4151 .on_builder(|builder| {
4152 builder.store_config(
4153 StoreConfig::new(CrossProcessLockConfig::SingleProcess)
4154 .state_store(memory_store.clone()),
4155 )
4156 })
4157 .build()
4158 .await;
4159
4160 // This call to the new client hits the on-disk cache.
4161 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4162
4163 // Then this call hits the in-memory cache.
4164 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4165
4166 drop(well_known_mock);
4167
4168 // Now, reset the cache, and observe the endpoints being called again once.
4169 client.reset_well_known().await.unwrap();
4170
4171 server
4172 .mock_well_known()
4173 .error_unrecognized()
4174 .expect(1)
4175 .named("second well-known mock")
4176 .mount()
4177 .await;
4178
4179 // Hits network again.
4180 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4181 // Hits in-memory cache again.
4182 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
4183 }
4184
4185 #[async_test]
4186 async fn test_no_network_doesnt_cause_infinite_retries() {
4187 // We want infinite retries for transient errors.
4188 let client = MockClientBuilder::new(None)
4189 .on_builder(|builder| builder.request_config(RequestConfig::new()))
4190 .build()
4191 .await;
4192
4193 // We don't define a mock server on purpose here, so that the error is really a
4194 // network error.
4195 client.whoami().await.unwrap_err();
4196 }
4197
4198 #[async_test]
4199 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4200 let server = MatrixMockServer::new().await;
4201 let client = server.client_builder().build().await;
4202
4203 let room_id = room_id!("!room:example.org");
4204
4205 server
4206 .mock_sync()
4207 .ok_and_run(&client, |builder| {
4208 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4209 })
4210 .await;
4211
4212 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4213 assert_eq!(room.room_id(), room_id);
4214 }
4215
4216 #[async_test]
4217 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4218 let server = MatrixMockServer::new().await;
4219 let client = server.client_builder().build().await;
4220
4221 let room_id = room_id!("!room:example.org");
4222
4223 let client = Arc::new(client);
4224
4225 // Perform the /sync request with a delay so it starts after the
4226 // `await_room_remote_echo` call has happened
4227 spawn({
4228 let client = client.clone();
4229 async move {
4230 sleep(Duration::from_millis(100)).await;
4231
4232 server
4233 .mock_sync()
4234 .ok_and_run(&client, |builder| {
4235 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4236 })
4237 .await;
4238 }
4239 });
4240
4241 let room =
4242 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4243 assert_eq!(room.room_id(), room_id);
4244 }
4245
4246 #[async_test]
4247 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4248 let client = MockClientBuilder::new(None).build().await;
4249
4250 let room_id = room_id!("!room:example.org");
4251 // Room is not present so the client won't be able to find it. The call will
4252 // timeout.
4253 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4254 }
4255
4256 #[async_test]
4257 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4258 let server = MatrixMockServer::new().await;
4259 let client = server.client_builder().build().await;
4260
4261 server.mock_create_room().ok().mount().await;
4262
4263 // Create a room in the internal store
4264 let room = client
4265 .create_room(assign!(CreateRoomRequest::new(), {
4266 invite: vec![],
4267 is_direct: false,
4268 }))
4269 .await
4270 .unwrap();
4271
4272 // Room is locally present, but not synced, the call will timeout
4273 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4274 .await
4275 .unwrap_err();
4276 }
4277
4278 #[async_test]
4279 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4280 let server = MatrixMockServer::new().await;
4281 let client = server.client_builder().build().await;
4282
4283 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4284
4285 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4286 assert_matches!(ret, Ok(true));
4287 }
4288
4289 #[async_test]
4290 async fn test_is_room_alias_available_if_alias_is_resolved() {
4291 let server = MatrixMockServer::new().await;
4292 let client = server.client_builder().build().await;
4293
4294 server
4295 .mock_room_directory_resolve_alias()
4296 .ok("!some_room_id:matrix.org", Vec::new())
4297 .expect(1)
4298 .mount()
4299 .await;
4300
4301 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4302 assert_matches!(ret, Ok(false));
4303 }
4304
4305 #[async_test]
4306 async fn test_is_room_alias_available_if_error_found() {
4307 let server = MatrixMockServer::new().await;
4308 let client = server.client_builder().build().await;
4309
4310 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4311
4312 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4313 assert_matches!(ret, Err(_));
4314 }
4315
4316 #[async_test]
4317 async fn test_create_room_alias() {
4318 let server = MatrixMockServer::new().await;
4319 let client = server.client_builder().build().await;
4320
4321 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4322
4323 let ret = client
4324 .create_room_alias(
4325 room_alias_id!("#some_alias:matrix.org"),
4326 room_id!("!some_room:matrix.org"),
4327 )
4328 .await;
4329 assert_matches!(ret, Ok(()));
4330 }
4331
4332 #[async_test]
4333 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4334 let server = MatrixMockServer::new().await;
4335 let client = server.client_builder().build().await;
4336
4337 let room_id = room_id!("!a-room:matrix.org");
4338
4339 // Make sure the summary endpoint is called once
4340 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4341
4342 // We create a locally cached invited room
4343 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4344
4345 // And we get a preview, the server endpoint was reached
4346 let preview = client
4347 .get_room_preview(room_id.into(), Vec::new())
4348 .await
4349 .expect("Room preview should be retrieved");
4350
4351 assert_eq!(invited_room.room_id(), preview.room_id);
4352 }
4353
4354 #[async_test]
4355 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4356 let server = MatrixMockServer::new().await;
4357 let client = server.client_builder().build().await;
4358
4359 let room_id = room_id!("!a-room:matrix.org");
4360
4361 // Make sure the summary endpoint is called once
4362 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4363
4364 // We create a locally cached left room
4365 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4366
4367 // And we get a preview, the server endpoint was reached
4368 let preview = client
4369 .get_room_preview(room_id.into(), Vec::new())
4370 .await
4371 .expect("Room preview should be retrieved");
4372
4373 assert_eq!(left_room.room_id(), preview.room_id);
4374 }
4375
4376 #[async_test]
4377 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4378 let server = MatrixMockServer::new().await;
4379 let client = server.client_builder().build().await;
4380
4381 let room_id = room_id!("!a-room:matrix.org");
4382
4383 // Make sure the summary endpoint is called once
4384 server.mock_room_summary().ok(room_id).mock_once().mount().await;
4385
4386 // We create a locally cached knocked room
4387 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4388
4389 // And we get a preview, the server endpoint was reached
4390 let preview = client
4391 .get_room_preview(room_id.into(), Vec::new())
4392 .await
4393 .expect("Room preview should be retrieved");
4394
4395 assert_eq!(knocked_room.room_id(), preview.room_id);
4396 }
4397
4398 #[async_test]
4399 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4400 let server = MatrixMockServer::new().await;
4401 let client = server.client_builder().build().await;
4402
4403 let room_id = room_id!("!a-room:matrix.org");
4404
4405 // Make sure the summary endpoint is not called
4406 server.mock_room_summary().ok(room_id).never().mount().await;
4407
4408 // We create a locally cached joined room
4409 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4410
4411 // And we get a preview, no server endpoint was reached
4412 let preview = client
4413 .get_room_preview(room_id.into(), Vec::new())
4414 .await
4415 .expect("Room preview should be retrieved");
4416
4417 assert_eq!(joined_room.room_id(), preview.room_id);
4418 }
4419
4420 #[async_test]
4421 async fn test_media_preview_config() {
4422 let server = MatrixMockServer::new().await;
4423 let client = server.client_builder().build().await;
4424
4425 server
4426 .mock_sync()
4427 .ok_and_run(&client, |builder| {
4428 builder.add_custom_global_account_data(json!({
4429 "content": {
4430 "media_previews": "private",
4431 "invite_avatars": "off"
4432 },
4433 "type": "m.media_preview_config"
4434 }));
4435 })
4436 .await;
4437
4438 let (initial_value, stream) =
4439 client.account().observe_media_preview_config().await.unwrap();
4440
4441 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4442 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4443 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4444 pin_mut!(stream);
4445 assert_pending!(stream);
4446
4447 server
4448 .mock_sync()
4449 .ok_and_run(&client, |builder| {
4450 builder.add_custom_global_account_data(json!({
4451 "content": {
4452 "media_previews": "off",
4453 "invite_avatars": "on"
4454 },
4455 "type": "m.media_preview_config"
4456 }));
4457 })
4458 .await;
4459
4460 assert_next_matches!(
4461 stream,
4462 MediaPreviewConfigEventContent {
4463 media_previews: Some(MediaPreviews::Off),
4464 invite_avatars: Some(InviteAvatars::On),
4465 ..
4466 }
4467 );
4468 assert_pending!(stream);
4469 }
4470
4471 #[async_test]
4472 async fn test_unstable_media_preview_config() {
4473 let server = MatrixMockServer::new().await;
4474 let client = server.client_builder().build().await;
4475
4476 server
4477 .mock_sync()
4478 .ok_and_run(&client, |builder| {
4479 builder.add_custom_global_account_data(json!({
4480 "content": {
4481 "media_previews": "private",
4482 "invite_avatars": "off"
4483 },
4484 "type": "io.element.msc4278.media_preview_config"
4485 }));
4486 })
4487 .await;
4488
4489 let (initial_value, stream) =
4490 client.account().observe_media_preview_config().await.unwrap();
4491
4492 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4493 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4494 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4495 pin_mut!(stream);
4496 assert_pending!(stream);
4497
4498 server
4499 .mock_sync()
4500 .ok_and_run(&client, |builder| {
4501 builder.add_custom_global_account_data(json!({
4502 "content": {
4503 "media_previews": "off",
4504 "invite_avatars": "on"
4505 },
4506 "type": "io.element.msc4278.media_preview_config"
4507 }));
4508 })
4509 .await;
4510
4511 assert_next_matches!(
4512 stream,
4513 MediaPreviewConfigEventContent {
4514 media_previews: Some(MediaPreviews::Off),
4515 invite_avatars: Some(InviteAvatars::On),
4516 ..
4517 }
4518 );
4519 assert_pending!(stream);
4520 }
4521
4522 #[async_test]
4523 async fn test_media_preview_config_not_found() {
4524 let server = MatrixMockServer::new().await;
4525 let client = server.client_builder().build().await;
4526
4527 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4528
4529 assert!(initial_value.is_none());
4530 }
4531
4532 #[async_test]
4533 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4534 // The default Matrix version we use is 1.11 or higher, so authenticated media
4535 // is supported.
4536 let server = MatrixMockServer::new().await;
4537 let client = server.client_builder().build().await;
4538
4539 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4540
4541 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4542 client.load_or_fetch_max_upload_size().await.unwrap();
4543
4544 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4545 }
4546
4547 #[async_test]
4548 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4549 // The server must advertise support for the stable feature for authenticated
4550 // media support, so we mock the `GET /versions` response.
4551 let server = MatrixMockServer::new().await;
4552 let client = server.client_builder().no_server_versions().build().await;
4553
4554 server
4555 .mock_versions()
4556 .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4557 .with_feature("org.matrix.msc3916.stable", true)
4558 .ok()
4559 .named("versions")
4560 .expect(1)
4561 .mount()
4562 .await;
4563
4564 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4565
4566 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4567 client.load_or_fetch_max_upload_size().await.unwrap();
4568
4569 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4570 }
4571
4572 #[async_test]
4573 async fn test_load_or_fetch_max_upload_size_no_auth() {
4574 // The server must not support Matrix 1.11 or higher for unauthenticated
4575 // media requests, so we mock the `GET /versions` response.
4576 let server = MatrixMockServer::new().await;
4577 let client = server.client_builder().no_server_versions().build().await;
4578
4579 server
4580 .mock_versions()
4581 .with_versions(vec!["v1.1"])
4582 .ok()
4583 .named("versions")
4584 .expect(1)
4585 .mount()
4586 .await;
4587
4588 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4589
4590 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4591 client.load_or_fetch_max_upload_size().await.unwrap();
4592
4593 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4594 }
4595
4596 #[async_test]
4597 async fn test_uploading_a_too_large_media_file() {
4598 let server = MatrixMockServer::new().await;
4599 let client = server.client_builder().build().await;
4600
4601 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4602 client.load_or_fetch_max_upload_size().await.unwrap();
4603 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4604
4605 let data = vec![1, 2];
4606 let upload_request =
4607 ruma::api::client::media::create_content::v3::Request::new(data.clone());
4608 let request = SendRequest {
4609 client: client.clone(),
4610 request: upload_request,
4611 config: None,
4612 send_progress: SharedObservable::new(TransmissionProgress::default()),
4613 };
4614 let media_request = SendMediaUploadRequest::new(request);
4615
4616 let error = media_request.await.err();
4617 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4618 assert_eq!(max, uint!(1));
4619 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4620 }
4621
4622 #[async_test]
4623 async fn test_dont_ignore_timeout_on_first_sync() {
4624 let server = MatrixMockServer::new().await;
4625 let client = server.client_builder().build().await;
4626
4627 server
4628 .mock_sync()
4629 .timeout(Some(Duration::from_secs(30)))
4630 .ok(|_| {})
4631 .mock_once()
4632 .named("sync_with_timeout")
4633 .mount()
4634 .await;
4635
4636 // Call the endpoint once to check the timeout.
4637 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4638
4639 timeout(Duration::from_secs(1), async {
4640 stream.next().await.unwrap().unwrap();
4641 })
4642 .await
4643 .unwrap();
4644 }
4645
4646 #[async_test]
4647 async fn test_ignore_timeout_on_first_sync() {
4648 let server = MatrixMockServer::new().await;
4649 let client = server.client_builder().build().await;
4650
4651 server
4652 .mock_sync()
4653 .timeout(None)
4654 .ok(|_| {})
4655 .mock_once()
4656 .named("sync_no_timeout")
4657 .mount()
4658 .await;
4659 server
4660 .mock_sync()
4661 .timeout(Some(Duration::from_secs(30)))
4662 .ok(|_| {})
4663 .mock_once()
4664 .named("sync_with_timeout")
4665 .mount()
4666 .await;
4667
4668 // Call each version of the endpoint once to check the timeouts.
4669 let mut stream = Box::pin(
4670 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4671 );
4672
4673 timeout(Duration::from_secs(1), async {
4674 stream.next().await.unwrap().unwrap();
4675 stream.next().await.unwrap().unwrap();
4676 })
4677 .await
4678 .unwrap();
4679 }
4680
4681 #[async_test]
4682 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4683 let server = MatrixMockServer::new().await;
4684 let client = server.client_builder().build().await;
4685 // This is the user ID that is inside MemberAdditional.
4686 // Note the confusing username, so we can share
4687 // GlobalAccountDataTestEvent::Direct with the invited test.
4688 let user_id = user_id!("@invited:localhost");
4689
4690 // When we receive a sync response saying "invited" is invited to a DM
4691 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4692 let response = SyncResponseBuilder::default()
4693 .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4694 .add_global_account_data(
4695 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4696 )
4697 .build_sync_response();
4698 client.base_client().receive_sync_response(response).await.unwrap();
4699
4700 // Then get_dm_room finds this room
4701 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4702 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4703 }
4704
4705 #[async_test]
4706 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4707 let server = MatrixMockServer::new().await;
4708 let client = server.client_builder().build().await;
4709 // This is the user ID that is inside MemberInvite
4710 let user_id = user_id!("@invited:localhost");
4711
4712 // When we receive a sync response saying "invited" is invited to a DM
4713 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4714 let response = SyncResponseBuilder::default()
4715 .add_joined_room(
4716 JoinedRoomBuilder::default()
4717 .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4718 )
4719 .add_global_account_data(
4720 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4721 )
4722 .build_sync_response();
4723 client.base_client().receive_sync_response(response).await.unwrap();
4724
4725 // Then get_dm_room finds this room
4726 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4727 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4728 }
4729
4730 #[async_test]
4731 async fn test_get_dm_room_still_finds_left_room() {
4732 // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4733 // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4734
4735 let server = MatrixMockServer::new().await;
4736 let client = server.client_builder().build().await;
4737 // This is the user ID that is inside MemberAdditional.
4738 // Note the confusing username, so we can share
4739 // GlobalAccountDataTestEvent::Direct with the invited test.
4740 let user_id = user_id!("@invited:localhost");
4741
4742 // When we receive a sync response saying "invited" has left a DM
4743 let f = EventFactory::new().sender(user_id);
4744 let response = SyncResponseBuilder::default()
4745 .add_joined_room(
4746 JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4747 )
4748 .add_global_account_data(
4749 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4750 )
4751 .build_sync_response();
4752 client.base_client().receive_sync_response(response).await.unwrap();
4753
4754 // Then get_dm_room finds this room
4755 let found_room = client.get_dm_room(user_id).expect("DM not found!");
4756 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4757 }
4758
4759 #[async_test]
4760 async fn test_device_exists() {
4761 let server = MatrixMockServer::new().await;
4762 let client = server.client_builder().build().await;
4763
4764 server.mock_get_device().ok().expect(1).mount().await;
4765
4766 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4767 }
4768
4769 #[async_test]
4770 async fn test_device_exists_404() {
4771 let server = MatrixMockServer::new().await;
4772 let client = server.client_builder().build().await;
4773
4774 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4775 }
4776
4777 #[async_test]
4778 async fn test_device_exists_500() {
4779 let server = MatrixMockServer::new().await;
4780 let client = server.client_builder().build().await;
4781
4782 server.mock_get_device().error500().expect(1).mount().await;
4783
4784 assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4785 }
4786
4787 #[async_test]
4788 async fn test_fetching_well_known_with_homeserver_url() {
4789 let server = MatrixMockServer::new().await;
4790 let client = server.client_builder().build().await;
4791 server.mock_well_known().ok().mount().await;
4792
4793 assert_matches!(client.fetch_client_well_known().await, Some(_));
4794 }
4795
4796 #[async_test]
4797 async fn test_fetching_well_known_with_server_name() {
4798 let server = MatrixMockServer::new().await;
4799 let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4800
4801 server.mock_well_known().ok().mount().await;
4802
4803 let client = MockClientBuilder::new(None)
4804 .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4805 .build()
4806 .await;
4807
4808 assert_matches!(client.fetch_client_well_known().await, Some(_));
4809 }
4810
4811 #[async_test]
4812 async fn test_fetching_well_known_with_domain_part_of_user_id() {
4813 let server = MatrixMockServer::new().await;
4814 server.mock_well_known().ok().mount().await;
4815
4816 let user_id =
4817 UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4818 let client = MockClientBuilder::new(None)
4819 .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4820 .build()
4821 .await;
4822
4823 assert_matches!(client.fetch_client_well_known().await, Some(_));
4824 }
4825}