matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{btree_map, BTreeMap, BTreeSet},
19 fmt::{self, Debug},
20 future::{ready, Future},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23 time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{store::LockableCryptoStore, DecryptionSettings};
33use matrix_sdk_base::{
34 event_cache::store::EventCacheStoreLock,
35 store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
36 sync::{Notification, RoomUpdates},
37 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
38 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport,
39};
40use matrix_sdk_common::ttl_cache::TtlCache;
41#[cfg(feature = "e2e-encryption")]
42use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
43use ruma::{
44 api::{
45 client::{
46 account::whoami,
47 alias::{create_alias, delete_alias, get_alias},
48 authenticated_media,
49 device::{delete_devices, get_devices, update_device},
50 directory::{get_public_rooms, get_public_rooms_filtered},
51 discovery::{
52 discover_homeserver::{self, RtcFocusInfo},
53 get_capabilities::{self, v3::Capabilities},
54 get_supported_versions,
55 },
56 error::ErrorKind,
57 filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
58 knock::knock_room,
59 media,
60 membership::{join_room_by_id, join_room_by_id_or_alias},
61 room::create_room,
62 session::login::v3::DiscoveryInfo,
63 sync::sync_events,
64 threads::get_thread_subscriptions_changes,
65 uiaa,
66 user_directory::search_users,
67 },
68 error::FromHttpResponseError,
69 federation::discovery::get_server_version,
70 FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
71 },
72 assign,
73 push::Ruleset,
74 time::Instant,
75 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
76 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
77};
78use serde::de::DeserializeOwned;
79use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
80use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
81use url::Url;
82
83use self::futures::SendRequest;
84use crate::{
85 authentication::{
86 matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
87 SaveSessionCallback,
88 },
89 client::thread_subscriptions::ThreadSubscriptionCatchup,
90 config::{RequestConfig, SyncToken},
91 deduplicating_handler::DeduplicatingHandler,
92 error::HttpResult,
93 event_cache::EventCache,
94 event_handler::{
95 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
96 EventHandlerStore, ObservableEventHandler, SyncEvent,
97 },
98 http_client::HttpClient,
99 latest_events::LatestEvents,
100 media::MediaError,
101 notification_settings::NotificationSettings,
102 room::RoomMember,
103 room_preview::RoomPreview,
104 send_queue::{SendQueue, SendQueueData},
105 sliding_sync::Version as SlidingSyncVersion,
106 sync::{RoomUpdate, SyncResponse},
107 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
108 Room, SessionTokens, TransmissionProgress,
109};
110#[cfg(feature = "e2e-encryption")]
111use crate::{
112 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
113 store_locks::CrossProcessStoreLock,
114};
115
116mod builder;
117pub(crate) mod caches;
118pub(crate) mod futures;
119#[cfg(feature = "experimental-search")]
120pub(crate) mod search;
121pub(crate) mod thread_subscriptions;
122
123pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
124#[cfg(feature = "experimental-search")]
125use crate::client::search::SearchIndex;
126
127#[cfg(not(target_family = "wasm"))]
128type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
129#[cfg(target_family = "wasm")]
130type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
131
132#[cfg(not(target_family = "wasm"))]
133type NotificationHandlerFn =
134 Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
135#[cfg(target_family = "wasm")]
136type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
137
138/// Enum controlling if a loop running callbacks should continue or abort.
139///
140/// This is mainly used in the [`sync_with_callback`] method, the return value
141/// of the provided callback controls if the sync loop should be exited.
142///
143/// [`sync_with_callback`]: #method.sync_with_callback
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145pub enum LoopCtrl {
146 /// Continue running the loop.
147 Continue,
148 /// Break out of the loop.
149 Break,
150}
151
152/// Represents changes that can occur to a `Client`s `Session`.
153#[derive(Debug, Clone, PartialEq)]
154pub enum SessionChange {
155 /// The session's token is no longer valid.
156 UnknownToken {
157 /// Whether or not the session was soft logged out
158 soft_logout: bool,
159 },
160 /// The session's tokens have been refreshed.
161 TokensRefreshed,
162}
163
164/// Information about the server vendor obtained from the federation API.
165#[derive(Debug, Clone, PartialEq, Eq)]
166#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
167pub struct ServerVendorInfo {
168 /// The server name.
169 pub server_name: String,
170 /// The server version.
171 pub version: String,
172}
173
174/// An async/await enabled Matrix client.
175///
176/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
177#[derive(Clone)]
178pub struct Client {
179 pub(crate) inner: Arc<ClientInner>,
180}
181
182#[derive(Default)]
183pub(crate) struct ClientLocks {
184 /// Lock ensuring that only a single room may be marked as a DM at once.
185 /// Look at the [`Account::mark_as_dm()`] method for a more detailed
186 /// explanation.
187 pub(crate) mark_as_dm_lock: Mutex<()>,
188
189 /// Lock ensuring that only a single secret store is getting opened at the
190 /// same time.
191 ///
192 /// This is important so we don't accidentally create multiple different new
193 /// default secret storage keys.
194 #[cfg(feature = "e2e-encryption")]
195 pub(crate) open_secret_store_lock: Mutex<()>,
196
197 /// Lock ensuring that we're only storing a single secret at a time.
198 ///
199 /// Take a look at the [`SecretStore::put_secret`] method for a more
200 /// detailed explanation.
201 ///
202 /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
203 #[cfg(feature = "e2e-encryption")]
204 pub(crate) store_secret_lock: Mutex<()>,
205
206 /// Lock ensuring that only one method at a time might modify our backup.
207 #[cfg(feature = "e2e-encryption")]
208 pub(crate) backup_modify_lock: Mutex<()>,
209
210 /// Lock ensuring that we're going to attempt to upload backups for a single
211 /// requester.
212 #[cfg(feature = "e2e-encryption")]
213 pub(crate) backup_upload_lock: Mutex<()>,
214
215 /// Handler making sure we only have one group session sharing request in
216 /// flight per room.
217 #[cfg(feature = "e2e-encryption")]
218 pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
219
220 /// Lock making sure we're only doing one key claim request at a time.
221 #[cfg(feature = "e2e-encryption")]
222 pub(crate) key_claim_lock: Mutex<()>,
223
224 /// Handler to ensure that only one members request is running at a time,
225 /// given a room.
226 pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
227
228 /// Handler to ensure that only one encryption state request is running at a
229 /// time, given a room.
230 pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
231
232 /// Deduplicating handler for sending read receipts. The string is an
233 /// internal implementation detail, see [`Self::send_single_receipt`].
234 pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
235
236 #[cfg(feature = "e2e-encryption")]
237 pub(crate) cross_process_crypto_store_lock:
238 OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,
239
240 /// Latest "generation" of data known by the crypto store.
241 ///
242 /// This is a counter that only increments, set in the database (and can
243 /// wrap). It's incremented whenever some process acquires a lock for the
244 /// first time. *This assumes the crypto store lock is being held, to
245 /// avoid data races on writing to this value in the store*.
246 ///
247 /// The current process will maintain this value in local memory and in the
248 /// DB over time. Observing a different value than the one read in
249 /// memory, when reading from the store indicates that somebody else has
250 /// written into the database under our feet.
251 ///
252 /// TODO: this should live in the `OlmMachine`, since it's information
253 /// related to the lock. As of today (2023-07-28), we blow up the entire
254 /// olm machine when there's a generation mismatch. So storing the
255 /// generation in the olm machine would make the client think there's
256 /// *always* a mismatch, and that's why we need to store the generation
257 /// outside the `OlmMachine`.
258 #[cfg(feature = "e2e-encryption")]
259 pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
260}
261
262pub(crate) struct ClientInner {
263 /// All the data related to authentication and authorization.
264 pub(crate) auth_ctx: Arc<AuthCtx>,
265
266 /// The URL of the server.
267 ///
268 /// Not to be confused with the `Self::homeserver`. `server` is usually
269 /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
270 /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
271 /// homeserver (at the time of writing — 2024-08-28).
272 ///
273 /// This value is optional depending on how the `Client` has been built.
274 /// If it's been built from a homeserver URL directly, we don't know the
275 /// server. However, if the `Client` has been built from a server URL or
276 /// name, then the homeserver has been discovered, and we know both.
277 server: Option<Url>,
278
279 /// The URL of the homeserver to connect to.
280 ///
281 /// This is the URL for the client-server Matrix API.
282 homeserver: StdRwLock<Url>,
283
284 /// The sliding sync version.
285 sliding_sync_version: StdRwLock<SlidingSyncVersion>,
286
287 /// The underlying HTTP client.
288 pub(crate) http_client: HttpClient,
289
290 /// User session data.
291 pub(super) base_client: BaseClient,
292
293 /// Collection of in-memory caches for the [`Client`].
294 pub(crate) caches: ClientCaches,
295
296 /// Collection of locks individual client methods might want to use, either
297 /// to ensure that only a single call to a method happens at once or to
298 /// deduplicate multiple calls to a method.
299 pub(crate) locks: ClientLocks,
300
301 /// The cross-process store locks holder name.
302 ///
303 /// The SDK provides cross-process store locks (see
304 /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
305 /// `holder_name` is the value used for all cross-process store locks
306 /// used by this `Client`.
307 ///
308 /// If multiple `Client`s are running in different processes, this
309 /// value MUST be different for each `Client`.
310 cross_process_store_locks_holder_name: String,
311
312 /// A mapping of the times at which the current user sent typing notices,
313 /// keyed by room.
314 pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
315
316 /// Event handlers. See `add_event_handler`.
317 pub(crate) event_handlers: EventHandlerStore,
318
319 /// Notification handlers. See `register_notification_handler`.
320 notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
321
322 /// The sender-side of channels used to receive room updates.
323 pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
324
325 /// The sender-side of a channel used to observe all the room updates of a
326 /// sync response.
327 pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
328
329 /// Whether the client should update its homeserver URL with the discovery
330 /// information present in the login response.
331 respect_login_well_known: bool,
332
333 /// An event that can be listened on to wait for a successful sync. The
334 /// event will only be fired if a sync loop is running. Can be used for
335 /// synchronization, e.g. if we send out a request to create a room, we can
336 /// wait for the sync to get the data to fetch a room object from the state
337 /// store.
338 pub(crate) sync_beat: event_listener::Event,
339
340 /// A central cache for events, inactive first.
341 ///
342 /// It becomes active when [`EventCache::subscribe`] is called.
343 pub(crate) event_cache: OnceCell<EventCache>,
344
345 /// End-to-end encryption related state.
346 #[cfg(feature = "e2e-encryption")]
347 pub(crate) e2ee: EncryptionData,
348
349 /// The verification state of our own device.
350 #[cfg(feature = "e2e-encryption")]
351 pub(crate) verification_state: SharedObservable<VerificationState>,
352
353 /// Whether to enable the experimental support for sending and receiving
354 /// encrypted room history on invite, per [MSC4268].
355 ///
356 /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
357 #[cfg(feature = "e2e-encryption")]
358 pub(crate) enable_share_history_on_invite: bool,
359
360 /// Data related to the [`SendQueue`].
361 ///
362 /// [`SendQueue`]: crate::send_queue::SendQueue
363 pub(crate) send_queue_data: Arc<SendQueueData>,
364
365 /// The `max_upload_size` value of the homeserver, it contains the max
366 /// request size you can send.
367 pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
368
369 /// The entry point to get the [`LatestEvent`] of rooms and threads.
370 ///
371 /// [`LatestEvent`]: crate::latest_event::LatestEvent
372 latest_events: OnceCell<LatestEvents>,
373
374 /// Service handling the catching up of thread subscriptions in the
375 /// background.
376 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
377
378 #[cfg(feature = "experimental-search")]
379 /// Handler for [`RoomIndex`]'s of each room
380 search_index: SearchIndex,
381}
382
383impl ClientInner {
384 /// Create a new `ClientInner`.
385 ///
386 /// All the fields passed as parameters here are those that must be cloned
387 /// upon instantiation of a sub-client, e.g. a client specialized for
388 /// notifications.
389 #[allow(clippy::too_many_arguments)]
390 async fn new(
391 auth_ctx: Arc<AuthCtx>,
392 server: Option<Url>,
393 homeserver: Url,
394 sliding_sync_version: SlidingSyncVersion,
395 http_client: HttpClient,
396 base_client: BaseClient,
397 server_info: ClientServerInfo,
398 respect_login_well_known: bool,
399 event_cache: OnceCell<EventCache>,
400 send_queue: Arc<SendQueueData>,
401 latest_events: OnceCell<LatestEvents>,
402 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
403 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
404 cross_process_store_locks_holder_name: String,
405 #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
406 thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
407 ) -> Arc<Self> {
408 let caches = ClientCaches {
409 server_info: server_info.into(),
410 server_metadata: Mutex::new(TtlCache::new()),
411 };
412
413 let client = Self {
414 server,
415 homeserver: StdRwLock::new(homeserver),
416 auth_ctx,
417 sliding_sync_version: StdRwLock::new(sliding_sync_version),
418 http_client,
419 base_client,
420 caches,
421 locks: Default::default(),
422 cross_process_store_locks_holder_name,
423 typing_notice_times: Default::default(),
424 event_handlers: Default::default(),
425 notification_handlers: Default::default(),
426 room_update_channels: Default::default(),
427 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
428 // ballast for all observers to catch up.
429 room_updates_sender: broadcast::Sender::new(32),
430 respect_login_well_known,
431 sync_beat: event_listener::Event::new(),
432 event_cache,
433 send_queue_data: send_queue,
434 latest_events,
435 #[cfg(feature = "e2e-encryption")]
436 e2ee: EncryptionData::new(encryption_settings),
437 #[cfg(feature = "e2e-encryption")]
438 verification_state: SharedObservable::new(VerificationState::Unknown),
439 #[cfg(feature = "e2e-encryption")]
440 enable_share_history_on_invite,
441 server_max_upload_size: Mutex::new(OnceCell::new()),
442 #[cfg(feature = "experimental-search")]
443 search_index: search_index_handler,
444 thread_subscription_catchup,
445 };
446
447 #[allow(clippy::let_and_return)]
448 let client = Arc::new(client);
449
450 #[cfg(feature = "e2e-encryption")]
451 client.e2ee.initialize_tasks(&client);
452
453 let _ = client
454 .event_cache
455 .get_or_init(|| async {
456 EventCache::new(
457 WeakClient::from_inner(&client),
458 client.base_client.event_cache_store().clone(),
459 )
460 })
461 .await;
462
463 let _ = client
464 .thread_subscription_catchup
465 .get_or_init(|| async {
466 ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
467 })
468 .await;
469
470 client
471 }
472}
473
474#[cfg(not(tarpaulin_include))]
475impl Debug for Client {
476 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
477 write!(fmt, "Client")
478 }
479}
480
481impl Client {
482 /// Create a new [`Client`] that will use the given homeserver.
483 ///
484 /// # Arguments
485 ///
486 /// * `homeserver_url` - The homeserver that the client should connect to.
487 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
488 Self::builder().homeserver_url(homeserver_url).build().await
489 }
490
491 /// Returns a subscriber that publishes an event every time the ignore user
492 /// list changes.
493 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
494 self.inner.base_client.subscribe_to_ignore_user_list_changes()
495 }
496
497 /// Create a new [`ClientBuilder`].
498 pub fn builder() -> ClientBuilder {
499 ClientBuilder::new()
500 }
501
502 pub(crate) fn base_client(&self) -> &BaseClient {
503 &self.inner.base_client
504 }
505
506 /// The underlying HTTP client.
507 pub fn http_client(&self) -> &reqwest::Client {
508 &self.inner.http_client.inner
509 }
510
511 pub(crate) fn locks(&self) -> &ClientLocks {
512 &self.inner.locks
513 }
514
515 pub(crate) fn auth_ctx(&self) -> &AuthCtx {
516 &self.inner.auth_ctx
517 }
518
519 /// The cross-process store locks holder name.
520 ///
521 /// The SDK provides cross-process store locks (see
522 /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
523 /// `holder_name` is the value used for all cross-process store locks
524 /// used by this `Client`.
525 pub fn cross_process_store_locks_holder_name(&self) -> &str {
526 &self.inner.cross_process_store_locks_holder_name
527 }
528
529 /// Change the homeserver URL used by this client.
530 ///
531 /// # Arguments
532 ///
533 /// * `homeserver_url` - The new URL to use.
534 fn set_homeserver(&self, homeserver_url: Url) {
535 *self.inner.homeserver.write().unwrap() = homeserver_url;
536 }
537
538 /// Get the capabilities of the homeserver.
539 ///
540 /// This method should be used to check what features are supported by the
541 /// homeserver.
542 ///
543 /// # Examples
544 ///
545 /// ```no_run
546 /// # use matrix_sdk::Client;
547 /// # use url::Url;
548 /// # async {
549 /// # let homeserver = Url::parse("http://example.com")?;
550 /// let client = Client::new(homeserver).await?;
551 ///
552 /// let capabilities = client.get_capabilities().await?;
553 ///
554 /// if capabilities.change_password.enabled {
555 /// // Change password
556 /// }
557 /// # anyhow::Ok(()) };
558 /// ```
559 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
560 let res = self.send(get_capabilities::v3::Request::new()).await?;
561 Ok(res.capabilities)
562 }
563
564 /// Get the server vendor information from the federation API.
565 ///
566 /// This method calls the `/_matrix/federation/v1/version` endpoint to get
567 /// both the server's software name and version.
568 ///
569 /// # Examples
570 ///
571 /// ```no_run
572 /// # use matrix_sdk::Client;
573 /// # use url::Url;
574 /// # async {
575 /// # let homeserver = Url::parse("http://example.com")?;
576 /// let client = Client::new(homeserver).await?;
577 ///
578 /// let server_info = client.server_vendor_info(None).await?;
579 /// println!(
580 /// "Server: {}, Version: {}",
581 /// server_info.server_name, server_info.version
582 /// );
583 /// # anyhow::Ok(()) };
584 /// ```
585 pub async fn server_vendor_info(
586 &self,
587 request_config: Option<RequestConfig>,
588 ) -> HttpResult<ServerVendorInfo> {
589 let res = self
590 .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
591 .await?;
592
593 // Extract server info, using defaults if fields are missing.
594 let server = res.server.unwrap_or_default();
595 let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
596 let version = server.version.unwrap_or_else(|| "unknown".to_owned());
597
598 Ok(ServerVendorInfo { server_name: server_name_str, version })
599 }
600
601 /// Get a copy of the default request config.
602 ///
603 /// The default request config is what's used when sending requests if no
604 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
605 /// function with such a parameter.
606 ///
607 /// If the default request config was not customized through
608 /// [`ClientBuilder`] when creating this `Client`, the returned value will
609 /// be equivalent to [`RequestConfig::default()`].
610 pub fn request_config(&self) -> RequestConfig {
611 self.inner.http_client.request_config
612 }
613
614 /// Check whether the client has been activated.
615 ///
616 /// A client is considered active when:
617 ///
618 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
619 /// is logged in,
620 /// 2. Has loaded cached data from storage,
621 /// 3. If encryption is enabled, it also initialized or restored its
622 /// `OlmMachine`.
623 pub fn is_active(&self) -> bool {
624 self.inner.base_client.is_active()
625 }
626
627 /// The server used by the client.
628 ///
629 /// See `Self::server` to learn more.
630 pub fn server(&self) -> Option<&Url> {
631 self.inner.server.as_ref()
632 }
633
634 /// The homeserver of the client.
635 pub fn homeserver(&self) -> Url {
636 self.inner.homeserver.read().unwrap().clone()
637 }
638
639 /// Get the sliding sync version.
640 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
641 self.inner.sliding_sync_version.read().unwrap().clone()
642 }
643
644 /// Override the sliding sync version.
645 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
646 let mut lock = self.inner.sliding_sync_version.write().unwrap();
647 *lock = version;
648 }
649
650 /// Get the Matrix user session meta information.
651 ///
652 /// If the client is currently logged in, this will return a
653 /// [`SessionMeta`] object which contains the user ID and device ID.
654 /// Otherwise it returns `None`.
655 pub fn session_meta(&self) -> Option<&SessionMeta> {
656 self.base_client().session_meta()
657 }
658
659 /// Returns a receiver that gets events for each room info update. To watch
660 /// for new events, use `receiver.resubscribe()`.
661 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
662 self.base_client().room_info_notable_update_receiver()
663 }
664
665 /// Performs a search for users.
666 /// The search is performed case-insensitively on user IDs and display names
667 ///
668 /// # Arguments
669 ///
670 /// * `search_term` - The search term for the search
671 /// * `limit` - The maximum number of results to return. Defaults to 10.
672 ///
673 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
674 pub async fn search_users(
675 &self,
676 search_term: &str,
677 limit: u64,
678 ) -> HttpResult<search_users::v3::Response> {
679 let mut request = search_users::v3::Request::new(search_term.to_owned());
680
681 if let Some(limit) = UInt::new(limit) {
682 request.limit = limit;
683 }
684
685 self.send(request).await
686 }
687
688 /// Get the user id of the current owner of the client.
689 pub fn user_id(&self) -> Option<&UserId> {
690 self.session_meta().map(|s| s.user_id.as_ref())
691 }
692
693 /// Get the device ID that identifies the current session.
694 pub fn device_id(&self) -> Option<&DeviceId> {
695 self.session_meta().map(|s| s.device_id.as_ref())
696 }
697
698 /// Get the current access token for this session.
699 ///
700 /// Will be `None` if the client has not been logged in.
701 pub fn access_token(&self) -> Option<String> {
702 self.auth_ctx().access_token()
703 }
704
705 /// Get the current tokens for this session.
706 ///
707 /// To be notified of changes in the session tokens, use
708 /// [`Client::subscribe_to_session_changes()`] or
709 /// [`Client::set_session_callbacks()`].
710 ///
711 /// Returns `None` if the client has not been logged in.
712 pub fn session_tokens(&self) -> Option<SessionTokens> {
713 self.auth_ctx().session_tokens()
714 }
715
716 /// Access the authentication API used to log in this client.
717 ///
718 /// Will be `None` if the client has not been logged in.
719 pub fn auth_api(&self) -> Option<AuthApi> {
720 match self.auth_ctx().auth_data.get()? {
721 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
722 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
723 }
724 }
725
726 /// Get the whole session info of this client.
727 ///
728 /// Will be `None` if the client has not been logged in.
729 ///
730 /// Can be used with [`Client::restore_session`] to restore a previously
731 /// logged-in session.
732 pub fn session(&self) -> Option<AuthSession> {
733 match self.auth_api()? {
734 AuthApi::Matrix(api) => api.session().map(Into::into),
735 AuthApi::OAuth(api) => api.full_session().map(Into::into),
736 }
737 }
738
739 /// Get a reference to the state store.
740 pub fn state_store(&self) -> &DynStateStore {
741 self.base_client().state_store()
742 }
743
744 /// Get a reference to the event cache store.
745 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
746 self.base_client().event_cache_store()
747 }
748
749 /// Access the native Matrix authentication API with this client.
750 pub fn matrix_auth(&self) -> MatrixAuth {
751 MatrixAuth::new(self.clone())
752 }
753
754 /// Get the account of the current owner of the client.
755 pub fn account(&self) -> Account {
756 Account::new(self.clone())
757 }
758
759 /// Get the encryption manager of the client.
760 #[cfg(feature = "e2e-encryption")]
761 pub fn encryption(&self) -> Encryption {
762 Encryption::new(self.clone())
763 }
764
765 /// Get the media manager of the client.
766 pub fn media(&self) -> Media {
767 Media::new(self.clone())
768 }
769
770 /// Get the pusher manager of the client.
771 pub fn pusher(&self) -> Pusher {
772 Pusher::new(self.clone())
773 }
774
775 /// Access the OAuth 2.0 API of the client.
776 pub fn oauth(&self) -> OAuth {
777 OAuth::new(self.clone())
778 }
779
780 /// Register a handler for a specific event type.
781 ///
782 /// The handler is a function or closure with one or more arguments. The
783 /// first argument is the event itself. All additional arguments are
784 /// "context" arguments: They have to implement [`EventHandlerContext`].
785 /// This trait is named that way because most of the types implementing it
786 /// give additional context about an event: The room it was in, its raw form
787 /// and other similar things. As two exceptions to this,
788 /// [`Client`] and [`EventHandlerHandle`] also implement the
789 /// `EventHandlerContext` trait so you don't have to clone your client
790 /// into the event handler manually and a handler can decide to remove
791 /// itself.
792 ///
793 /// Some context arguments are not universally applicable. A context
794 /// argument that isn't available for the given event type will result in
795 /// the event handler being skipped and an error being logged. The following
796 /// context argument types are only available for a subset of event types:
797 ///
798 /// * [`Room`] is only available for room-specific events, i.e. not for
799 /// events like global account data events or presence events.
800 ///
801 /// You can provide custom context via
802 /// [`add_event_handler_context`](Client::add_event_handler_context) and
803 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
804 /// into the event handler.
805 ///
806 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
807 ///
808 /// # Examples
809 ///
810 /// ```no_run
811 /// use matrix_sdk::{
812 /// deserialized_responses::EncryptionInfo,
813 /// event_handler::Ctx,
814 /// ruma::{
815 /// events::{
816 /// macros::EventContent,
817 /// push_rules::PushRulesEvent,
818 /// room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
819 /// },
820 /// push::Action,
821 /// Int, MilliSecondsSinceUnixEpoch,
822 /// },
823 /// Client, Room,
824 /// };
825 /// use serde::{Deserialize, Serialize};
826 ///
827 /// # async fn example(client: Client) {
828 /// client.add_event_handler(
829 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
830 /// // Common usage: Room event plus room and client.
831 /// },
832 /// );
833 /// client.add_event_handler(
834 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
835 /// async move {
836 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
837 /// // unencrypted events and events that were decrypted by the SDK.
838 /// }
839 /// },
840 /// );
841 /// client.add_event_handler(
842 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
843 /// async move {
844 /// // A `Vec<Action>` parameter allows you to know which push actions
845 /// // are applicable for an event. For example, an event with
846 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
847 /// // in the timeline.
848 /// }
849 /// },
850 /// );
851 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
852 /// // You can omit any or all arguments after the first.
853 /// });
854 ///
855 /// // Registering a temporary event handler:
856 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
857 /// /* Event handler */
858 /// });
859 /// client.remove_event_handler(handle);
860 ///
861 /// // Registering custom event handler context:
862 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
863 /// struct MyContext {
864 /// number: usize,
865 /// }
866 /// client.add_event_handler_context(MyContext { number: 5 });
867 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
868 /// // Use the context
869 /// });
870 ///
871 /// // Custom events work exactly the same way, you just need to declare
872 /// // the content struct and use the EventContent derive macro on it.
873 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
874 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
875 /// struct TokenEventContent {
876 /// token: String,
877 /// #[serde(rename = "exp")]
878 /// expires_at: MilliSecondsSinceUnixEpoch,
879 /// }
880 ///
881 /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
882 /// todo!("Display the token");
883 /// });
884 ///
885 /// // Event handler closures can also capture local variables.
886 /// // Make sure they are cheap to clone though, because they will be cloned
887 /// // every time the closure is called.
888 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
889 ///
890 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
891 /// println!("Calling the handler with identifier {data}");
892 /// });
893 /// # }
894 /// ```
895 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
896 where
897 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
898 H: EventHandler<Ev, Ctx>,
899 {
900 self.add_event_handler_impl(handler, None)
901 }
902
903 /// Register a handler for a specific room, and event type.
904 ///
905 /// This method works the same way as
906 /// [`add_event_handler`][Self::add_event_handler], except that the handler
907 /// will only be called for events in the room with the specified ID. See
908 /// that method for more details on event handler functions.
909 ///
910 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
911 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
912 /// your use case.
913 pub fn add_room_event_handler<Ev, Ctx, H>(
914 &self,
915 room_id: &RoomId,
916 handler: H,
917 ) -> EventHandlerHandle
918 where
919 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
920 H: EventHandler<Ev, Ctx>,
921 {
922 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
923 }
924
925 /// Observe a specific event type.
926 ///
927 /// `Ev` represents the kind of event that will be observed. `Ctx`
928 /// represents the context that will come with the event. It relies on the
929 /// same mechanism as [`Client::add_event_handler`]. The main difference is
930 /// that it returns an [`ObservableEventHandler`] and doesn't require a
931 /// user-defined closure. It is possible to subscribe to the
932 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
933 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
934 /// Ctx)`.
935 ///
936 /// Be careful that only the most recent value can be observed. Subscribers
937 /// are notified when a new value is sent, but there is no guarantee
938 /// that they will see all values.
939 ///
940 /// # Example
941 ///
942 /// Let's see a classical usage:
943 ///
944 /// ```
945 /// use futures_util::StreamExt as _;
946 /// use matrix_sdk::{
947 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
948 /// Client, Room,
949 /// };
950 ///
951 /// # async fn example(client: Client) -> Option<()> {
952 /// let observer =
953 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
954 ///
955 /// let mut subscriber = observer.subscribe();
956 ///
957 /// let (event, (room, push_actions)) = subscriber.next().await?;
958 /// # Some(())
959 /// # }
960 /// ```
961 ///
962 /// Now let's see how to get several contexts that can be useful for you:
963 ///
964 /// ```
965 /// use matrix_sdk::{
966 /// deserialized_responses::EncryptionInfo,
967 /// ruma::{
968 /// events::room::{
969 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
970 /// },
971 /// push::Action,
972 /// },
973 /// Client, Room,
974 /// };
975 ///
976 /// # async fn example(client: Client) {
977 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
978 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
979 ///
980 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
981 /// // to distinguish between unencrypted events and events that were decrypted
982 /// // by the SDK.
983 /// let _ = client
984 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
985 /// );
986 ///
987 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
988 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
989 /// // should be highlighted in the timeline.
990 /// let _ =
991 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
992 ///
993 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
994 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
995 /// # }
996 /// ```
997 ///
998 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
999 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1000 where
1001 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1002 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1003 {
1004 self.observe_room_events_impl(None)
1005 }
1006
1007 /// Observe a specific room, and event type.
1008 ///
1009 /// This method works the same way as [`Client::observe_events`], except
1010 /// that the observability will only be applied for events in the room with
1011 /// the specified ID. See that method for more details.
1012 ///
1013 /// Be careful that only the most recent value can be observed. Subscribers
1014 /// are notified when a new value is sent, but there is no guarantee
1015 /// that they will see all values.
1016 pub fn observe_room_events<Ev, Ctx>(
1017 &self,
1018 room_id: &RoomId,
1019 ) -> ObservableEventHandler<(Ev, Ctx)>
1020 where
1021 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1022 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1023 {
1024 self.observe_room_events_impl(Some(room_id.to_owned()))
1025 }
1026
1027 /// Shared implementation for `Client::observe_events` and
1028 /// `Client::observe_room_events`.
1029 fn observe_room_events_impl<Ev, Ctx>(
1030 &self,
1031 room_id: Option<OwnedRoomId>,
1032 ) -> ObservableEventHandler<(Ev, Ctx)>
1033 where
1034 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1035 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1036 {
1037 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1038 // new value.
1039 let shared_observable = SharedObservable::new(None);
1040
1041 ObservableEventHandler::new(
1042 shared_observable.clone(),
1043 self.event_handler_drop_guard(self.add_event_handler_impl(
1044 move |event: Ev, context: Ctx| {
1045 shared_observable.set(Some((event, context)));
1046
1047 ready(())
1048 },
1049 room_id,
1050 )),
1051 )
1052 }
1053
1054 /// Remove the event handler associated with the handle.
1055 ///
1056 /// Note that you **must not** call `remove_event_handler` from the
1057 /// non-async part of an event handler, that is:
1058 ///
1059 /// ```ignore
1060 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1061 /// // âš this will cause a deadlock âš
1062 /// client.remove_event_handler(handle);
1063 ///
1064 /// async move {
1065 /// // removing the event handler here is fine
1066 /// client.remove_event_handler(handle);
1067 /// }
1068 /// })
1069 /// ```
1070 ///
1071 /// Note also that handlers that remove themselves will still execute with
1072 /// events received in the same sync cycle.
1073 ///
1074 /// # Arguments
1075 ///
1076 /// `handle` - The [`EventHandlerHandle`] that is returned when
1077 /// registering the event handler with [`Client::add_event_handler`].
1078 ///
1079 /// # Examples
1080 ///
1081 /// ```no_run
1082 /// # use url::Url;
1083 /// # use tokio::sync::mpsc;
1084 /// #
1085 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1086 /// #
1087 /// use matrix_sdk::{
1088 /// event_handler::EventHandlerHandle,
1089 /// ruma::events::room::member::SyncRoomMemberEvent, Client,
1090 /// };
1091 /// #
1092 /// # futures_executor::block_on(async {
1093 /// # let client = matrix_sdk::Client::builder()
1094 /// # .homeserver_url(homeserver)
1095 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1096 /// # .build()
1097 /// # .await
1098 /// # .unwrap();
1099 ///
1100 /// client.add_event_handler(
1101 /// |ev: SyncRoomMemberEvent,
1102 /// client: Client,
1103 /// handle: EventHandlerHandle| async move {
1104 /// // Common usage: Check arriving Event is the expected one
1105 /// println!("Expected RoomMemberEvent received!");
1106 /// client.remove_event_handler(handle);
1107 /// },
1108 /// );
1109 /// # });
1110 /// ```
1111 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1112 self.inner.event_handlers.remove(handle);
1113 }
1114
1115 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1116 /// the given handle.
1117 ///
1118 /// When the returned value is dropped, the event handler will be removed.
1119 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1120 EventHandlerDropGuard::new(handle, self.clone())
1121 }
1122
1123 /// Add an arbitrary value for use as event handler context.
1124 ///
1125 /// The value can be obtained in an event handler by adding an argument of
1126 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1127 ///
1128 /// If a value of the same type has been added before, it will be
1129 /// overwritten.
1130 ///
1131 /// # Examples
1132 ///
1133 /// ```no_run
1134 /// use matrix_sdk::{
1135 /// event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1136 /// Room,
1137 /// };
1138 /// # #[derive(Clone)]
1139 /// # struct SomeType;
1140 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1141 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1142 /// # futures_executor::block_on(async {
1143 /// # let client = matrix_sdk::Client::builder()
1144 /// # .homeserver_url(homeserver)
1145 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1146 /// # .build()
1147 /// # .await
1148 /// # .unwrap();
1149 ///
1150 /// // Handle used to send messages to the UI part of the app
1151 /// let my_gui_handle: SomeType = obtain_gui_handle();
1152 ///
1153 /// client.add_event_handler_context(my_gui_handle.clone());
1154 /// client.add_event_handler(
1155 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1156 /// async move {
1157 /// // gui_handle.send(DisplayMessage { message: ev });
1158 /// }
1159 /// },
1160 /// );
1161 /// # });
1162 /// ```
1163 pub fn add_event_handler_context<T>(&self, ctx: T)
1164 where
1165 T: Clone + Send + Sync + 'static,
1166 {
1167 self.inner.event_handlers.add_context(ctx);
1168 }
1169
1170 /// Register a handler for a notification.
1171 ///
1172 /// Similar to [`Client::add_event_handler`], but only allows functions
1173 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1174 /// [`Client`] for now.
1175 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1176 where
1177 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1178 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1179 {
1180 self.inner.notification_handlers.write().await.push(Box::new(
1181 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1182 ));
1183
1184 self
1185 }
1186
1187 /// Subscribe to all updates for the room with the given ID.
1188 ///
1189 /// The returned receiver will receive a new message for each sync response
1190 /// that contains updates for that room.
1191 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1192 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1193 btree_map::Entry::Vacant(entry) => {
1194 let (tx, rx) = broadcast::channel(8);
1195 entry.insert(tx);
1196 rx
1197 }
1198 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1199 }
1200 }
1201
1202 /// Subscribe to all updates to all rooms, whenever any has been received in
1203 /// a sync response.
1204 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1205 self.inner.room_updates_sender.subscribe()
1206 }
1207
1208 pub(crate) async fn notification_handlers(
1209 &self,
1210 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1211 self.inner.notification_handlers.read().await
1212 }
1213
1214 /// Get all the rooms the client knows about.
1215 ///
1216 /// This will return the list of joined, invited, and left rooms.
1217 pub fn rooms(&self) -> Vec<Room> {
1218 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1219 }
1220
1221 /// Get all the rooms the client knows about, filtered by room state.
1222 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1223 self.base_client()
1224 .rooms_filtered(filter)
1225 .into_iter()
1226 .map(|room| Room::new(self.clone(), room))
1227 .collect()
1228 }
1229
1230 /// Get a stream of all the rooms, in addition to the existing rooms.
1231 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1232 let (rooms, stream) = self.base_client().rooms_stream();
1233
1234 let map_room = |room| Room::new(self.clone(), room);
1235
1236 (
1237 rooms.into_iter().map(map_room).collect(),
1238 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1239 )
1240 }
1241
1242 /// Returns the joined rooms this client knows about.
1243 pub fn joined_rooms(&self) -> Vec<Room> {
1244 self.rooms_filtered(RoomStateFilter::JOINED)
1245 }
1246
1247 /// Returns the invited rooms this client knows about.
1248 pub fn invited_rooms(&self) -> Vec<Room> {
1249 self.rooms_filtered(RoomStateFilter::INVITED)
1250 }
1251
1252 /// Returns the left rooms this client knows about.
1253 pub fn left_rooms(&self) -> Vec<Room> {
1254 self.rooms_filtered(RoomStateFilter::LEFT)
1255 }
1256
1257 /// Returns the joined space rooms this client knows about.
1258 pub fn joined_space_rooms(&self) -> Vec<Room> {
1259 self.base_client()
1260 .rooms_filtered(RoomStateFilter::JOINED)
1261 .into_iter()
1262 .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1263 .collect()
1264 }
1265
1266 /// Get a room with the given room id.
1267 ///
1268 /// # Arguments
1269 ///
1270 /// `room_id` - The unique id of the room that should be fetched.
1271 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1272 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1273 }
1274
1275 /// Gets the preview of a room, whether the current user has joined it or
1276 /// not.
1277 pub async fn get_room_preview(
1278 &self,
1279 room_or_alias_id: &RoomOrAliasId,
1280 via: Vec<OwnedServerName>,
1281 ) -> Result<RoomPreview> {
1282 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1283 Ok(room_id) => room_id.to_owned(),
1284 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1285 };
1286
1287 if let Some(room) = self.get_room(&room_id) {
1288 // The cached data can only be trusted if the room state is joined or
1289 // banned: for invite and knock rooms, no updates will be received
1290 // for the rooms after the invite/knock action took place so we may
1291 // have very out to date data for important fields such as
1292 // `join_rule`. For left rooms, the homeserver should return the latest info.
1293 match room.state() {
1294 RoomState::Joined | RoomState::Banned => {
1295 return Ok(RoomPreview::from_known_room(&room).await);
1296 }
1297 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1298 }
1299 }
1300
1301 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1302 }
1303
1304 /// Resolve a room alias to a room id and a list of servers which know
1305 /// about it.
1306 ///
1307 /// # Arguments
1308 ///
1309 /// `room_alias` - The room alias to be resolved.
1310 pub async fn resolve_room_alias(
1311 &self,
1312 room_alias: &RoomAliasId,
1313 ) -> HttpResult<get_alias::v3::Response> {
1314 let request = get_alias::v3::Request::new(room_alias.to_owned());
1315 self.send(request).await
1316 }
1317
1318 /// Checks if a room alias is not in use yet.
1319 ///
1320 /// Returns:
1321 /// - `Ok(true)` if the room alias is available.
1322 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1323 /// status code).
1324 /// - An `Err` otherwise.
1325 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1326 match self.resolve_room_alias(alias).await {
1327 // The room alias was resolved, so it's already in use.
1328 Ok(_) => Ok(false),
1329 Err(error) => {
1330 match error.client_api_error_kind() {
1331 // The room alias wasn't found, so it's available.
1332 Some(ErrorKind::NotFound) => Ok(true),
1333 _ => Err(error),
1334 }
1335 }
1336 }
1337 }
1338
1339 /// Adds a new room alias associated with a room to the room directory.
1340 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1341 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1342 self.send(request).await?;
1343 Ok(())
1344 }
1345
1346 /// Removes a room alias from the room directory.
1347 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1348 let request = delete_alias::v3::Request::new(alias.to_owned());
1349 self.send(request).await?;
1350 Ok(())
1351 }
1352
1353 /// Update the homeserver from the login response well-known if needed.
1354 ///
1355 /// # Arguments
1356 ///
1357 /// * `login_well_known` - The `well_known` field from a successful login
1358 /// response.
1359 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1360 if self.inner.respect_login_well_known {
1361 if let Some(well_known) = login_well_known {
1362 if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1363 self.set_homeserver(homeserver);
1364 }
1365 }
1366 }
1367 }
1368
1369 /// Similar to [`Client::restore_session_with`], with
1370 /// [`RoomLoadSettings::default()`].
1371 ///
1372 /// # Panics
1373 ///
1374 /// Panics if a session was already restored or logged in.
1375 #[instrument(skip_all)]
1376 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1377 self.restore_session_with(session, RoomLoadSettings::default()).await
1378 }
1379
1380 /// Restore a session previously logged-in using one of the available
1381 /// authentication APIs. The number of rooms to restore is controlled by
1382 /// [`RoomLoadSettings`].
1383 ///
1384 /// See the documentation of the corresponding authentication API's
1385 /// `restore_session` method for more information.
1386 ///
1387 /// # Panics
1388 ///
1389 /// Panics if a session was already restored or logged in.
1390 #[instrument(skip_all)]
1391 pub async fn restore_session_with(
1392 &self,
1393 session: impl Into<AuthSession>,
1394 room_load_settings: RoomLoadSettings,
1395 ) -> Result<()> {
1396 let session = session.into();
1397 match session {
1398 AuthSession::Matrix(session) => {
1399 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1400 }
1401 AuthSession::OAuth(session) => {
1402 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1403 }
1404 }
1405 }
1406
1407 /// Refresh the access token using the authentication API used to log into
1408 /// this session.
1409 ///
1410 /// See the documentation of the authentication API's `refresh_access_token`
1411 /// method for more information.
1412 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1413 let Some(auth_api) = self.auth_api() else {
1414 return Err(RefreshTokenError::RefreshTokenRequired);
1415 };
1416
1417 match auth_api {
1418 AuthApi::Matrix(api) => {
1419 trace!("Token refresh: Using the homeserver.");
1420 Box::pin(api.refresh_access_token()).await?;
1421 }
1422 AuthApi::OAuth(api) => {
1423 trace!("Token refresh: Using OAuth 2.0.");
1424 Box::pin(api.refresh_access_token()).await?;
1425 }
1426 }
1427
1428 Ok(())
1429 }
1430
1431 /// Log out the current session using the proper authentication API.
1432 ///
1433 /// # Errors
1434 ///
1435 /// Returns an error if the session is not authenticated or if an error
1436 /// occurred while making the request to the server.
1437 pub async fn logout(&self) -> Result<(), Error> {
1438 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1439 match auth_api {
1440 AuthApi::Matrix(matrix_auth) => {
1441 matrix_auth.logout().await?;
1442 Ok(())
1443 }
1444 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1445 }
1446 }
1447
1448 /// Get or upload a sync filter.
1449 ///
1450 /// This method will either get a filter ID from the store or upload the
1451 /// filter definition to the homeserver and return the new filter ID.
1452 ///
1453 /// # Arguments
1454 ///
1455 /// * `filter_name` - The unique name of the filter, this name will be used
1456 /// locally to store and identify the filter ID returned by the server.
1457 ///
1458 /// * `definition` - The filter definition that should be uploaded to the
1459 /// server if no filter ID can be found in the store.
1460 ///
1461 /// # Examples
1462 ///
1463 /// ```no_run
1464 /// # use matrix_sdk::{
1465 /// # Client, config::SyncSettings,
1466 /// # ruma::api::client::{
1467 /// # filter::{
1468 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1469 /// # },
1470 /// # sync::sync_events::v3::Filter,
1471 /// # }
1472 /// # };
1473 /// # use url::Url;
1474 /// # async {
1475 /// # let homeserver = Url::parse("http://example.com").unwrap();
1476 /// # let client = Client::new(homeserver).await.unwrap();
1477 /// let mut filter = FilterDefinition::default();
1478 ///
1479 /// // Let's enable member lazy loading.
1480 /// filter.room.state.lazy_load_options =
1481 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1482 ///
1483 /// let filter_id = client
1484 /// .get_or_upload_filter("sync", filter)
1485 /// .await
1486 /// .unwrap();
1487 ///
1488 /// let sync_settings = SyncSettings::new()
1489 /// .filter(Filter::FilterId(filter_id));
1490 ///
1491 /// let response = client.sync_once(sync_settings).await.unwrap();
1492 /// # };
1493 #[instrument(skip(self, definition))]
1494 pub async fn get_or_upload_filter(
1495 &self,
1496 filter_name: &str,
1497 definition: FilterDefinition,
1498 ) -> Result<String> {
1499 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1500 debug!("Found filter locally");
1501 Ok(filter)
1502 } else {
1503 debug!("Didn't find filter locally");
1504 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1505 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1506 let response = self.send(request).await?;
1507
1508 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1509
1510 Ok(response.filter_id)
1511 }
1512 }
1513
1514 /// Prepare to join a room by ID, by getting the current details about it
1515 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1516 let room = self.get_room(room_id)?;
1517
1518 let inviter = match room.invite_details().await {
1519 Ok(details) => details.inviter,
1520 Err(Error::WrongRoomState(_)) => None,
1521 Err(e) => {
1522 warn!("Error fetching invite details for room: {e:?}");
1523 None
1524 }
1525 };
1526
1527 Some(PreJoinRoomInfo { inviter })
1528 }
1529
1530 /// Finish joining a room.
1531 ///
1532 /// If the room was an invite that should be marked as a DM, will include it
1533 /// in the DM event after creating the joined room.
1534 ///
1535 /// If encrypted history sharing is enabled, will check to see if we have a
1536 /// key bundle, and import it if so.
1537 ///
1538 /// # Arguments
1539 ///
1540 /// * `room_id` - The `RoomId` of the room that was joined.
1541 /// * `pre_join_room_info` - Information about the room before we joined.
1542 async fn finish_join_room(
1543 &self,
1544 room_id: &RoomId,
1545 pre_join_room_info: Option<PreJoinRoomInfo>,
1546 ) -> Result<Room> {
1547 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1548 room.state() == RoomState::Invited
1549 && room.is_direct().await.unwrap_or_else(|e| {
1550 warn!(%room_id, "is_direct() failed: {e}");
1551 false
1552 })
1553 } else {
1554 false
1555 };
1556
1557 let base_room = self
1558 .base_client()
1559 .room_joined(
1560 room_id,
1561 pre_join_room_info
1562 .as_ref()
1563 .and_then(|info| info.inviter.as_ref())
1564 .map(|i| i.user_id().to_owned()),
1565 )
1566 .await?;
1567 let room = Room::new(self.clone(), base_room);
1568
1569 if mark_as_dm {
1570 room.set_is_direct(true).await?;
1571 }
1572
1573 #[cfg(feature = "e2e-encryption")]
1574 if self.inner.enable_share_history_on_invite {
1575 if let Some(inviter) =
1576 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1577 {
1578 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1579 .await?;
1580 }
1581 }
1582
1583 // Suppress "unused variable" and "unused field" lints
1584 #[cfg(not(feature = "e2e-encryption"))]
1585 let _ = pre_join_room_info.map(|i| i.inviter);
1586
1587 Ok(room)
1588 }
1589
1590 /// Join a room by `RoomId`.
1591 ///
1592 /// Returns the `Room` in the joined state.
1593 ///
1594 /// # Arguments
1595 ///
1596 /// * `room_id` - The `RoomId` of the room to be joined.
1597 #[instrument(skip(self))]
1598 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1599 // See who invited us to this room, if anyone. Note we have to do this before
1600 // making the `/join` request, otherwise we could race against the sync.
1601 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1602
1603 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1604 let response = self.send(request).await?;
1605 self.finish_join_room(&response.room_id, pre_join_info).await
1606 }
1607
1608 /// Join a room by `RoomOrAliasId`.
1609 ///
1610 /// Returns the `Room` in the joined state.
1611 ///
1612 /// # Arguments
1613 ///
1614 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1615 /// alias looks like `#name:example.com`.
1616 /// * `server_names` - The server names to be used for resolving the alias,
1617 /// if needs be.
1618 pub async fn join_room_by_id_or_alias(
1619 &self,
1620 alias: &RoomOrAliasId,
1621 server_names: &[OwnedServerName],
1622 ) -> Result<Room> {
1623 let pre_join_info = {
1624 match alias.try_into() {
1625 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1626 Err(_) => {
1627 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1628 // responding to an invitation to the room, and therefore don't need to handle
1629 // things that happen as a result of invites.
1630 None
1631 }
1632 }
1633 };
1634 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1635 via: server_names.to_owned(),
1636 });
1637 let response = self.send(request).await?;
1638 self.finish_join_room(&response.room_id, pre_join_info).await
1639 }
1640
1641 /// Search the homeserver's directory of public rooms.
1642 ///
1643 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1644 /// a `get_public_rooms::Response`.
1645 ///
1646 /// # Arguments
1647 ///
1648 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1649 ///
1650 /// * `since` - Pagination token from a previous request.
1651 ///
1652 /// * `server` - The name of the server, if `None` the requested server is
1653 /// used.
1654 ///
1655 /// # Examples
1656 /// ```no_run
1657 /// use matrix_sdk::Client;
1658 /// # use url::Url;
1659 /// # let homeserver = Url::parse("http://example.com").unwrap();
1660 /// # let limit = Some(10);
1661 /// # let since = Some("since token");
1662 /// # let server = Some("servername.com".try_into().unwrap());
1663 /// # async {
1664 /// let mut client = Client::new(homeserver).await.unwrap();
1665 ///
1666 /// client.public_rooms(limit, since, server).await;
1667 /// # };
1668 /// ```
1669 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1670 pub async fn public_rooms(
1671 &self,
1672 limit: Option<u32>,
1673 since: Option<&str>,
1674 server: Option<&ServerName>,
1675 ) -> HttpResult<get_public_rooms::v3::Response> {
1676 let limit = limit.map(UInt::from);
1677
1678 let request = assign!(get_public_rooms::v3::Request::new(), {
1679 limit,
1680 since: since.map(ToOwned::to_owned),
1681 server: server.map(ToOwned::to_owned),
1682 });
1683 self.send(request).await
1684 }
1685
1686 /// Create a room with the given parameters.
1687 ///
1688 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1689 /// created room.
1690 ///
1691 /// If you want to create a direct message with one specific user, you can
1692 /// use [`create_dm`][Self::create_dm], which is more convenient than
1693 /// assembling the [`create_room::v3::Request`] yourself.
1694 ///
1695 /// If the `is_direct` field of the request is set to `true` and at least
1696 /// one user is invited, the room will be automatically added to the direct
1697 /// rooms in the account data.
1698 ///
1699 /// # Examples
1700 ///
1701 /// ```no_run
1702 /// use matrix_sdk::{
1703 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1704 /// Client,
1705 /// };
1706 /// # use url::Url;
1707 /// #
1708 /// # async {
1709 /// # let homeserver = Url::parse("http://example.com").unwrap();
1710 /// let request = CreateRoomRequest::new();
1711 /// let client = Client::new(homeserver).await.unwrap();
1712 /// assert!(client.create_room(request).await.is_ok());
1713 /// # };
1714 /// ```
1715 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1716 let invite = request.invite.clone();
1717 let is_direct_room = request.is_direct;
1718 let response = self.send(request).await?;
1719
1720 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1721
1722 let joined_room = Room::new(self.clone(), base_room);
1723
1724 if is_direct_room && !invite.is_empty() {
1725 if let Err(error) =
1726 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1727 {
1728 // FIXME: Retry in the background
1729 error!("Failed to mark room as DM: {error}");
1730 }
1731 }
1732
1733 Ok(joined_room)
1734 }
1735
1736 /// Create a DM room.
1737 ///
1738 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1739 /// given user being invited, the room marked `is_direct` and both the
1740 /// creator and invitee getting the default maximum power level.
1741 ///
1742 /// If the `e2e-encryption` feature is enabled, the room will also be
1743 /// encrypted.
1744 ///
1745 /// # Arguments
1746 ///
1747 /// * `user_id` - The ID of the user to create a DM for.
1748 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1749 #[cfg(feature = "e2e-encryption")]
1750 let initial_state =
1751 vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1752 .to_raw_any()];
1753
1754 #[cfg(not(feature = "e2e-encryption"))]
1755 let initial_state = vec![];
1756
1757 let request = assign!(create_room::v3::Request::new(), {
1758 invite: vec![user_id.to_owned()],
1759 is_direct: true,
1760 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1761 initial_state,
1762 });
1763
1764 self.create_room(request).await
1765 }
1766
1767 /// Search the homeserver's directory for public rooms with a filter.
1768 ///
1769 /// # Arguments
1770 ///
1771 /// * `room_search` - The easiest way to create this request is using the
1772 /// `get_public_rooms_filtered::Request` itself.
1773 ///
1774 /// # Examples
1775 ///
1776 /// ```no_run
1777 /// # use url::Url;
1778 /// # use matrix_sdk::Client;
1779 /// # async {
1780 /// # let homeserver = Url::parse("http://example.com")?;
1781 /// use matrix_sdk::ruma::{
1782 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1783 /// };
1784 /// # let mut client = Client::new(homeserver).await?;
1785 ///
1786 /// let mut filter = Filter::new();
1787 /// filter.generic_search_term = Some("rust".to_owned());
1788 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1789 /// request.filter = filter;
1790 ///
1791 /// let response = client.public_rooms_filtered(request).await?;
1792 ///
1793 /// for room in response.chunk {
1794 /// println!("Found room {room:?}");
1795 /// }
1796 /// # anyhow::Ok(()) };
1797 /// ```
1798 pub async fn public_rooms_filtered(
1799 &self,
1800 request: get_public_rooms_filtered::v3::Request,
1801 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1802 self.send(request).await
1803 }
1804
1805 /// Send an arbitrary request to the server, without updating client state.
1806 ///
1807 /// **Warning:** Because this method *does not* update the client state, it
1808 /// is important to make sure that you account for this yourself, and
1809 /// use wrapper methods where available. This method should *only* be
1810 /// used if a wrapper method for the endpoint you'd like to use is not
1811 /// available.
1812 ///
1813 /// # Arguments
1814 ///
1815 /// * `request` - A filled out and valid request for the endpoint to be hit
1816 ///
1817 /// * `timeout` - An optional request timeout setting, this overrides the
1818 /// default request setting if one was set.
1819 ///
1820 /// # Examples
1821 ///
1822 /// ```no_run
1823 /// # use matrix_sdk::{Client, config::SyncSettings};
1824 /// # use url::Url;
1825 /// # async {
1826 /// # let homeserver = Url::parse("http://localhost:8080")?;
1827 /// # let mut client = Client::new(homeserver).await?;
1828 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1829 ///
1830 /// // First construct the request you want to make
1831 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1832 /// // for all available Endpoints
1833 /// let user_id = user_id!("@example:localhost").to_owned();
1834 /// let request = profile::get_profile::v3::Request::new(user_id);
1835 ///
1836 /// // Start the request using Client::send()
1837 /// let response = client.send(request).await?;
1838 ///
1839 /// // Check the corresponding Response struct to find out what types are
1840 /// // returned
1841 /// # anyhow::Ok(()) };
1842 /// ```
1843 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1844 where
1845 Request: OutgoingRequest + Clone + Debug,
1846 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1847 {
1848 SendRequest {
1849 client: self.clone(),
1850 request,
1851 config: None,
1852 send_progress: Default::default(),
1853 }
1854 }
1855
1856 pub(crate) async fn send_inner<Request>(
1857 &self,
1858 request: Request,
1859 config: Option<RequestConfig>,
1860 send_progress: SharedObservable<TransmissionProgress>,
1861 ) -> HttpResult<Request::IncomingResponse>
1862 where
1863 Request: OutgoingRequest + Debug,
1864 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1865 {
1866 let homeserver = self.homeserver().to_string();
1867 let access_token = self.access_token();
1868
1869 self.inner
1870 .http_client
1871 .send(
1872 request,
1873 config,
1874 homeserver,
1875 access_token.as_deref(),
1876 &self.supported_versions().await?,
1877 send_progress,
1878 )
1879 .await
1880 }
1881
1882 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1883 _ = self
1884 .inner
1885 .auth_ctx
1886 .session_change_sender
1887 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1888 }
1889
1890 /// Fetches server versions from network; no caching.
1891 pub async fn fetch_server_versions(
1892 &self,
1893 request_config: Option<RequestConfig>,
1894 ) -> HttpResult<get_supported_versions::Response> {
1895 let server_versions = self
1896 .inner
1897 .http_client
1898 .send(
1899 get_supported_versions::Request::new(),
1900 request_config,
1901 self.homeserver().to_string(),
1902 None,
1903 &SupportedVersions {
1904 versions: [MatrixVersion::V1_0].into(),
1905 features: Default::default(),
1906 },
1907 Default::default(),
1908 )
1909 .await?;
1910
1911 Ok(server_versions)
1912 }
1913
1914 /// Fetches client well_known from network; no caching.
1915 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1916 let server_url_string = self
1917 .server()
1918 .unwrap_or(
1919 // Sometimes people configure their well-known directly on the homeserver so use
1920 // this as a fallback when the server name is unknown.
1921 &self.homeserver(),
1922 )
1923 .to_string();
1924
1925 let well_known = self
1926 .inner
1927 .http_client
1928 .send(
1929 discover_homeserver::Request::new(),
1930 Some(RequestConfig::short_retry()),
1931 server_url_string,
1932 None,
1933 &SupportedVersions {
1934 versions: [MatrixVersion::V1_0].into(),
1935 features: Default::default(),
1936 },
1937 Default::default(),
1938 )
1939 .await;
1940
1941 match well_known {
1942 Ok(well_known) => Some(well_known),
1943 Err(http_error) => {
1944 // It is perfectly valid to not have a well-known file.
1945 // Maybe we should check for a specific error code to be sure?
1946 warn!("Failed to fetch client well-known: {http_error}");
1947 None
1948 }
1949 }
1950 }
1951
1952 /// Load server info from storage, or fetch them from network and cache
1953 /// them.
1954 async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1955 match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1956 Ok(Some(stored)) => {
1957 if let Some(server_info) =
1958 stored.into_server_info().and_then(|info| info.maybe_decode())
1959 {
1960 return Ok(server_info);
1961 }
1962 }
1963 Ok(None) => {
1964 // fallthrough: cache is empty
1965 }
1966 Err(err) => {
1967 warn!("error when loading cached server info: {err}");
1968 // fallthrough to network.
1969 }
1970 }
1971
1972 let server_versions = self.fetch_server_versions(None).await?;
1973 let well_known = self.fetch_client_well_known().await;
1974 let server_info = ServerInfo::new(
1975 server_versions.versions.clone(),
1976 server_versions.unstable_features.clone(),
1977 well_known.map(Into::into),
1978 );
1979
1980 // Attempt to cache the result in storage.
1981 {
1982 if let Err(err) = self
1983 .state_store()
1984 .set_kv_data(
1985 StateStoreDataKey::ServerInfo,
1986 StateStoreDataValue::ServerInfo(server_info.clone()),
1987 )
1988 .await
1989 {
1990 warn!("error when caching server info: {err}");
1991 }
1992 }
1993
1994 Ok(server_info)
1995 }
1996
1997 async fn get_or_load_and_cache_server_info<
1998 Value,
1999 MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
2000 >(
2001 &self,
2002 map: MapFunction,
2003 ) -> HttpResult<Value> {
2004 let server_info = &self.inner.caches.server_info;
2005 if let CachedValue::Cached(val) = map(&*server_info.read().await) {
2006 return Ok(val);
2007 }
2008
2009 let mut guarded_server_info = server_info.write().await;
2010 if let CachedValue::Cached(val) = map(&guarded_server_info) {
2011 return Ok(val);
2012 }
2013
2014 let server_info = self.load_or_fetch_server_info().await?;
2015
2016 // Fill both unstable features and server versions at once.
2017 let mut supported = server_info.supported_versions();
2018 if supported.versions.is_empty() {
2019 supported.versions = [MatrixVersion::V1_0].into();
2020 }
2021
2022 guarded_server_info.supported_versions = CachedValue::Cached(supported);
2023 guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2024
2025 // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2026 // fetch an optional property), the function will always return some.
2027 Ok(map(&guarded_server_info).unwrap_cached_value())
2028 }
2029
2030 /// Get the Matrix versions and features supported by the homeserver by
2031 /// fetching them from the server or the cache.
2032 ///
2033 /// This is equivalent to calling both [`Client::server_versions()`] and
2034 /// [`Client::unstable_features()`]. To always fetch the result from the
2035 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2036 /// and then `.as_supported_versions()` on the response.
2037 ///
2038 /// # Examples
2039 ///
2040 /// ```no_run
2041 /// use ruma::api::{FeatureFlag, MatrixVersion};
2042 /// # use matrix_sdk::{Client, config::SyncSettings};
2043 /// # use url::Url;
2044 /// # async {
2045 /// # let homeserver = Url::parse("http://localhost:8080")?;
2046 /// # let mut client = Client::new(homeserver).await?;
2047 ///
2048 /// let supported = client.supported_versions().await?;
2049 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2050 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2051 ///
2052 /// let msc_x_feature = FeatureFlag::from("msc_x");
2053 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2054 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2055 /// # anyhow::Ok(()) };
2056 /// ```
2057 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2058 self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2059 .await
2060 }
2061
2062 /// Get the Matrix versions supported by the homeserver by fetching them
2063 /// from the server or the cache.
2064 ///
2065 /// # Examples
2066 ///
2067 /// ```no_run
2068 /// use ruma::api::MatrixVersion;
2069 /// # use matrix_sdk::{Client, config::SyncSettings};
2070 /// # use url::Url;
2071 /// # async {
2072 /// # let homeserver = Url::parse("http://localhost:8080")?;
2073 /// # let mut client = Client::new(homeserver).await?;
2074 ///
2075 /// let server_versions = client.server_versions().await?;
2076 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2077 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2078 /// # anyhow::Ok(()) };
2079 /// ```
2080 pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2081 self.get_or_load_and_cache_server_info(|server_info| {
2082 server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2083 })
2084 .await
2085 }
2086
2087 /// Get the unstable features supported by the homeserver by fetching them
2088 /// from the server or the cache.
2089 ///
2090 /// # Examples
2091 ///
2092 /// ```no_run
2093 /// use matrix_sdk::ruma::api::FeatureFlag;
2094 /// # use matrix_sdk::{Client, config::SyncSettings};
2095 /// # use url::Url;
2096 /// # async {
2097 /// # let homeserver = Url::parse("http://localhost:8080")?;
2098 /// # let mut client = Client::new(homeserver).await?;
2099 ///
2100 /// let msc_x_feature = FeatureFlag::from("msc_x");
2101 /// let unstable_features = client.unstable_features().await?;
2102 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2103 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2104 /// # anyhow::Ok(()) };
2105 /// ```
2106 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2107 self.get_or_load_and_cache_server_info(|server_info| {
2108 server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2109 })
2110 .await
2111 }
2112
2113 /// Get information about the homeserver's advertised RTC foci by fetching
2114 /// the well-known file from the server or the cache.
2115 ///
2116 /// # Examples
2117 /// ```no_run
2118 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2119 /// # use url::Url;
2120 /// # async {
2121 /// # let homeserver = Url::parse("http://localhost:8080")?;
2122 /// # let mut client = Client::new(homeserver).await?;
2123 /// let rtc_foci = client.rtc_foci().await?;
2124 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2125 /// RtcFocusInfo::LiveKit(info) => Some(info),
2126 /// _ => None,
2127 /// });
2128 /// if let Some(info) = default_livekit_focus_info {
2129 /// println!("Default LiveKit service URL: {}", info.service_url);
2130 /// }
2131 /// # anyhow::Ok(()) };
2132 /// ```
2133 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2134 let well_known = self
2135 .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2136 .await?;
2137
2138 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2139 }
2140
2141 /// Empty the server version and unstable features cache.
2142 ///
2143 /// Since the SDK caches server info (versions, unstable features,
2144 /// well-known etc), it's possible to have a stale entry in the cache. This
2145 /// functions makes it possible to force reset it.
2146 pub async fn reset_server_info(&self) -> Result<()> {
2147 // Empty the in-memory caches.
2148 let mut guard = self.inner.caches.server_info.write().await;
2149 guard.supported_versions = CachedValue::NotSet;
2150
2151 // Empty the store cache.
2152 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2153 }
2154
2155 /// Check whether MSC 4028 is enabled on the homeserver.
2156 ///
2157 /// # Examples
2158 ///
2159 /// ```no_run
2160 /// # use matrix_sdk::{Client, config::SyncSettings};
2161 /// # use url::Url;
2162 /// # async {
2163 /// # let homeserver = Url::parse("http://localhost:8080")?;
2164 /// # let mut client = Client::new(homeserver).await?;
2165 /// let msc4028_enabled =
2166 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2167 /// # anyhow::Ok(()) };
2168 /// ```
2169 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2170 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2171 }
2172
2173 /// Get information of all our own devices.
2174 ///
2175 /// # Examples
2176 ///
2177 /// ```no_run
2178 /// # use matrix_sdk::{Client, config::SyncSettings};
2179 /// # use url::Url;
2180 /// # async {
2181 /// # let homeserver = Url::parse("http://localhost:8080")?;
2182 /// # let mut client = Client::new(homeserver).await?;
2183 /// let response = client.devices().await?;
2184 ///
2185 /// for device in response.devices {
2186 /// println!(
2187 /// "Device: {} {}",
2188 /// device.device_id,
2189 /// device.display_name.as_deref().unwrap_or("")
2190 /// );
2191 /// }
2192 /// # anyhow::Ok(()) };
2193 /// ```
2194 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2195 let request = get_devices::v3::Request::new();
2196
2197 self.send(request).await
2198 }
2199
2200 /// Delete the given devices from the server.
2201 ///
2202 /// # Arguments
2203 ///
2204 /// * `devices` - The list of devices that should be deleted from the
2205 /// server.
2206 ///
2207 /// * `auth_data` - This request requires user interactive auth, the first
2208 /// request needs to set this to `None` and will always fail with an
2209 /// `UiaaResponse`. The response will contain information for the
2210 /// interactive auth and the same request needs to be made but this time
2211 /// with some `auth_data` provided.
2212 ///
2213 /// ```no_run
2214 /// # use matrix_sdk::{
2215 /// # ruma::{api::client::uiaa, device_id},
2216 /// # Client, Error, config::SyncSettings,
2217 /// # };
2218 /// # use serde_json::json;
2219 /// # use url::Url;
2220 /// # use std::collections::BTreeMap;
2221 /// # async {
2222 /// # let homeserver = Url::parse("http://localhost:8080")?;
2223 /// # let mut client = Client::new(homeserver).await?;
2224 /// let devices = &[device_id!("DEVICEID").to_owned()];
2225 ///
2226 /// if let Err(e) = client.delete_devices(devices, None).await {
2227 /// if let Some(info) = e.as_uiaa_response() {
2228 /// let mut password = uiaa::Password::new(
2229 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2230 /// "wordpass".to_owned(),
2231 /// );
2232 /// password.session = info.session.clone();
2233 ///
2234 /// client
2235 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2236 /// .await?;
2237 /// }
2238 /// }
2239 /// # anyhow::Ok(()) };
2240 pub async fn delete_devices(
2241 &self,
2242 devices: &[OwnedDeviceId],
2243 auth_data: Option<uiaa::AuthData>,
2244 ) -> HttpResult<delete_devices::v3::Response> {
2245 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2246 request.auth = auth_data;
2247
2248 self.send(request).await
2249 }
2250
2251 /// Change the display name of a device owned by the current user.
2252 ///
2253 /// Returns a `update_device::Response` which specifies the result
2254 /// of the operation.
2255 ///
2256 /// # Arguments
2257 ///
2258 /// * `device_id` - The ID of the device to change the display name of.
2259 /// * `display_name` - The new display name to set.
2260 pub async fn rename_device(
2261 &self,
2262 device_id: &DeviceId,
2263 display_name: &str,
2264 ) -> HttpResult<update_device::v3::Response> {
2265 let mut request = update_device::v3::Request::new(device_id.to_owned());
2266 request.display_name = Some(display_name.to_owned());
2267
2268 self.send(request).await
2269 }
2270
2271 /// Synchronize the client's state with the latest state on the server.
2272 ///
2273 /// ## Syncing Events
2274 ///
2275 /// Messages or any other type of event need to be periodically fetched from
2276 /// the server, this is achieved by sending a `/sync` request to the server.
2277 ///
2278 /// The first sync is sent out without a [`token`]. The response of the
2279 /// first sync will contain a [`next_batch`] field which should then be
2280 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2281 /// don't receive the same events multiple times.
2282 ///
2283 /// ## Long Polling
2284 ///
2285 /// A sync should in the usual case always be in flight. The
2286 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2287 /// long the server will wait for new events before it will respond.
2288 /// The server will respond immediately if some new events arrive before the
2289 /// timeout has expired. If no changes arrive and the timeout expires an
2290 /// empty sync response will be sent to the client.
2291 ///
2292 /// This method of sending a request that may not receive a response
2293 /// immediately is called long polling.
2294 ///
2295 /// ## Filtering Events
2296 ///
2297 /// The number or type of messages and events that the client should receive
2298 /// from the server can be altered using a [`Filter`].
2299 ///
2300 /// Filters can be non-trivial and, since they will be sent with every sync
2301 /// request, they may take up a bunch of unnecessary bandwidth.
2302 ///
2303 /// Luckily filters can be uploaded to the server and reused using an unique
2304 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2305 /// method.
2306 ///
2307 /// # Arguments
2308 ///
2309 /// * `sync_settings` - Settings for the sync call, this allows us to set
2310 /// various options to configure the sync:
2311 /// * [`filter`] - To configure which events we receive and which get
2312 /// [filtered] by the server
2313 /// * [`timeout`] - To configure our [long polling] setup.
2314 /// * [`token`] - To tell the server which events we already received
2315 /// and where we wish to continue syncing.
2316 /// * [`full_state`] - To tell the server that we wish to receive all
2317 /// state events, regardless of our configured [`token`].
2318 /// * [`set_presence`] - To tell the server to set the presence and to
2319 /// which state.
2320 ///
2321 /// # Examples
2322 ///
2323 /// ```no_run
2324 /// # use url::Url;
2325 /// # async {
2326 /// # let homeserver = Url::parse("http://localhost:8080")?;
2327 /// # let username = "";
2328 /// # let password = "";
2329 /// use matrix_sdk::{
2330 /// config::SyncSettings,
2331 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2332 /// };
2333 ///
2334 /// let client = Client::new(homeserver).await?;
2335 /// client.matrix_auth().login_username(username, password).send().await?;
2336 ///
2337 /// // Sync once so we receive the client state and old messages.
2338 /// client.sync_once(SyncSettings::default()).await?;
2339 ///
2340 /// // Register our handler so we start responding once we receive a new
2341 /// // event.
2342 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2343 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2344 /// });
2345 ///
2346 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2347 /// // from our `sync_once()` call automatically.
2348 /// client.sync(SyncSettings::default()).await;
2349 /// # anyhow::Ok(()) };
2350 /// ```
2351 ///
2352 /// [`sync`]: #method.sync
2353 /// [`SyncSettings`]: crate::config::SyncSettings
2354 /// [`token`]: crate::config::SyncSettings#method.token
2355 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2356 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2357 /// [`set_presence`]: ruma::presence::PresenceState
2358 /// [`filter`]: crate::config::SyncSettings#method.filter
2359 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2360 /// [`next_batch`]: SyncResponse#structfield.next_batch
2361 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2362 /// [long polling]: #long-polling
2363 /// [filtered]: #filtering-events
2364 #[instrument(skip(self))]
2365 pub async fn sync_once(
2366 &self,
2367 sync_settings: crate::config::SyncSettings,
2368 ) -> Result<SyncResponse> {
2369 // The sync might not return for quite a while due to the timeout.
2370 // We'll see if there's anything crypto related to send out before we
2371 // sync, i.e. if we closed our client after a sync but before the
2372 // crypto requests were sent out.
2373 //
2374 // This will mostly be a no-op.
2375 #[cfg(feature = "e2e-encryption")]
2376 if let Err(e) = self.send_outgoing_requests().await {
2377 error!(error = ?e, "Error while sending outgoing E2EE requests");
2378 }
2379
2380 let token = match sync_settings.token {
2381 SyncToken::Specific(token) => Some(token),
2382 SyncToken::NoToken => None,
2383 SyncToken::ReusePrevious => self.sync_token().await,
2384 };
2385
2386 let request = assign!(sync_events::v3::Request::new(), {
2387 filter: sync_settings.filter.map(|f| *f),
2388 since: token,
2389 full_state: sync_settings.full_state,
2390 set_presence: sync_settings.set_presence,
2391 timeout: sync_settings.timeout,
2392 use_state_after: true,
2393 });
2394 let mut request_config = self.request_config();
2395 if let Some(timeout) = sync_settings.timeout {
2396 let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2397 request_config.timeout = Some(base_timeout + timeout);
2398 }
2399
2400 let response = self.send(request).with_request_config(request_config).await?;
2401 let next_batch = response.next_batch.clone();
2402 let response = self.process_sync(response).await?;
2403
2404 #[cfg(feature = "e2e-encryption")]
2405 if let Err(e) = self.send_outgoing_requests().await {
2406 error!(error = ?e, "Error while sending outgoing E2EE requests");
2407 }
2408
2409 self.inner.sync_beat.notify(usize::MAX);
2410
2411 Ok(SyncResponse::new(next_batch, response))
2412 }
2413
2414 /// Repeatedly synchronize the client state with the server.
2415 ///
2416 /// This method will only return on error, if cancellation is needed
2417 /// the method should be wrapped in a cancelable task or the
2418 /// [`Client::sync_with_callback`] method can be used or
2419 /// [`Client::sync_with_result_callback`] if you want to handle error
2420 /// cases in the loop, too.
2421 ///
2422 /// This method will internally call [`Client::sync_once`] in a loop.
2423 ///
2424 /// This method can be used with the [`Client::add_event_handler`]
2425 /// method to react to individual events. If you instead wish to handle
2426 /// events in a bulk manner the [`Client::sync_with_callback`],
2427 /// [`Client::sync_with_result_callback`] and
2428 /// [`Client::sync_stream`] methods can be used instead. Those methods
2429 /// repeatedly return the whole sync response.
2430 ///
2431 /// # Arguments
2432 ///
2433 /// * `sync_settings` - Settings for the sync call. *Note* that those
2434 /// settings will be only used for the first sync call. See the argument
2435 /// docs for [`Client::sync_once`] for more info.
2436 ///
2437 /// # Return
2438 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2439 /// up to the user of the API to check the error and decide whether the sync
2440 /// should continue or not.
2441 ///
2442 /// # Examples
2443 ///
2444 /// ```no_run
2445 /// # use url::Url;
2446 /// # async {
2447 /// # let homeserver = Url::parse("http://localhost:8080")?;
2448 /// # let username = "";
2449 /// # let password = "";
2450 /// use matrix_sdk::{
2451 /// config::SyncSettings,
2452 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2453 /// };
2454 ///
2455 /// let client = Client::new(homeserver).await?;
2456 /// client.matrix_auth().login_username(&username, &password).send().await?;
2457 ///
2458 /// // Register our handler so we start responding once we receive a new
2459 /// // event.
2460 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2461 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2462 /// });
2463 ///
2464 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2465 /// // automatically.
2466 /// client.sync(SyncSettings::default()).await?;
2467 /// # anyhow::Ok(()) };
2468 /// ```
2469 ///
2470 /// [argument docs]: #method.sync_once
2471 /// [`sync_with_callback`]: #method.sync_with_callback
2472 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2473 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2474 }
2475
2476 /// Repeatedly call sync to synchronize the client state with the server.
2477 ///
2478 /// # Arguments
2479 ///
2480 /// * `sync_settings` - Settings for the sync call. *Note* that those
2481 /// settings will be only used for the first sync call. See the argument
2482 /// docs for [`Client::sync_once`] for more info.
2483 ///
2484 /// * `callback` - A callback that will be called every time a successful
2485 /// response has been fetched from the server. The callback must return a
2486 /// boolean which signalizes if the method should stop syncing. If the
2487 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2488 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2489 ///
2490 /// # Return
2491 /// The sync runs until an error occurs or the
2492 /// callback indicates that the Loop should stop. If the callback asked for
2493 /// a regular stop, the result will be `Ok(())` otherwise the
2494 /// `Err(Error)` is returned.
2495 ///
2496 /// # Examples
2497 ///
2498 /// The following example demonstrates how to sync forever while sending all
2499 /// the interesting events through a mpsc channel to another thread e.g. a
2500 /// UI thread.
2501 ///
2502 /// ```no_run
2503 /// # use std::time::Duration;
2504 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2505 /// # use url::Url;
2506 /// # async {
2507 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2508 /// # let mut client = Client::new(homeserver).await.unwrap();
2509 ///
2510 /// use tokio::sync::mpsc::channel;
2511 ///
2512 /// let (tx, rx) = channel(100);
2513 ///
2514 /// let sync_channel = &tx;
2515 /// let sync_settings = SyncSettings::new()
2516 /// .timeout(Duration::from_secs(30));
2517 ///
2518 /// client
2519 /// .sync_with_callback(sync_settings, |response| async move {
2520 /// let channel = sync_channel;
2521 /// for (room_id, room) in response.rooms.joined {
2522 /// for event in room.timeline.events {
2523 /// channel.send(event).await.unwrap();
2524 /// }
2525 /// }
2526 ///
2527 /// LoopCtrl::Continue
2528 /// })
2529 /// .await;
2530 /// };
2531 /// ```
2532 #[instrument(skip_all)]
2533 pub async fn sync_with_callback<C>(
2534 &self,
2535 sync_settings: crate::config::SyncSettings,
2536 callback: impl Fn(SyncResponse) -> C,
2537 ) -> Result<(), Error>
2538 where
2539 C: Future<Output = LoopCtrl>,
2540 {
2541 self.sync_with_result_callback(sync_settings, |result| async {
2542 Ok(callback(result?).await)
2543 })
2544 .await
2545 }
2546
2547 /// Repeatedly call sync to synchronize the client state with the server.
2548 ///
2549 /// # Arguments
2550 ///
2551 /// * `sync_settings` - Settings for the sync call. *Note* that those
2552 /// settings will be only used for the first sync call. See the argument
2553 /// docs for [`Client::sync_once`] for more info.
2554 ///
2555 /// * `callback` - A callback that will be called every time after a
2556 /// response has been received, failure or not. The callback returns a
2557 /// `Result<LoopCtrl, Error>`, too. When returning
2558 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2559 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2560 /// function returns `Ok(())`. In case the callback can't handle the
2561 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2562 /// which results in the sync ending and the `Err(Error)` being returned.
2563 ///
2564 /// # Return
2565 /// The sync runs until an error occurs that the callback can't handle or
2566 /// the callback indicates that the Loop should stop. If the callback
2567 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2568 /// `Err(Error)` is returned.
2569 ///
2570 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2571 /// this, and are handled first without sending the result to the
2572 /// callback. Only after they have exceeded is the `Result` handed to
2573 /// the callback.
2574 ///
2575 /// # Examples
2576 ///
2577 /// The following example demonstrates how to sync forever while sending all
2578 /// the interesting events through a mpsc channel to another thread e.g. a
2579 /// UI thread.
2580 ///
2581 /// ```no_run
2582 /// # use std::time::Duration;
2583 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2584 /// # use url::Url;
2585 /// # async {
2586 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2587 /// # let mut client = Client::new(homeserver).await.unwrap();
2588 /// #
2589 /// use tokio::sync::mpsc::channel;
2590 ///
2591 /// let (tx, rx) = channel(100);
2592 ///
2593 /// let sync_channel = &tx;
2594 /// let sync_settings = SyncSettings::new()
2595 /// .timeout(Duration::from_secs(30));
2596 ///
2597 /// client
2598 /// .sync_with_result_callback(sync_settings, |response| async move {
2599 /// let channel = sync_channel;
2600 /// let sync_response = response?;
2601 /// for (room_id, room) in sync_response.rooms.joined {
2602 /// for event in room.timeline.events {
2603 /// channel.send(event).await.unwrap();
2604 /// }
2605 /// }
2606 ///
2607 /// Ok(LoopCtrl::Continue)
2608 /// })
2609 /// .await;
2610 /// };
2611 /// ```
2612 #[instrument(skip(self, callback))]
2613 pub async fn sync_with_result_callback<C>(
2614 &self,
2615 sync_settings: crate::config::SyncSettings,
2616 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2617 ) -> Result<(), Error>
2618 where
2619 C: Future<Output = Result<LoopCtrl, Error>>,
2620 {
2621 let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2622
2623 while let Some(result) = sync_stream.next().await {
2624 trace!("Running callback");
2625 if callback(result).await? == LoopCtrl::Break {
2626 trace!("Callback told us to stop");
2627 break;
2628 }
2629 trace!("Done running callback");
2630 }
2631
2632 Ok(())
2633 }
2634
2635 //// Repeatedly synchronize the client state with the server.
2636 ///
2637 /// This method will internally call [`Client::sync_once`] in a loop and is
2638 /// equivalent to the [`Client::sync`] method but the responses are provided
2639 /// as an async stream.
2640 ///
2641 /// # Arguments
2642 ///
2643 /// * `sync_settings` - Settings for the sync call. *Note* that those
2644 /// settings will be only used for the first sync call. See the argument
2645 /// docs for [`Client::sync_once`] for more info.
2646 ///
2647 /// # Examples
2648 ///
2649 /// ```no_run
2650 /// # use url::Url;
2651 /// # async {
2652 /// # let homeserver = Url::parse("http://localhost:8080")?;
2653 /// # let username = "";
2654 /// # let password = "";
2655 /// use futures_util::StreamExt;
2656 /// use matrix_sdk::{config::SyncSettings, Client};
2657 ///
2658 /// let client = Client::new(homeserver).await?;
2659 /// client.matrix_auth().login_username(&username, &password).send().await?;
2660 ///
2661 /// let mut sync_stream =
2662 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2663 ///
2664 /// while let Some(Ok(response)) = sync_stream.next().await {
2665 /// for room in response.rooms.joined.values() {
2666 /// for e in &room.timeline.events {
2667 /// if let Ok(event) = e.raw().deserialize() {
2668 /// println!("Received event {:?}", event);
2669 /// }
2670 /// }
2671 /// }
2672 /// }
2673 ///
2674 /// # anyhow::Ok(()) };
2675 /// ```
2676 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2677 #[instrument(skip(self))]
2678 pub async fn sync_stream(
2679 &self,
2680 mut sync_settings: crate::config::SyncSettings,
2681 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2682 let mut is_first_sync = true;
2683 let mut timeout = None;
2684 let mut last_sync_time: Option<Instant> = None;
2685
2686 let parent_span = Span::current();
2687
2688 async_stream::stream!({
2689 loop {
2690 trace!("Syncing");
2691
2692 if sync_settings.ignore_timeout_on_first_sync {
2693 if is_first_sync {
2694 timeout = sync_settings.timeout.take();
2695 } else if sync_settings.timeout.is_none() && timeout.is_some() {
2696 sync_settings.timeout = timeout.take();
2697 }
2698
2699 is_first_sync = false;
2700 }
2701
2702 yield self
2703 .sync_loop_helper(&mut sync_settings)
2704 .instrument(parent_span.clone())
2705 .await;
2706
2707 Client::delay_sync(&mut last_sync_time).await
2708 }
2709 })
2710 }
2711
2712 /// Get the current, if any, sync token of the client.
2713 /// This will be None if the client didn't sync at least once.
2714 pub(crate) async fn sync_token(&self) -> Option<String> {
2715 self.inner.base_client.sync_token().await
2716 }
2717
2718 /// Gets information about the owner of a given access token.
2719 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2720 let request = whoami::v3::Request::new();
2721 self.send(request).await
2722 }
2723
2724 /// Subscribes a new receiver to client SessionChange broadcasts.
2725 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2726 let broadcast = &self.auth_ctx().session_change_sender;
2727 broadcast.subscribe()
2728 }
2729
2730 /// Sets the save/restore session callbacks.
2731 ///
2732 /// This is another mechanism to get synchronous updates to session tokens,
2733 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2734 pub fn set_session_callbacks(
2735 &self,
2736 reload_session_callback: Box<ReloadSessionCallback>,
2737 save_session_callback: Box<SaveSessionCallback>,
2738 ) -> Result<()> {
2739 self.inner
2740 .auth_ctx
2741 .reload_session_callback
2742 .set(reload_session_callback)
2743 .map_err(|_| Error::MultipleSessionCallbacks)?;
2744
2745 self.inner
2746 .auth_ctx
2747 .save_session_callback
2748 .set(save_session_callback)
2749 .map_err(|_| Error::MultipleSessionCallbacks)?;
2750
2751 Ok(())
2752 }
2753
2754 /// Get the notification settings of the current owner of the client.
2755 pub async fn notification_settings(&self) -> NotificationSettings {
2756 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2757 NotificationSettings::new(self.clone(), ruleset)
2758 }
2759
2760 /// Create a new specialized `Client` that can process notifications.
2761 ///
2762 /// See [`CrossProcessStoreLock::new`] to learn more about
2763 /// `cross_process_store_locks_holder_name`.
2764 ///
2765 /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2766 pub async fn notification_client(
2767 &self,
2768 cross_process_store_locks_holder_name: String,
2769 ) -> Result<Client> {
2770 let client = Client {
2771 inner: ClientInner::new(
2772 self.inner.auth_ctx.clone(),
2773 self.server().cloned(),
2774 self.homeserver(),
2775 self.sliding_sync_version(),
2776 self.inner.http_client.clone(),
2777 self.inner
2778 .base_client
2779 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2780 .await?,
2781 self.inner.caches.server_info.read().await.clone(),
2782 self.inner.respect_login_well_known,
2783 self.inner.event_cache.clone(),
2784 self.inner.send_queue_data.clone(),
2785 self.inner.latest_events.clone(),
2786 #[cfg(feature = "e2e-encryption")]
2787 self.inner.e2ee.encryption_settings,
2788 #[cfg(feature = "e2e-encryption")]
2789 self.inner.enable_share_history_on_invite,
2790 cross_process_store_locks_holder_name,
2791 #[cfg(feature = "experimental-search")]
2792 self.inner.search_index.clone(),
2793 self.inner.thread_subscription_catchup.clone(),
2794 )
2795 .await,
2796 };
2797
2798 Ok(client)
2799 }
2800
2801 /// The [`EventCache`] instance for this [`Client`].
2802 pub fn event_cache(&self) -> &EventCache {
2803 // SAFETY: always initialized in the `Client` ctor.
2804 self.inner.event_cache.get().unwrap()
2805 }
2806
2807 /// The [`LatestEvents`] instance for this [`Client`].
2808 pub async fn latest_events(&self) -> &LatestEvents {
2809 self.inner
2810 .latest_events
2811 .get_or_init(|| async {
2812 LatestEvents::new(
2813 WeakClient::from_client(self),
2814 self.event_cache().clone(),
2815 SendQueue::new(self.clone()),
2816 )
2817 })
2818 .await
2819 }
2820
2821 /// Waits until an at least partially synced room is received, and returns
2822 /// it.
2823 ///
2824 /// **Note: this function will loop endlessly until either it finds the room
2825 /// or an externally set timeout happens.**
2826 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2827 loop {
2828 if let Some(room) = self.get_room(room_id) {
2829 if room.is_state_partially_or_fully_synced() {
2830 debug!("Found just created room!");
2831 return room;
2832 }
2833 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2834 } else {
2835 debug!("Room wasn't found, waiting for sync beat to try again");
2836 }
2837 self.inner.sync_beat.listen().await;
2838 }
2839 }
2840
2841 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2842 /// join it.
2843 pub async fn knock(
2844 &self,
2845 room_id_or_alias: OwnedRoomOrAliasId,
2846 reason: Option<String>,
2847 server_names: Vec<OwnedServerName>,
2848 ) -> Result<Room> {
2849 let request =
2850 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2851 let response = self.send(request).await?;
2852 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2853 Ok(Room::new(self.clone(), base_room))
2854 }
2855
2856 /// Checks whether the provided `user_id` belongs to an ignored user.
2857 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2858 self.base_client().is_user_ignored(user_id).await
2859 }
2860
2861 /// Gets the `max_upload_size` value from the homeserver, getting either a
2862 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2863 /// missing.
2864 ///
2865 /// Check the spec for more info:
2866 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2867 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2868 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2869 if let Some(data) = max_upload_size_lock.get() {
2870 return Ok(data.to_owned());
2871 }
2872
2873 // Use the authenticated endpoint when the server supports it.
2874 let supported_versions = self.supported_versions().await?;
2875 let use_auth =
2876 authenticated_media::get_media_config::v1::Request::is_supported(&supported_versions);
2877
2878 let upload_size = if use_auth {
2879 self.send(authenticated_media::get_media_config::v1::Request::default())
2880 .await?
2881 .upload_size
2882 } else {
2883 #[allow(deprecated)]
2884 self.send(media::get_media_config::v3::Request::default()).await?.upload_size
2885 };
2886
2887 match max_upload_size_lock.set(upload_size) {
2888 Ok(_) => Ok(upload_size),
2889 Err(error) => {
2890 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2891 }
2892 }
2893 }
2894
2895 /// The settings to use for decrypting events.
2896 #[cfg(feature = "e2e-encryption")]
2897 pub fn decryption_settings(&self) -> &DecryptionSettings {
2898 &self.base_client().decryption_settings
2899 }
2900
2901 #[cfg(feature = "experimental-search")]
2902 pub(crate) fn search_index(&self) -> &SearchIndex {
2903 &self.inner.search_index
2904 }
2905
2906 /// Whether the client is configured to take thread subscriptions (MSC4306
2907 /// and MSC4308) into account.
2908 ///
2909 /// This may cause filtering out of thread subscriptions, and loading the
2910 /// thread subscriptions via the sliding sync extension, when the room
2911 /// list service is being used.
2912 pub fn enabled_thread_subscriptions(&self) -> bool {
2913 match self.base_client().threading_support {
2914 ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
2915 ThreadingSupport::Disabled => false,
2916 }
2917 }
2918
2919 /// Fetch thread subscriptions changes between `from` and up to `to`.
2920 ///
2921 /// The `limit` optional parameter can be used to limit the number of
2922 /// entries in a response. It can also be overridden by the server, if
2923 /// it's deemed too large.
2924 pub async fn fetch_thread_subscriptions(
2925 &self,
2926 from: Option<String>,
2927 to: Option<String>,
2928 limit: Option<UInt>,
2929 ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
2930 let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
2931 from,
2932 to,
2933 limit,
2934 });
2935 Ok(self.send(request).await?)
2936 }
2937
2938 pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
2939 self.inner.thread_subscription_catchup.get().unwrap()
2940 }
2941}
2942
2943#[cfg(any(feature = "testing", test))]
2944impl Client {
2945 /// Test helper to mark users as tracked by the crypto layer.
2946 #[cfg(feature = "e2e-encryption")]
2947 pub async fn update_tracked_users_for_testing(
2948 &self,
2949 user_ids: impl IntoIterator<Item = &UserId>,
2950 ) {
2951 let olm = self.olm_machine().await;
2952 let olm = olm.as_ref().unwrap();
2953 olm.update_tracked_users(user_ids).await.unwrap();
2954 }
2955}
2956
2957/// A weak reference to the inner client, useful when trying to get a handle
2958/// on the owning client.
2959#[derive(Clone, Debug)]
2960pub(crate) struct WeakClient {
2961 client: Weak<ClientInner>,
2962}
2963
2964impl WeakClient {
2965 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2966 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2967 Self { client: Arc::downgrade(client) }
2968 }
2969
2970 /// Construct a [`WeakClient`] from a [`Client`].
2971 pub fn from_client(client: &Client) -> Self {
2972 Self::from_inner(&client.inner)
2973 }
2974
2975 /// Attempts to get a [`Client`] from this [`WeakClient`].
2976 pub fn get(&self) -> Option<Client> {
2977 self.client.upgrade().map(|inner| Client { inner })
2978 }
2979
2980 /// Gets the number of strong (`Arc`) pointers still pointing to this
2981 /// client.
2982 #[allow(dead_code)]
2983 pub fn strong_count(&self) -> usize {
2984 self.client.strong_count()
2985 }
2986}
2987
2988#[derive(Clone)]
2989struct ClientServerInfo {
2990 /// The Matrix versions and unstable features the server supports (known
2991 /// ones only).
2992 supported_versions: CachedValue<SupportedVersions>,
2993
2994 /// The server's well-known file, if any.
2995 well_known: CachedValue<Option<WellKnownResponse>>,
2996}
2997
2998/// A cached value that can either be set or not set, used to avoid confusion
2999/// between a value that is set to `None` (because it doesn't exist) and a value
3000/// that has not been cached yet.
3001#[derive(Clone)]
3002enum CachedValue<Value> {
3003 /// A value has been cached.
3004 Cached(Value),
3005 /// Nothing has been cached yet.
3006 NotSet,
3007}
3008
3009impl<Value> CachedValue<Value> {
3010 /// Unwraps the cached value, returning it if it exists.
3011 ///
3012 /// # Panics
3013 ///
3014 /// If the cached value is not set, this will panic.
3015 fn unwrap_cached_value(self) -> Value {
3016 match self {
3017 CachedValue::Cached(value) => value,
3018 CachedValue::NotSet => panic!("Tried to unwrap a cached value that wasn't set"),
3019 }
3020 }
3021
3022 /// Converts from `&CachedValue<Value>` to `CachedValue<&Value>`.
3023 fn as_ref(&self) -> CachedValue<&Value> {
3024 match self {
3025 Self::Cached(value) => CachedValue::Cached(value),
3026 Self::NotSet => CachedValue::NotSet,
3027 }
3028 }
3029
3030 /// Maps a `CachedValue<Value>` to `CachedValue<Other>` by applying a
3031 /// function to a contained value (if `Cached`) or returns `NotSet` (if
3032 /// `NotSet`).
3033 fn map<Other, F>(self, f: F) -> CachedValue<Other>
3034 where
3035 F: FnOnce(Value) -> Other,
3036 {
3037 match self {
3038 Self::Cached(value) => CachedValue::Cached(f(value)),
3039 Self::NotSet => CachedValue::NotSet,
3040 }
3041 }
3042}
3043
3044/// Information about the state of a room before we joined it.
3045#[derive(Debug, Clone, Default)]
3046struct PreJoinRoomInfo {
3047 /// The user who invited us to the room, if any.
3048 pub inviter: Option<RoomMember>,
3049}
3050
3051// The http mocking library is not supported for wasm32
3052#[cfg(all(test, not(target_family = "wasm")))]
3053pub(crate) mod tests {
3054 use std::{sync::Arc, time::Duration};
3055
3056 use assert_matches::assert_matches;
3057 use assert_matches2::assert_let;
3058 use eyeball::SharedObservable;
3059 use futures_util::{pin_mut, FutureExt, StreamExt};
3060 use js_int::{uint, UInt};
3061 use matrix_sdk_base::{
3062 store::{MemoryStore, StoreConfig},
3063 RoomState,
3064 };
3065 use matrix_sdk_test::{
3066 async_test, event_factory::EventFactory, JoinedRoomBuilder, StateTestEvent,
3067 SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
3068 };
3069 #[cfg(target_family = "wasm")]
3070 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3071
3072 use ruma::{
3073 api::{
3074 client::{
3075 discovery::discover_homeserver::RtcFocusInfo,
3076 room::create_room::v3::Request as CreateRoomRequest,
3077 },
3078 FeatureFlag, MatrixVersion,
3079 },
3080 assign,
3081 events::{
3082 ignored_user_list::IgnoredUserListEventContent,
3083 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3084 },
3085 owned_room_id, owned_user_id, room_alias_id, room_id, RoomId, ServerName, UserId,
3086 };
3087 use serde_json::json;
3088 use stream_assert::{assert_next_matches, assert_pending};
3089 use tokio::{
3090 spawn,
3091 time::{sleep, timeout},
3092 };
3093 use url::Url;
3094
3095 use super::Client;
3096 use crate::{
3097 client::{futures::SendMediaUploadRequest, WeakClient},
3098 config::{RequestConfig, SyncSettings},
3099 futures::SendRequest,
3100 media::MediaError,
3101 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3102 Error, TransmissionProgress,
3103 };
3104
3105 #[async_test]
3106 async fn test_account_data() {
3107 let server = MatrixMockServer::new().await;
3108 let client = server.client_builder().build().await;
3109
3110 let f = EventFactory::new();
3111 server
3112 .mock_sync()
3113 .ok_and_run(&client, |builder| {
3114 builder.add_global_account_data(
3115 f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3116 );
3117 })
3118 .await;
3119
3120 let content = client
3121 .account()
3122 .account_data::<IgnoredUserListEventContent>()
3123 .await
3124 .unwrap()
3125 .unwrap()
3126 .deserialize()
3127 .unwrap();
3128
3129 assert_eq!(content.ignored_users.len(), 1);
3130 }
3131
3132 #[async_test]
3133 async fn test_successful_discovery() {
3134 // Imagine this is `matrix.org`.
3135 let server = MatrixMockServer::new().await;
3136 let server_url = server.uri();
3137
3138 // Imagine this is `matrix-client.matrix.org`.
3139 let homeserver = MatrixMockServer::new().await;
3140 let homeserver_url = homeserver.uri();
3141
3142 // Imagine Alice has the user ID `@alice:matrix.org`.
3143 let domain = server_url.strip_prefix("http://").unwrap();
3144 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3145
3146 // The `.well-known` is on the server (e.g. `matrix.org`).
3147 server
3148 .mock_well_known()
3149 .ok_with_homeserver_url(&homeserver_url)
3150 .mock_once()
3151 .named("well-known")
3152 .mount()
3153 .await;
3154
3155 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3156 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3157
3158 let client = Client::builder()
3159 .insecure_server_name_no_tls(alice.server_name())
3160 .build()
3161 .await
3162 .unwrap();
3163
3164 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3165 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3166 client.server_versions().await.unwrap();
3167 }
3168
3169 #[async_test]
3170 async fn test_discovery_broken_server() {
3171 let server = MatrixMockServer::new().await;
3172 let server_url = server.uri();
3173 let domain = server_url.strip_prefix("http://").unwrap();
3174 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3175
3176 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3177
3178 assert!(
3179 Client::builder()
3180 .insecure_server_name_no_tls(alice.server_name())
3181 .build()
3182 .await
3183 .is_err(),
3184 "Creating a client from a user ID should fail when the .well-known request fails."
3185 );
3186 }
3187
3188 #[async_test]
3189 async fn test_room_creation() {
3190 let server = MatrixMockServer::new().await;
3191 let client = server.client_builder().build().await;
3192
3193 server
3194 .mock_sync()
3195 .ok_and_run(&client, |builder| {
3196 builder.add_joined_room(
3197 JoinedRoomBuilder::default()
3198 .add_state_event(StateTestEvent::Member)
3199 .add_state_event(StateTestEvent::PowerLevels),
3200 );
3201 })
3202 .await;
3203
3204 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3205 assert_eq!(room.state(), RoomState::Joined);
3206 }
3207
3208 #[async_test]
3209 async fn test_retry_limit_http_requests() {
3210 let server = MatrixMockServer::new().await;
3211 let client = server
3212 .client_builder()
3213 .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3214 .build()
3215 .await;
3216
3217 assert!(client.request_config().retry_limit.unwrap() == 3);
3218
3219 server.mock_login().error500().expect(3).mount().await;
3220
3221 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3222 }
3223
3224 #[async_test]
3225 async fn test_retry_timeout_http_requests() {
3226 // Keep this timeout small so that the test doesn't take long
3227 let retry_timeout = Duration::from_secs(5);
3228 let server = MatrixMockServer::new().await;
3229 let client = server
3230 .client_builder()
3231 .on_builder(|builder| {
3232 builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3233 })
3234 .build()
3235 .await;
3236
3237 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3238
3239 server.mock_login().error500().expect(2..).mount().await;
3240
3241 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3242 }
3243
3244 #[async_test]
3245 async fn test_short_retry_initial_http_requests() {
3246 let server = MatrixMockServer::new().await;
3247 let client = server
3248 .client_builder()
3249 .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3250 .build()
3251 .await;
3252
3253 server.mock_login().error500().expect(3..).mount().await;
3254
3255 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3256 }
3257
3258 #[async_test]
3259 async fn test_no_retry_http_requests() {
3260 let server = MatrixMockServer::new().await;
3261 let client = server.client_builder().build().await;
3262
3263 server.mock_devices().error500().mock_once().mount().await;
3264
3265 client.devices().await.unwrap_err();
3266 }
3267
3268 #[async_test]
3269 async fn test_set_homeserver() {
3270 let client = MockClientBuilder::new(None).build().await;
3271 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3272
3273 let homeserver = Url::parse("http://example.com/").unwrap();
3274 client.set_homeserver(homeserver.clone());
3275 assert_eq!(client.homeserver(), homeserver);
3276 }
3277
3278 #[async_test]
3279 async fn test_search_user_request() {
3280 let server = MatrixMockServer::new().await;
3281 let client = server.client_builder().build().await;
3282
3283 server.mock_user_directory().ok().mock_once().mount().await;
3284
3285 let response = client.search_users("test", 50).await.unwrap();
3286 assert_eq!(response.results.len(), 1);
3287 let result = &response.results[0];
3288 assert_eq!(result.user_id.to_string(), "@test:example.me");
3289 assert_eq!(result.display_name.clone().unwrap(), "Test");
3290 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3291 assert!(!response.limited);
3292 }
3293
3294 #[async_test]
3295 async fn test_request_unstable_features() {
3296 let server = MatrixMockServer::new().await;
3297 let client = server.client_builder().no_server_versions().build().await;
3298
3299 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3300
3301 let unstable_features = client.unstable_features().await.unwrap();
3302 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3303 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3304 }
3305
3306 #[async_test]
3307 async fn test_can_homeserver_push_encrypted_event_to_device() {
3308 let server = MatrixMockServer::new().await;
3309 let client = server.client_builder().no_server_versions().build().await;
3310
3311 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3312
3313 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3314 assert!(msc4028_enabled);
3315 }
3316
3317 #[async_test]
3318 async fn test_recently_visited_rooms() {
3319 // Tracking recently visited rooms requires authentication
3320 let client = MockClientBuilder::new(None).unlogged().build().await;
3321 assert_matches!(
3322 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3323 Err(Error::AuthenticationRequired)
3324 );
3325
3326 let client = MockClientBuilder::new(None).build().await;
3327 let account = client.account();
3328
3329 // We should start off with an empty list
3330 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3331
3332 // Tracking a valid room id should add it to the list
3333 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3334 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3335 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3336
3337 // And the existing list shouldn't be changed
3338 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3339 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3340
3341 // Tracking the same room again shouldn't change the list
3342 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3343 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3344 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3345
3346 // Tracking a second room should add it to the front of the list
3347 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3348 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3349 assert_eq!(
3350 account.get_recently_visited_rooms().await.unwrap(),
3351 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3352 );
3353
3354 // Tracking the first room yet again should move it to the front of the list
3355 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3356 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3357 assert_eq!(
3358 account.get_recently_visited_rooms().await.unwrap(),
3359 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3360 );
3361
3362 // Tracking should be capped at 20
3363 for n in 0..20 {
3364 account
3365 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3366 .await
3367 .unwrap();
3368 }
3369
3370 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3371
3372 // And the initial rooms should've been pushed out
3373 let rooms = account.get_recently_visited_rooms().await.unwrap();
3374 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3375 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3376
3377 // And the last tracked room should be the first
3378 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3379 }
3380
3381 #[async_test]
3382 async fn test_client_no_cycle_with_event_cache() {
3383 let client = MockClientBuilder::new(None).build().await;
3384
3385 // Wait for the init tasks to die.
3386 sleep(Duration::from_secs(1)).await;
3387
3388 let weak_client = WeakClient::from_client(&client);
3389 assert_eq!(weak_client.strong_count(), 1);
3390
3391 {
3392 let room_id = room_id!("!room:example.org");
3393
3394 // Have the client know the room.
3395 let response = SyncResponseBuilder::default()
3396 .add_joined_room(JoinedRoomBuilder::new(room_id))
3397 .build_sync_response();
3398 client.inner.base_client.receive_sync_response(response).await.unwrap();
3399
3400 client.event_cache().subscribe().unwrap();
3401
3402 let (_room_event_cache, _drop_handles) =
3403 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3404 }
3405
3406 drop(client);
3407
3408 // Give a bit of time for background tasks to die.
3409 sleep(Duration::from_secs(1)).await;
3410
3411 // The weak client must be the last reference to the client now.
3412 assert_eq!(weak_client.strong_count(), 0);
3413 let client = weak_client.get();
3414 assert!(
3415 client.is_none(),
3416 "too many strong references to the client: {}",
3417 Arc::strong_count(&client.unwrap().inner)
3418 );
3419 }
3420
3421 #[async_test]
3422 async fn test_server_info_caching() {
3423 let server = MatrixMockServer::new().await;
3424 let server_url = server.uri();
3425 let domain = server_url.strip_prefix("http://").unwrap();
3426 let server_name = <&ServerName>::try_from(domain).unwrap();
3427 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3428
3429 let well_known_mock = server
3430 .mock_well_known()
3431 .ok()
3432 .named("well known mock")
3433 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3434 .mount_as_scoped()
3435 .await;
3436
3437 let versions_mock = server
3438 .mock_versions()
3439 .ok_with_unstable_features()
3440 .named("first versions mock")
3441 .expect(1)
3442 .mount_as_scoped()
3443 .await;
3444
3445 let memory_store = Arc::new(MemoryStore::new());
3446 let client = Client::builder()
3447 .insecure_server_name_no_tls(server_name)
3448 .store_config(
3449 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3450 .state_store(memory_store.clone()),
3451 )
3452 .build()
3453 .await
3454 .unwrap();
3455
3456 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3457
3458 // These subsequent calls hit the in-memory cache.
3459 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3460 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3461
3462 drop(client);
3463
3464 let client = server
3465 .client_builder()
3466 .no_server_versions()
3467 .on_builder(|builder| {
3468 builder.store_config(
3469 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3470 .state_store(memory_store.clone()),
3471 )
3472 })
3473 .build()
3474 .await;
3475
3476 // These calls to the new client hit the on-disk cache.
3477 assert!(client
3478 .unstable_features()
3479 .await
3480 .unwrap()
3481 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3482 let supported = client.supported_versions().await.unwrap();
3483 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3484 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3485
3486 // Then this call hits the in-memory cache.
3487 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3488
3489 drop(versions_mock);
3490 drop(well_known_mock);
3491
3492 // Now, reset the cache, and observe the endpoints being called again once.
3493 client.reset_server_info().await.unwrap();
3494
3495 server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3496
3497 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3498
3499 // Hits network again.
3500 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3501 // Hits in-memory cache again.
3502 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3503 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3504 }
3505
3506 #[async_test]
3507 async fn test_server_info_without_a_well_known() {
3508 let server = MatrixMockServer::new().await;
3509 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3510
3511 let versions_mock = server
3512 .mock_versions()
3513 .ok_with_unstable_features()
3514 .named("first versions mock")
3515 .expect(1)
3516 .mount_as_scoped()
3517 .await;
3518
3519 let memory_store = Arc::new(MemoryStore::new());
3520 let client = server
3521 .client_builder()
3522 .no_server_versions()
3523 .on_builder(|builder| {
3524 builder.store_config(
3525 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3526 .state_store(memory_store.clone()),
3527 )
3528 })
3529 .build()
3530 .await;
3531
3532 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3533
3534 // These subsequent calls hit the in-memory cache.
3535 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3536 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3537
3538 drop(client);
3539
3540 let client = server
3541 .client_builder()
3542 .no_server_versions()
3543 .on_builder(|builder| {
3544 builder.store_config(
3545 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3546 .state_store(memory_store.clone()),
3547 )
3548 })
3549 .build()
3550 .await;
3551
3552 // This call to the new client hits the on-disk cache.
3553 assert!(client
3554 .unstable_features()
3555 .await
3556 .unwrap()
3557 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3558
3559 // Then this call hits the in-memory cache.
3560 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3561
3562 drop(versions_mock);
3563
3564 // Now, reset the cache, and observe the endpoints being called again once.
3565 client.reset_server_info().await.unwrap();
3566
3567 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3568
3569 // Hits network again.
3570 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3571 // Hits in-memory cache again.
3572 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3573 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3574 }
3575
3576 #[async_test]
3577 async fn test_no_network_doesnt_cause_infinite_retries() {
3578 // We want infinite retries for transient errors.
3579 let client = MockClientBuilder::new(None)
3580 .on_builder(|builder| builder.request_config(RequestConfig::new()))
3581 .build()
3582 .await;
3583
3584 // We don't define a mock server on purpose here, so that the error is really a
3585 // network error.
3586 client.whoami().await.unwrap_err();
3587 }
3588
3589 #[async_test]
3590 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3591 let server = MatrixMockServer::new().await;
3592 let client = server.client_builder().build().await;
3593
3594 let room_id = room_id!("!room:example.org");
3595
3596 server
3597 .mock_sync()
3598 .ok_and_run(&client, |builder| {
3599 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3600 })
3601 .await;
3602
3603 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3604 assert_eq!(room.room_id(), room_id);
3605 }
3606
3607 #[async_test]
3608 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3609 let server = MatrixMockServer::new().await;
3610 let client = server.client_builder().build().await;
3611
3612 let room_id = room_id!("!room:example.org");
3613
3614 let client = Arc::new(client);
3615
3616 // Perform the /sync request with a delay so it starts after the
3617 // `await_room_remote_echo` call has happened
3618 spawn({
3619 let client = client.clone();
3620 async move {
3621 sleep(Duration::from_millis(100)).await;
3622
3623 server
3624 .mock_sync()
3625 .ok_and_run(&client, |builder| {
3626 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3627 })
3628 .await;
3629 }
3630 });
3631
3632 let room =
3633 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3634 assert_eq!(room.room_id(), room_id);
3635 }
3636
3637 #[async_test]
3638 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3639 let client = MockClientBuilder::new(None).build().await;
3640
3641 let room_id = room_id!("!room:example.org");
3642 // Room is not present so the client won't be able to find it. The call will
3643 // timeout.
3644 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3645 }
3646
3647 #[async_test]
3648 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3649 let server = MatrixMockServer::new().await;
3650 let client = server.client_builder().build().await;
3651
3652 server.mock_create_room().ok().mount().await;
3653
3654 // Create a room in the internal store
3655 let room = client
3656 .create_room(assign!(CreateRoomRequest::new(), {
3657 invite: vec![],
3658 is_direct: false,
3659 }))
3660 .await
3661 .unwrap();
3662
3663 // Room is locally present, but not synced, the call will timeout
3664 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3665 .await
3666 .unwrap_err();
3667 }
3668
3669 #[async_test]
3670 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3671 let server = MatrixMockServer::new().await;
3672 let client = server.client_builder().build().await;
3673
3674 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3675
3676 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3677 assert_matches!(ret, Ok(true));
3678 }
3679
3680 #[async_test]
3681 async fn test_is_room_alias_available_if_alias_is_resolved() {
3682 let server = MatrixMockServer::new().await;
3683 let client = server.client_builder().build().await;
3684
3685 server
3686 .mock_room_directory_resolve_alias()
3687 .ok("!some_room_id:matrix.org", Vec::new())
3688 .expect(1)
3689 .mount()
3690 .await;
3691
3692 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3693 assert_matches!(ret, Ok(false));
3694 }
3695
3696 #[async_test]
3697 async fn test_is_room_alias_available_if_error_found() {
3698 let server = MatrixMockServer::new().await;
3699 let client = server.client_builder().build().await;
3700
3701 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3702
3703 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3704 assert_matches!(ret, Err(_));
3705 }
3706
3707 #[async_test]
3708 async fn test_create_room_alias() {
3709 let server = MatrixMockServer::new().await;
3710 let client = server.client_builder().build().await;
3711
3712 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3713
3714 let ret = client
3715 .create_room_alias(
3716 room_alias_id!("#some_alias:matrix.org"),
3717 room_id!("!some_room:matrix.org"),
3718 )
3719 .await;
3720 assert_matches!(ret, Ok(()));
3721 }
3722
3723 #[async_test]
3724 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3725 let server = MatrixMockServer::new().await;
3726 let client = server.client_builder().build().await;
3727
3728 let room_id = room_id!("!a-room:matrix.org");
3729
3730 // Make sure the summary endpoint is called once
3731 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3732
3733 // We create a locally cached invited room
3734 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3735
3736 // And we get a preview, the server endpoint was reached
3737 let preview = client
3738 .get_room_preview(room_id.into(), Vec::new())
3739 .await
3740 .expect("Room preview should be retrieved");
3741
3742 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3743 }
3744
3745 #[async_test]
3746 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3747 let server = MatrixMockServer::new().await;
3748 let client = server.client_builder().build().await;
3749
3750 let room_id = room_id!("!a-room:matrix.org");
3751
3752 // Make sure the summary endpoint is called once
3753 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3754
3755 // We create a locally cached left room
3756 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3757
3758 // And we get a preview, the server endpoint was reached
3759 let preview = client
3760 .get_room_preview(room_id.into(), Vec::new())
3761 .await
3762 .expect("Room preview should be retrieved");
3763
3764 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3765 }
3766
3767 #[async_test]
3768 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3769 let server = MatrixMockServer::new().await;
3770 let client = server.client_builder().build().await;
3771
3772 let room_id = room_id!("!a-room:matrix.org");
3773
3774 // Make sure the summary endpoint is called once
3775 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3776
3777 // We create a locally cached knocked room
3778 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3779
3780 // And we get a preview, the server endpoint was reached
3781 let preview = client
3782 .get_room_preview(room_id.into(), Vec::new())
3783 .await
3784 .expect("Room preview should be retrieved");
3785
3786 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3787 }
3788
3789 #[async_test]
3790 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3791 let server = MatrixMockServer::new().await;
3792 let client = server.client_builder().build().await;
3793
3794 let room_id = room_id!("!a-room:matrix.org");
3795
3796 // Make sure the summary endpoint is not called
3797 server.mock_room_summary().ok(room_id).never().mount().await;
3798
3799 // We create a locally cached joined room
3800 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3801
3802 // And we get a preview, no server endpoint was reached
3803 let preview = client
3804 .get_room_preview(room_id.into(), Vec::new())
3805 .await
3806 .expect("Room preview should be retrieved");
3807
3808 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3809 }
3810
3811 #[async_test]
3812 async fn test_media_preview_config() {
3813 let server = MatrixMockServer::new().await;
3814 let client = server.client_builder().build().await;
3815
3816 server
3817 .mock_sync()
3818 .ok_and_run(&client, |builder| {
3819 builder.add_custom_global_account_data(json!({
3820 "content": {
3821 "media_previews": "private",
3822 "invite_avatars": "off"
3823 },
3824 "type": "m.media_preview_config"
3825 }));
3826 })
3827 .await;
3828
3829 let (initial_value, stream) =
3830 client.account().observe_media_preview_config().await.unwrap();
3831
3832 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3833 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3834 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3835 pin_mut!(stream);
3836 assert_pending!(stream);
3837
3838 server
3839 .mock_sync()
3840 .ok_and_run(&client, |builder| {
3841 builder.add_custom_global_account_data(json!({
3842 "content": {
3843 "media_previews": "off",
3844 "invite_avatars": "on"
3845 },
3846 "type": "m.media_preview_config"
3847 }));
3848 })
3849 .await;
3850
3851 assert_next_matches!(
3852 stream,
3853 MediaPreviewConfigEventContent {
3854 media_previews: Some(MediaPreviews::Off),
3855 invite_avatars: Some(InviteAvatars::On),
3856 ..
3857 }
3858 );
3859 assert_pending!(stream);
3860 }
3861
3862 #[async_test]
3863 async fn test_unstable_media_preview_config() {
3864 let server = MatrixMockServer::new().await;
3865 let client = server.client_builder().build().await;
3866
3867 server
3868 .mock_sync()
3869 .ok_and_run(&client, |builder| {
3870 builder.add_custom_global_account_data(json!({
3871 "content": {
3872 "media_previews": "private",
3873 "invite_avatars": "off"
3874 },
3875 "type": "io.element.msc4278.media_preview_config"
3876 }));
3877 })
3878 .await;
3879
3880 let (initial_value, stream) =
3881 client.account().observe_media_preview_config().await.unwrap();
3882
3883 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3884 assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3885 assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3886 pin_mut!(stream);
3887 assert_pending!(stream);
3888
3889 server
3890 .mock_sync()
3891 .ok_and_run(&client, |builder| {
3892 builder.add_custom_global_account_data(json!({
3893 "content": {
3894 "media_previews": "off",
3895 "invite_avatars": "on"
3896 },
3897 "type": "io.element.msc4278.media_preview_config"
3898 }));
3899 })
3900 .await;
3901
3902 assert_next_matches!(
3903 stream,
3904 MediaPreviewConfigEventContent {
3905 media_previews: Some(MediaPreviews::Off),
3906 invite_avatars: Some(InviteAvatars::On),
3907 ..
3908 }
3909 );
3910 assert_pending!(stream);
3911 }
3912
3913 #[async_test]
3914 async fn test_media_preview_config_not_found() {
3915 let server = MatrixMockServer::new().await;
3916 let client = server.client_builder().build().await;
3917
3918 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3919
3920 assert!(initial_value.is_none());
3921 }
3922
3923 #[async_test]
3924 async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
3925 // The default Matrix version we use is 1.11 or higher, so authenticated media
3926 // is supported.
3927 let server = MatrixMockServer::new().await;
3928 let client = server.client_builder().build().await;
3929
3930 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3931
3932 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3933 client.load_or_fetch_max_upload_size().await.unwrap();
3934
3935 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3936 }
3937
3938 #[async_test]
3939 async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
3940 // The server must advertise support for the stable feature for authenticated
3941 // media support, so we mock the `GET /versions` response.
3942 let server = MatrixMockServer::new().await;
3943 let client = server.client_builder().no_server_versions().build().await;
3944
3945 server
3946 .mock_versions()
3947 .ok_custom(
3948 &["v1.7", "v1.8", "v1.9", "v1.10"],
3949 &[("org.matrix.msc3916.stable", true)].into(),
3950 )
3951 .named("versions")
3952 .expect(1)
3953 .mount()
3954 .await;
3955
3956 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3957
3958 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3959 client.load_or_fetch_max_upload_size().await.unwrap();
3960
3961 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3962 }
3963
3964 #[async_test]
3965 async fn test_load_or_fetch_max_upload_size_no_auth() {
3966 // The server must not support Matrix 1.11 or higher for unauthenticated
3967 // media requests, so we mock the `GET /versions` response.
3968 let server = MatrixMockServer::new().await;
3969 let client = server.client_builder().no_server_versions().build().await;
3970
3971 server
3972 .mock_versions()
3973 .ok_custom(&["v1.1"], &Default::default())
3974 .named("versions")
3975 .expect(1)
3976 .mount()
3977 .await;
3978
3979 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3980
3981 server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
3982 client.load_or_fetch_max_upload_size().await.unwrap();
3983
3984 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3985 }
3986
3987 #[async_test]
3988 async fn test_uploading_a_too_large_media_file() {
3989 let server = MatrixMockServer::new().await;
3990 let client = server.client_builder().build().await;
3991
3992 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
3993 client.load_or_fetch_max_upload_size().await.unwrap();
3994 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
3995
3996 let data = vec![1, 2];
3997 let upload_request =
3998 ruma::api::client::media::create_content::v3::Request::new(data.clone());
3999 let request = SendRequest {
4000 client: client.clone(),
4001 request: upload_request,
4002 config: None,
4003 send_progress: SharedObservable::new(TransmissionProgress::default()),
4004 };
4005 let media_request = SendMediaUploadRequest::new(request);
4006
4007 let error = media_request.await.err();
4008 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4009 assert_eq!(max, uint!(1));
4010 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4011 }
4012
4013 #[async_test]
4014 async fn test_dont_ignore_timeout_on_first_sync() {
4015 let server = MatrixMockServer::new().await;
4016 let client = server.client_builder().build().await;
4017
4018 server
4019 .mock_sync()
4020 .timeout(Some(Duration::from_secs(30)))
4021 .ok(|_| {})
4022 .mock_once()
4023 .named("sync_with_timeout")
4024 .mount()
4025 .await;
4026
4027 // Call the endpoint once to check the timeout.
4028 let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4029
4030 timeout(Duration::from_secs(1), async {
4031 stream.next().await.unwrap().unwrap();
4032 })
4033 .await
4034 .unwrap();
4035 }
4036
4037 #[async_test]
4038 async fn test_ignore_timeout_on_first_sync() {
4039 let server = MatrixMockServer::new().await;
4040 let client = server.client_builder().build().await;
4041
4042 server
4043 .mock_sync()
4044 .timeout(None)
4045 .ok(|_| {})
4046 .mock_once()
4047 .named("sync_no_timeout")
4048 .mount()
4049 .await;
4050 server
4051 .mock_sync()
4052 .timeout(Some(Duration::from_secs(30)))
4053 .ok(|_| {})
4054 .mock_once()
4055 .named("sync_with_timeout")
4056 .mount()
4057 .await;
4058
4059 // Call each version of the endpoint once to check the timeouts.
4060 let mut stream = Box::pin(
4061 client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4062 );
4063
4064 timeout(Duration::from_secs(1), async {
4065 stream.next().await.unwrap().unwrap();
4066 stream.next().await.unwrap().unwrap();
4067 })
4068 .await
4069 .unwrap();
4070 }
4071}