matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{btree_map, BTreeMap, BTreeSet},
19 fmt::{self, Debug},
20 future::{ready, Future},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23};
24
25use caches::ClientCaches;
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{store::LockableCryptoStore, DecryptionSettings};
32use matrix_sdk_base::{
33 event_cache::store::EventCacheStoreLock,
34 store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
35 sync::{Notification, RoomUpdates},
36 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
37 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm,
38};
39use matrix_sdk_common::ttl_cache::TtlCache;
40#[cfg(feature = "e2e-encryption")]
41use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
42use ruma::{
43 api::{
44 client::{
45 account::whoami,
46 alias::{create_alias, delete_alias, get_alias},
47 device::{delete_devices, get_devices, update_device},
48 directory::{get_public_rooms, get_public_rooms_filtered},
49 discovery::{
50 discover_homeserver::{self, RtcFocusInfo},
51 get_capabilities::{self, Capabilities},
52 get_supported_versions,
53 },
54 error::ErrorKind,
55 filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
56 knock::knock_room,
57 membership::{join_room_by_id, join_room_by_id_or_alias},
58 room::create_room,
59 session::login::v3::DiscoveryInfo,
60 sync::sync_events,
61 uiaa,
62 user_directory::search_users,
63 },
64 error::FromHttpResponseError,
65 FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
66 },
67 assign,
68 push::Ruleset,
69 time::Instant,
70 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
71 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
72};
73use serde::de::DeserializeOwned;
74use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
75use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
76use url::Url;
77
78use self::futures::SendRequest;
79use crate::{
80 authentication::{
81 matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
82 SaveSessionCallback,
83 },
84 config::RequestConfig,
85 deduplicating_handler::DeduplicatingHandler,
86 error::HttpResult,
87 event_cache::EventCache,
88 event_handler::{
89 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
90 EventHandlerStore, ObservableEventHandler, SyncEvent,
91 },
92 http_client::HttpClient,
93 latest_events::LatestEvents,
94 media::MediaError,
95 notification_settings::NotificationSettings,
96 room::RoomMember,
97 room_preview::RoomPreview,
98 send_queue::{SendQueue, SendQueueData},
99 sliding_sync::Version as SlidingSyncVersion,
100 sync::{RoomUpdate, SyncResponse},
101 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
102 Room, SessionTokens, TransmissionProgress,
103};
104#[cfg(feature = "e2e-encryption")]
105use crate::{
106 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
107 store_locks::CrossProcessStoreLock,
108};
109
110mod builder;
111pub(crate) mod caches;
112pub(crate) mod futures;
113
114pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
115
/// Boxed, pinned future returned by a notification handler.
///
/// On non-wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future returned by a notification handler (wasm variant,
/// without the `Send` bound).
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

/// Boxed callback taking a [`Notification`], a [`Room`] and a [`Client`] and
/// returning a [`NotificationHandlerFut`].
///
/// On non-wasm targets the callback is additionally required to be
/// `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed callback taking a [`Notification`], a [`Room`] and a [`Client`] and
/// returning a [`NotificationHandlerFut`] (wasm variant, without the
/// `Send + Sync` bounds).
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
126
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Keep running the loop.
    Continue,
    /// Stop iterating and leave the loop.
    Break,
}
140
/// Represents changes that can occur to a `Client`s `Session`.
///
/// Equality is total (the only payload is a `bool`), so the type implements
/// [`Eq`] in addition to [`PartialEq`], like the sibling [`LoopCtrl`] enum.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
152
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely:
/// a clone only copies the pointer to the shared [`ClientInner`].
#[derive(Clone)]
pub struct Client {
    /// The shared state backing this client handle.
    pub(crate) inner: Arc<ClientInner>,
}
160
/// Collection of locks and request-deduplicating handlers used by individual
/// client methods.
///
/// Every field has a `Default` value, so the whole collection is created with
/// `ClientLocks::default()`.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    // NOTE(review): `Self` here is `ClientLocks`, which has no visible
    // `send_single_receipt` method; the link target probably lives on another
    // type. Confirm and fix the intra-doc link.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// The cross-process lock guarding the crypto store.
    ///
    /// Stored in a `OnceCell`, so it starts out unset and can be initialized
    /// at most once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock:
        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
240
/// The shared state behind a [`Client`].
///
/// A [`Client`] only holds an `Arc<ClientInner>`, so every clone of a client
/// observes the same `ClientInner`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    ///
    /// Wrapped in a lock because the URL can be replaced after construction
    /// (see `Client::set_homeserver`).
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    ///
    /// Wrapped in a lock because it can be overridden after construction
    /// (see `Client::set_sliding_sync_version`).
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates.
    ///
    /// There is one broadcast channel per room, keyed by room ID.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    // NOTE(review): this file imports `crate::latest_events::LatestEvents`;
    // confirm that the `crate::latest_event::LatestEvent` link target above
    // actually resolves.
    latest_events: OnceCell<LatestEvents>,
}
353
354impl ClientInner {
355 /// Create a new `ClientInner`.
356 ///
357 /// All the fields passed as parameters here are those that must be cloned
358 /// upon instantiation of a sub-client, e.g. a client specialized for
359 /// notifications.
360 #[allow(clippy::too_many_arguments)]
361 async fn new(
362 auth_ctx: Arc<AuthCtx>,
363 server: Option<Url>,
364 homeserver: Url,
365 sliding_sync_version: SlidingSyncVersion,
366 http_client: HttpClient,
367 base_client: BaseClient,
368 server_info: ClientServerInfo,
369 respect_login_well_known: bool,
370 event_cache: OnceCell<EventCache>,
371 send_queue: Arc<SendQueueData>,
372 latest_events: OnceCell<LatestEvents>,
373 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
374 #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
375 cross_process_store_locks_holder_name: String,
376 ) -> Arc<Self> {
377 let caches = ClientCaches {
378 server_info: server_info.into(),
379 server_metadata: Mutex::new(TtlCache::new()),
380 };
381
382 let client = Self {
383 server,
384 homeserver: StdRwLock::new(homeserver),
385 auth_ctx,
386 sliding_sync_version: StdRwLock::new(sliding_sync_version),
387 http_client,
388 base_client,
389 caches,
390 locks: Default::default(),
391 cross_process_store_locks_holder_name,
392 typing_notice_times: Default::default(),
393 event_handlers: Default::default(),
394 notification_handlers: Default::default(),
395 room_update_channels: Default::default(),
396 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
397 // ballast for all observers to catch up.
398 room_updates_sender: broadcast::Sender::new(32),
399 respect_login_well_known,
400 sync_beat: event_listener::Event::new(),
401 event_cache,
402 send_queue_data: send_queue,
403 latest_events,
404 #[cfg(feature = "e2e-encryption")]
405 e2ee: EncryptionData::new(encryption_settings),
406 #[cfg(feature = "e2e-encryption")]
407 verification_state: SharedObservable::new(VerificationState::Unknown),
408 #[cfg(feature = "e2e-encryption")]
409 enable_share_history_on_invite,
410 server_max_upload_size: Mutex::new(OnceCell::new()),
411 };
412
413 #[allow(clippy::let_and_return)]
414 let client = Arc::new(client);
415
416 #[cfg(feature = "e2e-encryption")]
417 client.e2ee.initialize_room_key_tasks(&client);
418
419 let _ = client
420 .event_cache
421 .get_or_init(|| async {
422 EventCache::new(
423 WeakClient::from_inner(&client),
424 client.base_client.event_cache_store().clone(),
425 )
426 })
427 .await;
428
429 client
430 }
431}
432
433#[cfg(not(tarpaulin_include))]
434impl Debug for Client {
435 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
436 write!(fmt, "Client")
437 }
438}
439
440impl Client {
441 /// Create a new [`Client`] that will use the given homeserver.
442 ///
443 /// # Arguments
444 ///
445 /// * `homeserver_url` - The homeserver that the client should connect to.
446 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
447 Self::builder().homeserver_url(homeserver_url).build().await
448 }
449
450 /// Returns a subscriber that publishes an event every time the ignore user
451 /// list changes.
452 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
453 self.inner.base_client.subscribe_to_ignore_user_list_changes()
454 }
455
/// Create a new [`ClientBuilder`].
///
/// Equivalent to calling `ClientBuilder::new()` directly.
pub fn builder() -> ClientBuilder {
    ClientBuilder::new()
}
460
/// Borrow the [`BaseClient`] stored in the shared client state.
pub(crate) fn base_client(&self) -> &BaseClient {
    &self.inner.base_client
}
464
/// The underlying HTTP client.
///
/// This exposes the raw `reqwest::Client` wrapped by the SDK's own
/// `HttpClient`.
pub fn http_client(&self) -> &reqwest::Client {
    &self.inner.http_client.inner
}
469
/// Borrow the collection of locks and deduplicating handlers shared by all
/// clones of this client.
pub(crate) fn locks(&self) -> &ClientLocks {
    &self.inner.locks
}
473
/// Borrow the authentication context (all the data related to
/// authentication and authorization) of this client.
pub(crate) fn auth_ctx(&self) -> &AuthCtx {
    // `auth_ctx` is stored as an `Arc<AuthCtx>`; the reference derefs through it.
    &self.inner.auth_ctx
}
477
478 /// The cross-process store locks holder name.
479 ///
480 /// The SDK provides cross-process store locks (see
481 /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
482 /// `holder_name` is the value used for all cross-process store locks
483 /// used by this `Client`.
484 pub fn cross_process_store_locks_holder_name(&self) -> &str {
485 &self.inner.cross_process_store_locks_holder_name
486 }
487
488 /// Change the homeserver URL used by this client.
489 ///
490 /// # Arguments
491 ///
492 /// * `homeserver_url` - The new URL to use.
493 fn set_homeserver(&self, homeserver_url: Url) {
494 *self.inner.homeserver.write().unwrap() = homeserver_url;
495 }
496
497 /// Get the capabilities of the homeserver.
498 ///
499 /// This method should be used to check what features are supported by the
500 /// homeserver.
501 ///
502 /// # Examples
503 ///
504 /// ```no_run
505 /// # use matrix_sdk::Client;
506 /// # use url::Url;
507 /// # async {
508 /// # let homeserver = Url::parse("http://example.com")?;
509 /// let client = Client::new(homeserver).await?;
510 ///
511 /// let capabilities = client.get_capabilities().await?;
512 ///
513 /// if capabilities.change_password.enabled {
514 /// // Change password
515 /// }
516 /// # anyhow::Ok(()) };
517 /// ```
518 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
519 let res = self.send(get_capabilities::v3::Request::new()).await?;
520 Ok(res.capabilities)
521 }
522
/// Get a copy of the default request config.
///
/// The default request config is what's used when sending requests if no
/// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
/// function with such a parameter.
///
/// If the default request config was not customized through
/// [`ClientBuilder`] when creating this `Client`, the returned value will
/// be equivalent to [`RequestConfig::default()`].
pub fn request_config(&self) -> RequestConfig {
    // Returned by value: the config stored on the HTTP client is copied out.
    self.inner.http_client.request_config
}
535
536 /// Check whether the client has been activated.
537 ///
538 /// A client is considered active when:
539 ///
540 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
541 /// is logged in,
542 /// 2. Has loaded cached data from storage,
543 /// 3. If encryption is enabled, it also initialized or restored its
544 /// `OlmMachine`.
545 pub fn is_active(&self) -> bool {
546 self.inner.base_client.is_active()
547 }
548
/// The server used by the client.
///
/// Returns `None` when the server is not known. See the documentation of
/// the `server` field of `ClientInner` to learn more about the difference
/// between the server and the homeserver.
pub fn server(&self) -> Option<&Url> {
    self.inner.server.as_ref()
}
555
556 /// The homeserver of the client.
557 pub fn homeserver(&self) -> Url {
558 self.inner.homeserver.read().unwrap().clone()
559 }
560
561 /// Get the sliding sync version.
562 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
563 self.inner.sliding_sync_version.read().unwrap().clone()
564 }
565
566 /// Override the sliding sync version.
567 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
568 let mut lock = self.inner.sliding_sync_version.write().unwrap();
569 *lock = version;
570 }
571
/// Get the Matrix user session meta information.
///
/// If the client is currently logged in, this will return a
/// [`SessionMeta`] object which contains the user ID and device ID.
/// Otherwise it returns `None`.
///
/// The value is read from the underlying [`BaseClient`].
pub fn session_meta(&self) -> Option<&SessionMeta> {
    self.base_client().session_meta()
}
580
/// Returns a receiver that gets events for each room info update.
///
/// To watch for new events, use `receiver.resubscribe()`. Each event
/// contains the room and a boolean whether this event should trigger a room
/// list update.
///
/// The receiver is obtained from the underlying [`BaseClient`].
pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
    self.base_client().room_info_notable_update_receiver()
}
587
/// Performs a search for users in the [user directory].
///
/// The search is performed case-insensitively on user IDs and display
/// names.
///
/// # Arguments
///
/// * `search_term` - The search term for the search.
/// * `limit` - The maximum number of results to return. Defaults to 10. The
///   value is only written into the request when it can be represented as
///   a [`UInt`]; otherwise the limit the request was created with is kept.
///
/// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
pub async fn search_users(
    &self,
    search_term: &str,
    limit: u64,
) -> HttpResult<search_users::v3::Response> {
    let mut request = search_users::v3::Request::new(search_term.to_owned());

    // `UInt::new` returns `None` for values it cannot represent; in that
    // case the request keeps its initial limit.
    if let Some(limit) = UInt::new(limit) {
        request.limit = limit;
    }

    self.send(request).await
}
610
611 /// Get the user id of the current owner of the client.
612 pub fn user_id(&self) -> Option<&UserId> {
613 self.session_meta().map(|s| s.user_id.as_ref())
614 }
615
616 /// Get the device ID that identifies the current session.
617 pub fn device_id(&self) -> Option<&DeviceId> {
618 self.session_meta().map(|s| s.device_id.as_ref())
619 }
620
/// Get the current access token for this session.
///
/// The token is read from the authentication context of the client.
///
/// Will be `None` if the client has not been logged in.
pub fn access_token(&self) -> Option<String> {
    self.auth_ctx().access_token()
}
627
/// Get the current tokens for this session.
///
/// The tokens are read from the authentication context of the client.
///
/// To be notified of changes in the session tokens, use
/// [`Client::subscribe_to_session_changes()`] or
/// [`Client::set_session_callbacks()`].
///
/// Returns `None` if the client has not been logged in.
pub fn session_tokens(&self) -> Option<SessionTokens> {
    self.auth_ctx().session_tokens()
}
638
639 /// Access the authentication API used to log in this client.
640 ///
641 /// Will be `None` if the client has not been logged in.
642 pub fn auth_api(&self) -> Option<AuthApi> {
643 match self.auth_ctx().auth_data.get()? {
644 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
645 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
646 }
647 }
648
649 /// Get the whole session info of this client.
650 ///
651 /// Will be `None` if the client has not been logged in.
652 ///
653 /// Can be used with [`Client::restore_session`] to restore a previously
654 /// logged-in session.
655 pub fn session(&self) -> Option<AuthSession> {
656 match self.auth_api()? {
657 AuthApi::Matrix(api) => api.session().map(Into::into),
658 AuthApi::OAuth(api) => api.full_session().map(Into::into),
659 }
660 }
661
/// Get a reference to the state store.
///
/// The store is the one held by the underlying [`BaseClient`].
pub fn state_store(&self) -> &DynStateStore {
    self.base_client().state_store()
}
666
/// Get a reference to the event cache store.
///
/// The store is the one held by the underlying [`BaseClient`], wrapped in
/// its [`EventCacheStoreLock`].
pub fn event_cache_store(&self) -> &EventCacheStoreLock {
    self.base_client().event_cache_store()
}
671
/// Access the native Matrix authentication API with this client.
///
/// Returns a new [`MatrixAuth`] built from a clone of this client.
pub fn matrix_auth(&self) -> MatrixAuth {
    MatrixAuth::new(self.clone())
}
676
/// Get the account of the current owner of the client.
///
/// Returns a new [`Account`] built from a clone of this client.
pub fn account(&self) -> Account {
    Account::new(self.clone())
}
681
/// Get the encryption manager of the client.
///
/// Returns a new [`Encryption`] built from a clone of this client. Only
/// available with the `e2e-encryption` feature.
#[cfg(feature = "e2e-encryption")]
pub fn encryption(&self) -> Encryption {
    Encryption::new(self.clone())
}
687
/// Get the media manager of the client.
///
/// Returns a new [`Media`] built from a clone of this client.
pub fn media(&self) -> Media {
    Media::new(self.clone())
}
692
/// Get the pusher manager of the client.
///
/// Returns a new [`Pusher`] built from a clone of this client.
pub fn pusher(&self) -> Pusher {
    Pusher::new(self.clone())
}
697
/// Access the OAuth 2.0 API of the client.
///
/// Returns a new [`OAuth`] built from a clone of this client.
pub fn oauth(&self) -> OAuth {
    OAuth::new(self.clone())
}
702
703 /// Register a handler for a specific event type.
704 ///
705 /// The handler is a function or closure with one or more arguments. The
706 /// first argument is the event itself. All additional arguments are
707 /// "context" arguments: They have to implement [`EventHandlerContext`].
708 /// This trait is named that way because most of the types implementing it
709 /// give additional context about an event: The room it was in, its raw form
710 /// and other similar things. As two exceptions to this,
711 /// [`Client`] and [`EventHandlerHandle`] also implement the
712 /// `EventHandlerContext` trait so you don't have to clone your client
713 /// into the event handler manually and a handler can decide to remove
714 /// itself.
715 ///
716 /// Some context arguments are not universally applicable. A context
717 /// argument that isn't available for the given event type will result in
718 /// the event handler being skipped and an error being logged. The following
719 /// context argument types are only available for a subset of event types:
720 ///
721 /// * [`Room`] is only available for room-specific events, i.e. not for
722 /// events like global account data events or presence events.
723 ///
724 /// You can provide custom context via
725 /// [`add_event_handler_context`](Client::add_event_handler_context) and
726 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
727 /// into the event handler.
728 ///
729 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
730 ///
731 /// # Examples
732 ///
733 /// ```no_run
734 /// use matrix_sdk::{
735 /// deserialized_responses::EncryptionInfo,
736 /// event_handler::Ctx,
737 /// ruma::{
738 /// events::{
739 /// macros::EventContent,
740 /// push_rules::PushRulesEvent,
741 /// room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
742 /// },
743 /// push::Action,
744 /// Int, MilliSecondsSinceUnixEpoch,
745 /// },
746 /// Client, Room,
747 /// };
748 /// use serde::{Deserialize, Serialize};
749 ///
750 /// # async fn example(client: Client) {
751 /// client.add_event_handler(
752 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
753 /// // Common usage: Room event plus room and client.
754 /// },
755 /// );
756 /// client.add_event_handler(
757 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
758 /// async move {
759 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
760 /// // unencrypted events and events that were decrypted by the SDK.
761 /// }
762 /// },
763 /// );
764 /// client.add_event_handler(
765 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
766 /// async move {
767 /// // A `Vec<Action>` parameter allows you to know which push actions
768 /// // are applicable for an event. For example, an event with
769 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
770 /// // in the timeline.
771 /// }
772 /// },
773 /// );
774 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
775 /// // You can omit any or all arguments after the first.
776 /// });
777 ///
778 /// // Registering a temporary event handler:
779 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
780 /// /* Event handler */
781 /// });
782 /// client.remove_event_handler(handle);
783 ///
784 /// // Registering custom event handler context:
785 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
786 /// struct MyContext {
787 /// number: usize,
788 /// }
789 /// client.add_event_handler_context(MyContext { number: 5 });
790 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
791 /// // Use the context
792 /// });
793 ///
794 /// // Custom events work exactly the same way, you just need to declare
795 /// // the content struct and use the EventContent derive macro on it.
796 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
797 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
798 /// struct TokenEventContent {
799 /// token: String,
800 /// #[serde(rename = "exp")]
801 /// expires_at: MilliSecondsSinceUnixEpoch,
802 /// }
803 ///
804 /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
805 /// todo!("Display the token");
806 /// });
807 ///
808 /// // Event handler closures can also capture local variables.
809 /// // Make sure they are cheap to clone though, because they will be cloned
810 /// // every time the closure is called.
811 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
812 ///
813 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
814 /// println!("Calling the handler with identifier {data}");
815 /// });
816 /// # }
817 /// ```
pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
where
    Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
    H: EventHandler<Ev, Ctx>,
{
    // `None` means no room ID is attached to the handler; the room-scoped
    // variant passes `Some(room_id)` instead.
    self.add_event_handler_impl(handler, None)
}
825
/// Register a handler for a specific room, and event type.
///
/// This method works the same way as
/// [`add_event_handler`][Self::add_event_handler], except that the handler
/// will only be called for events in the room with the specified ID. See
/// that method for more details on event handler functions.
///
/// `client.add_room_event_handler(room_id, hdl)` is equivalent to
/// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
/// your use case.
///
/// # Arguments
///
/// * `room_id` - The ID of the room the handler is restricted to.
/// * `handler` - The event handler function or closure to register.
pub fn add_room_event_handler<Ev, Ctx, H>(
    &self,
    room_id: &RoomId,
    handler: H,
) -> EventHandlerHandle
where
    Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
    H: EventHandler<Ev, Ctx>,
{
    // The room ID is converted to an owned value so it can be stored
    // alongside the handler.
    self.add_event_handler_impl(handler, Some(room_id.to_owned()))
}
847
/// Observe a specific event type.
///
/// `Ev` represents the kind of event that will be observed. `Ctx`
/// represents the context that will come with the event. It relies on the
/// same mechanism as [`Client::add_event_handler`]. The main difference is
/// that it returns an [`ObservableEventHandler`] and doesn't require a
/// user-defined closure. It is possible to subscribe to the
/// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
/// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
/// Ctx)`.
///
/// Be careful that only the most recent value can be observed. Subscribers
/// are notified when a new value is sent, but there is no guarantee
/// that they will see all values.
///
/// To restrict the observation to the events of a single room, use
/// [`Client::observe_room_events`] instead.
///
/// # Example
///
/// Let's see a classical usage:
///
/// ```
/// use futures_util::StreamExt as _;
/// use matrix_sdk::{
///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
///     Client, Room,
/// };
///
/// # async fn example(client: Client) -> Option<()> {
/// let observer =
///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
///
/// let mut subscriber = observer.subscribe();
///
/// let (event, (room, push_actions)) = subscriber.next().await?;
/// # Some(())
/// # }
/// ```
///
/// Now let's see how to get several contexts that can be useful for you:
///
/// ```
/// use matrix_sdk::{
///     deserialized_responses::EncryptionInfo,
///     ruma::{
///         events::room::{
///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
///         },
///         push::Action,
///     },
///     Client, Room,
/// };
///
/// # async fn example(client: Client) {
/// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
/// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
///
/// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
/// // to distinguish between unencrypted events and events that were decrypted
/// // by the SDK.
/// let _ = client
///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
///     );
///
/// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
/// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
/// // should be highlighted in the timeline.
/// let _ =
///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
///
/// // Observe `SyncRoomTopicEvent` and fetch nothing else.
/// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
/// # }
/// ```
///
/// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
where
    Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
    Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
{
    // `None` means no room filter is handed to the shared implementation.
    self.observe_room_events_impl(None)
}
929
/// Observe a specific room, and event type.
///
/// This method works the same way as [`Client::observe_events`], except
/// that the observability will only be applied for events in the room with
/// the specified ID. See that method for more details.
///
/// Be careful that only the most recent value can be observed. Subscribers
/// are notified when a new value is sent, but there is no guarantee
/// that they will see all values.
///
/// # Arguments
///
/// * `room_id` - The ID of the room whose events should be observed.
pub fn observe_room_events<Ev, Ctx>(
    &self,
    room_id: &RoomId,
) -> ObservableEventHandler<(Ev, Ctx)>
where
    Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
    Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
{
    // The shared implementation takes an owned room ID, hence the copy.
    self.observe_room_events_impl(Some(room_id.to_owned()))
}
949
950 /// Shared implementation for `Client::observe_events` and
951 /// `Client::observe_room_events`.
952 fn observe_room_events_impl<Ev, Ctx>(
953 &self,
954 room_id: Option<OwnedRoomId>,
955 ) -> ObservableEventHandler<(Ev, Ctx)>
956 where
957 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
958 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
959 {
960 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
961 // new value.
962 let shared_observable = SharedObservable::new(None);
963
964 ObservableEventHandler::new(
965 shared_observable.clone(),
966 self.event_handler_drop_guard(self.add_event_handler_impl(
967 move |event: Ev, context: Ctx| {
968 shared_observable.set(Some((event, context)));
969
970 ready(())
971 },
972 room_id,
973 )),
974 )
975 }
976
/// Remove the event handler associated with the handle.
///
/// Note that you **must not** call `remove_event_handler` from the
/// non-async part of an event handler, that is:
///
/// ```ignore
/// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
///     // ⚠ this will cause a deadlock ⚠
///     client.remove_event_handler(handle);
///
///     async move {
///         // removing the event handler here is fine
///         client.remove_event_handler(handle);
///     }
/// })
/// ```
///
/// Note also that handlers that remove themselves will still execute with
/// events received in the same sync cycle.
///
/// # Arguments
///
/// `handle` - The [`EventHandlerHandle`] that is returned when
/// registering the event handler with [`Client::add_event_handler`].
///
/// # Examples
///
/// ```no_run
/// # use url::Url;
/// # use tokio::sync::mpsc;
/// #
/// # let homeserver = Url::parse("http://localhost:8080").unwrap();
/// #
/// use matrix_sdk::{
///     event_handler::EventHandlerHandle,
///     ruma::events::room::member::SyncRoomMemberEvent, Client,
/// };
/// #
/// # futures_executor::block_on(async {
/// # let client = matrix_sdk::Client::builder()
/// #     .homeserver_url(homeserver)
/// #     .server_versions([ruma::api::MatrixVersion::V1_0])
/// #     .build()
/// #     .await
/// #     .unwrap();
///
/// client.add_event_handler(
///     |ev: SyncRoomMemberEvent,
///      client: Client,
///      handle: EventHandlerHandle| async move {
///         // Common usage: Check arriving Event is the expected one
///         println!("Expected RoomMemberEvent received!");
///         client.remove_event_handler(handle);
///     },
/// );
/// # });
/// ```
pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
    // Removal is delegated to the client's event handler store.
    self.inner.event_handlers.remove(handle);
}
1037
/// Create an [`EventHandlerDropGuard`] for the event handler identified by
/// the given handle.
///
/// When the returned value is dropped, the event handler will be removed.
///
/// # Arguments
///
/// * `handle` - The handle of the event handler the guard is created for.
pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
    // The guard is given its own clone of this client together with the
    // handle.
    EventHandlerDropGuard::new(handle, self.clone())
}
1045
/// Add an arbitrary value for use as event handler context.
///
/// The value can be obtained in an event handler by adding an argument of
/// the type [`Ctx<T>`][crate::event_handler::Ctx].
///
/// If a value of the same type has been added before, it will be
/// overwritten.
///
/// # Arguments
///
/// * `ctx` - The value to make available to event handlers. Its type must
///   be `Clone + Send + Sync + 'static`.
///
/// # Examples
///
/// ```no_run
/// use matrix_sdk::{
///     event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
///     Room,
/// };
/// # #[derive(Clone)]
/// # struct SomeType;
/// # fn obtain_gui_handle() -> SomeType { SomeType }
/// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
/// # futures_executor::block_on(async {
/// # let client = matrix_sdk::Client::builder()
/// #     .homeserver_url(homeserver)
/// #     .server_versions([ruma::api::MatrixVersion::V1_0])
/// #     .build()
/// #     .await
/// #     .unwrap();
///
/// // Handle used to send messages to the UI part of the app
/// let my_gui_handle: SomeType = obtain_gui_handle();
///
/// client.add_event_handler_context(my_gui_handle.clone());
/// client.add_event_handler(
///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
///         async move {
///             // gui_handle.send(DisplayMessage { message: ev });
///         }
///     },
/// );
/// # });
/// ```
pub fn add_event_handler_context<T>(&self, ctx: T)
where
    T: Clone + Send + Sync + 'static,
{
    // The value is stored in the client's event handler store.
    self.inner.event_handlers.add_context(ctx);
}
1092
1093 /// Register a handler for a notification.
1094 ///
1095 /// Similar to [`Client::add_event_handler`], but only allows functions
1096 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1097 /// [`Client`] for now.
1098 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1099 where
1100 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1101 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1102 {
1103 self.inner.notification_handlers.write().await.push(Box::new(
1104 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1105 ));
1106
1107 self
1108 }
1109
1110 /// Subscribe to all updates for the room with the given ID.
1111 ///
1112 /// The returned receiver will receive a new message for each sync response
1113 /// that contains updates for that room.
1114 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1115 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1116 btree_map::Entry::Vacant(entry) => {
1117 let (tx, rx) = broadcast::channel(8);
1118 entry.insert(tx);
1119 rx
1120 }
1121 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1122 }
1123 }
1124
/// Subscribe to all updates to all rooms, whenever any has been received in
/// a sync response.
///
/// Every call returns a new receiver obtained by subscribing to the
/// client's room-updates sender.
pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
    self.inner.room_updates_sender.subscribe()
}
1130
/// Acquire a read guard on the list of registered notification handlers.
///
/// The handlers in this list are the ones added through
/// `register_notification_handler`, which pushes onto the same list under
/// the write lock.
pub(crate) async fn notification_handlers(
    &self,
) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
    self.inner.notification_handlers.read().await
}
1136
1137 /// Get all the rooms the client knows about.
1138 ///
1139 /// This will return the list of joined, invited, and left rooms.
1140 pub fn rooms(&self) -> Vec<Room> {
1141 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1142 }
1143
1144 /// Get all the rooms the client knows about, filtered by room state.
1145 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1146 self.base_client()
1147 .rooms_filtered(filter)
1148 .into_iter()
1149 .map(|room| Room::new(self.clone(), room))
1150 .collect()
1151 }
1152
1153 /// Get a stream of all the rooms, in addition to the existing rooms.
1154 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1155 let (rooms, stream) = self.base_client().rooms_stream();
1156
1157 let map_room = |room| Room::new(self.clone(), room);
1158
1159 (
1160 rooms.into_iter().map(map_room).collect(),
1161 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1162 )
1163 }
1164
1165 /// Returns the joined rooms this client knows about.
1166 pub fn joined_rooms(&self) -> Vec<Room> {
1167 self.base_client()
1168 .rooms_filtered(RoomStateFilter::JOINED)
1169 .into_iter()
1170 .map(|room| Room::new(self.clone(), room))
1171 .collect()
1172 }
1173
1174 /// Returns the invited rooms this client knows about.
1175 pub fn invited_rooms(&self) -> Vec<Room> {
1176 self.base_client()
1177 .rooms_filtered(RoomStateFilter::INVITED)
1178 .into_iter()
1179 .map(|room| Room::new(self.clone(), room))
1180 .collect()
1181 }
1182
1183 /// Returns the left rooms this client knows about.
1184 pub fn left_rooms(&self) -> Vec<Room> {
1185 self.base_client()
1186 .rooms_filtered(RoomStateFilter::LEFT)
1187 .into_iter()
1188 .map(|room| Room::new(self.clone(), room))
1189 .collect()
1190 }
1191
1192 /// Get a room with the given room id.
1193 ///
1194 /// # Arguments
1195 ///
1196 /// `room_id` - The unique id of the room that should be fetched.
1197 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1198 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1199 }
1200
1201 /// Gets the preview of a room, whether the current user has joined it or
1202 /// not.
1203 pub async fn get_room_preview(
1204 &self,
1205 room_or_alias_id: &RoomOrAliasId,
1206 via: Vec<OwnedServerName>,
1207 ) -> Result<RoomPreview> {
1208 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1209 Ok(room_id) => room_id.to_owned(),
1210 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1211 };
1212
1213 if let Some(room) = self.get_room(&room_id) {
1214 // The cached data can only be trusted if the room state is joined or
1215 // banned: for invite and knock rooms, no updates will be received
1216 // for the rooms after the invite/knock action took place so we may
1217 // have very out to date data for important fields such as
1218 // `join_rule`. For left rooms, the homeserver should return the latest info.
1219 match room.state() {
1220 RoomState::Joined | RoomState::Banned => {
1221 return Ok(RoomPreview::from_known_room(&room).await);
1222 }
1223 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1224 }
1225 }
1226
1227 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1228 }
1229
1230 /// Resolve a room alias to a room id and a list of servers which know
1231 /// about it.
1232 ///
1233 /// # Arguments
1234 ///
1235 /// `room_alias` - The room alias to be resolved.
1236 pub async fn resolve_room_alias(
1237 &self,
1238 room_alias: &RoomAliasId,
1239 ) -> HttpResult<get_alias::v3::Response> {
1240 let request = get_alias::v3::Request::new(room_alias.to_owned());
1241 self.send(request).await
1242 }
1243
1244 /// Checks if a room alias is not in use yet.
1245 ///
1246 /// Returns:
1247 /// - `Ok(true)` if the room alias is available.
1248 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1249 /// status code).
1250 /// - An `Err` otherwise.
1251 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1252 match self.resolve_room_alias(alias).await {
1253 // The room alias was resolved, so it's already in use.
1254 Ok(_) => Ok(false),
1255 Err(error) => {
1256 match error.client_api_error_kind() {
1257 // The room alias wasn't found, so it's available.
1258 Some(ErrorKind::NotFound) => Ok(true),
1259 _ => Err(error),
1260 }
1261 }
1262 }
1263 }
1264
1265 /// Adds a new room alias associated with a room to the room directory.
1266 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1267 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1268 self.send(request).await?;
1269 Ok(())
1270 }
1271
1272 /// Removes a room alias from the room directory.
1273 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1274 let request = delete_alias::v3::Request::new(alias.to_owned());
1275 self.send(request).await?;
1276 Ok(())
1277 }
1278
1279 /// Update the homeserver from the login response well-known if needed.
1280 ///
1281 /// # Arguments
1282 ///
1283 /// * `login_well_known` - The `well_known` field from a successful login
1284 /// response.
1285 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1286 if self.inner.respect_login_well_known {
1287 if let Some(well_known) = login_well_known {
1288 if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1289 self.set_homeserver(homeserver);
1290 }
1291 }
1292 }
1293 }
1294
1295 /// Similar to [`Client::restore_session_with`], with
1296 /// [`RoomLoadSettings::default()`].
1297 ///
1298 /// # Panics
1299 ///
1300 /// Panics if a session was already restored or logged in.
1301 #[instrument(skip_all)]
1302 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1303 self.restore_session_with(session, RoomLoadSettings::default()).await
1304 }
1305
1306 /// Restore a session previously logged-in using one of the available
1307 /// authentication APIs. The number of rooms to restore is controlled by
1308 /// [`RoomLoadSettings`].
1309 ///
1310 /// See the documentation of the corresponding authentication API's
1311 /// `restore_session` method for more information.
1312 ///
1313 /// # Panics
1314 ///
1315 /// Panics if a session was already restored or logged in.
1316 #[instrument(skip_all)]
1317 pub async fn restore_session_with(
1318 &self,
1319 session: impl Into<AuthSession>,
1320 room_load_settings: RoomLoadSettings,
1321 ) -> Result<()> {
1322 let session = session.into();
1323 match session {
1324 AuthSession::Matrix(session) => {
1325 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1326 }
1327 AuthSession::OAuth(session) => {
1328 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1329 }
1330 }
1331 }
1332
1333 /// Refresh the access token using the authentication API used to log into
1334 /// this session.
1335 ///
1336 /// See the documentation of the authentication API's `refresh_access_token`
1337 /// method for more information.
1338 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1339 let Some(auth_api) = self.auth_api() else {
1340 return Err(RefreshTokenError::RefreshTokenRequired);
1341 };
1342
1343 match auth_api {
1344 AuthApi::Matrix(api) => {
1345 trace!("Token refresh: Using the homeserver.");
1346 Box::pin(api.refresh_access_token()).await?;
1347 }
1348 AuthApi::OAuth(api) => {
1349 trace!("Token refresh: Using OAuth 2.0.");
1350 Box::pin(api.refresh_access_token()).await?;
1351 }
1352 }
1353
1354 Ok(())
1355 }
1356
1357 /// Log out the current session using the proper authentication API.
1358 ///
1359 /// # Errors
1360 ///
1361 /// Returns an error if the session is not authenticated or if an error
1362 /// occurred while making the request to the server.
1363 pub async fn logout(&self) -> Result<(), Error> {
1364 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1365 match auth_api {
1366 AuthApi::Matrix(matrix_auth) => {
1367 matrix_auth.logout().await?;
1368 Ok(())
1369 }
1370 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1371 }
1372 }
1373
1374 /// Get or upload a sync filter.
1375 ///
1376 /// This method will either get a filter ID from the store or upload the
1377 /// filter definition to the homeserver and return the new filter ID.
1378 ///
1379 /// # Arguments
1380 ///
1381 /// * `filter_name` - The unique name of the filter, this name will be used
1382 /// locally to store and identify the filter ID returned by the server.
1383 ///
1384 /// * `definition` - The filter definition that should be uploaded to the
1385 /// server if no filter ID can be found in the store.
1386 ///
1387 /// # Examples
1388 ///
1389 /// ```no_run
1390 /// # use matrix_sdk::{
1391 /// # Client, config::SyncSettings,
1392 /// # ruma::api::client::{
1393 /// # filter::{
1394 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1395 /// # },
1396 /// # sync::sync_events::v3::Filter,
1397 /// # }
1398 /// # };
1399 /// # use url::Url;
1400 /// # async {
1401 /// # let homeserver = Url::parse("http://example.com").unwrap();
1402 /// # let client = Client::new(homeserver).await.unwrap();
1403 /// let mut filter = FilterDefinition::default();
1404 ///
1405 /// // Let's enable member lazy loading.
1406 /// filter.room.state.lazy_load_options =
1407 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1408 ///
1409 /// let filter_id = client
1410 /// .get_or_upload_filter("sync", filter)
1411 /// .await
1412 /// .unwrap();
1413 ///
1414 /// let sync_settings = SyncSettings::new()
1415 /// .filter(Filter::FilterId(filter_id));
1416 ///
1417 /// let response = client.sync_once(sync_settings).await.unwrap();
1418 /// # };
1419 #[instrument(skip(self, definition))]
1420 pub async fn get_or_upload_filter(
1421 &self,
1422 filter_name: &str,
1423 definition: FilterDefinition,
1424 ) -> Result<String> {
1425 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1426 debug!("Found filter locally");
1427 Ok(filter)
1428 } else {
1429 debug!("Didn't find filter locally");
1430 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1431 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1432 let response = self.send(request).await?;
1433
1434 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1435
1436 Ok(response.filter_id)
1437 }
1438 }
1439
1440 /// Prepare to join a room by ID, by getting the current details about it
1441 async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1442 let room = self.get_room(room_id)?;
1443
1444 let inviter = match room.invite_details().await {
1445 Ok(details) => details.inviter,
1446 Err(Error::WrongRoomState(_)) => None,
1447 Err(e) => {
1448 warn!("Error fetching invite details for room: {e:?}");
1449 None
1450 }
1451 };
1452
1453 Some(PreJoinRoomInfo { inviter })
1454 }
1455
1456 /// Finish joining a room.
1457 ///
1458 /// If the room was an invite that should be marked as a DM, will include it
1459 /// in the DM event after creating the joined room.
1460 ///
1461 /// If encrypted history sharing is enabled, will check to see if we have a
1462 /// key bundle, and import it if so.
1463 ///
1464 /// # Arguments
1465 ///
1466 /// * `room_id` - The `RoomId` of the room that was joined.
1467 /// * `pre_join_room_info` - Information about the room before we joined.
1468 async fn finish_join_room(
1469 &self,
1470 room_id: &RoomId,
1471 pre_join_room_info: Option<PreJoinRoomInfo>,
1472 ) -> Result<Room> {
1473 let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1474 room.state() == RoomState::Invited
1475 && room.is_direct().await.unwrap_or_else(|e| {
1476 warn!(%room_id, "is_direct() failed: {e}");
1477 false
1478 })
1479 } else {
1480 false
1481 };
1482
1483 let base_room = self.base_client().room_joined(room_id).await?;
1484 let room = Room::new(self.clone(), base_room);
1485
1486 if mark_as_dm {
1487 room.set_is_direct(true).await?;
1488 }
1489
1490 #[cfg(feature = "e2e-encryption")]
1491 if self.inner.enable_share_history_on_invite {
1492 if let Some(inviter) =
1493 pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1494 {
1495 crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1496 .await?;
1497 }
1498 }
1499
1500 // Suppress "unused variable" and "unused field" lints
1501 #[cfg(not(feature = "e2e-encryption"))]
1502 let _ = pre_join_room_info.map(|i| i.inviter);
1503
1504 Ok(room)
1505 }
1506
1507 /// Join a room by `RoomId`.
1508 ///
1509 /// Returns the `Room` in the joined state.
1510 ///
1511 /// # Arguments
1512 ///
1513 /// * `room_id` - The `RoomId` of the room to be joined.
1514 #[instrument(skip(self))]
1515 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1516 // See who invited us to this room, if anyone. Note we have to do this before
1517 // making the `/join` request, otherwise we could race against the sync.
1518 let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1519
1520 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1521 let response = self.send(request).await?;
1522 self.finish_join_room(&response.room_id, pre_join_info).await
1523 }
1524
1525 /// Join a room by `RoomOrAliasId`.
1526 ///
1527 /// Returns the `Room` in the joined state.
1528 ///
1529 /// # Arguments
1530 ///
1531 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1532 /// alias looks like `#name:example.com`.
1533 /// * `server_names` - The server names to be used for resolving the alias,
1534 /// if needs be.
1535 pub async fn join_room_by_id_or_alias(
1536 &self,
1537 alias: &RoomOrAliasId,
1538 server_names: &[OwnedServerName],
1539 ) -> Result<Room> {
1540 let pre_join_info = {
1541 match alias.try_into() {
1542 Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1543 Err(_) => {
1544 // The id is a room alias. We assume (possibly incorrectly?) that we are not
1545 // responding to an invitation to the room, and therefore don't need to handle
1546 // things that happen as a result of invites.
1547 None
1548 }
1549 }
1550 };
1551 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1552 via: server_names.to_owned(),
1553 });
1554 let response = self.send(request).await?;
1555 self.finish_join_room(&response.room_id, pre_join_info).await
1556 }
1557
1558 /// Search the homeserver's directory of public rooms.
1559 ///
1560 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1561 /// a `get_public_rooms::Response`.
1562 ///
1563 /// # Arguments
1564 ///
1565 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1566 ///
1567 /// * `since` - Pagination token from a previous request.
1568 ///
1569 /// * `server` - The name of the server, if `None` the requested server is
1570 /// used.
1571 ///
1572 /// # Examples
1573 /// ```no_run
1574 /// use matrix_sdk::Client;
1575 /// # use url::Url;
1576 /// # let homeserver = Url::parse("http://example.com").unwrap();
1577 /// # let limit = Some(10);
1578 /// # let since = Some("since token");
1579 /// # let server = Some("servername.com".try_into().unwrap());
1580 /// # async {
1581 /// let mut client = Client::new(homeserver).await.unwrap();
1582 ///
1583 /// client.public_rooms(limit, since, server).await;
1584 /// # };
1585 /// ```
1586 #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1587 pub async fn public_rooms(
1588 &self,
1589 limit: Option<u32>,
1590 since: Option<&str>,
1591 server: Option<&ServerName>,
1592 ) -> HttpResult<get_public_rooms::v3::Response> {
1593 let limit = limit.map(UInt::from);
1594
1595 let request = assign!(get_public_rooms::v3::Request::new(), {
1596 limit,
1597 since: since.map(ToOwned::to_owned),
1598 server: server.map(ToOwned::to_owned),
1599 });
1600 self.send(request).await
1601 }
1602
1603 /// Create a room with the given parameters.
1604 ///
1605 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1606 /// created room.
1607 ///
1608 /// If you want to create a direct message with one specific user, you can
1609 /// use [`create_dm`][Self::create_dm], which is more convenient than
1610 /// assembling the [`create_room::v3::Request`] yourself.
1611 ///
1612 /// If the `is_direct` field of the request is set to `true` and at least
1613 /// one user is invited, the room will be automatically added to the direct
1614 /// rooms in the account data.
1615 ///
1616 /// # Examples
1617 ///
1618 /// ```no_run
1619 /// use matrix_sdk::{
1620 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1621 /// Client,
1622 /// };
1623 /// # use url::Url;
1624 /// #
1625 /// # async {
1626 /// # let homeserver = Url::parse("http://example.com").unwrap();
1627 /// let request = CreateRoomRequest::new();
1628 /// let client = Client::new(homeserver).await.unwrap();
1629 /// assert!(client.create_room(request).await.is_ok());
1630 /// # };
1631 /// ```
1632 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1633 let invite = request.invite.clone();
1634 let is_direct_room = request.is_direct;
1635 let response = self.send(request).await?;
1636
1637 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1638
1639 let joined_room = Room::new(self.clone(), base_room);
1640
1641 if is_direct_room && !invite.is_empty() {
1642 if let Err(error) =
1643 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1644 {
1645 // FIXME: Retry in the background
1646 error!("Failed to mark room as DM: {error}");
1647 }
1648 }
1649
1650 Ok(joined_room)
1651 }
1652
1653 /// Create a DM room.
1654 ///
1655 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1656 /// given user being invited, the room marked `is_direct` and both the
1657 /// creator and invitee getting the default maximum power level.
1658 ///
1659 /// If the `e2e-encryption` feature is enabled, the room will also be
1660 /// encrypted.
1661 ///
1662 /// # Arguments
1663 ///
1664 /// * `user_id` - The ID of the user to create a DM for.
1665 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1666 #[cfg(feature = "e2e-encryption")]
1667 let initial_state =
1668 vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1669 .to_raw_any()];
1670
1671 #[cfg(not(feature = "e2e-encryption"))]
1672 let initial_state = vec![];
1673
1674 let request = assign!(create_room::v3::Request::new(), {
1675 invite: vec![user_id.to_owned()],
1676 is_direct: true,
1677 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1678 initial_state,
1679 });
1680
1681 self.create_room(request).await
1682 }
1683
/// Search the homeserver's directory for public rooms with a filter.
///
/// # Arguments
///
/// * `request` - The search request to send. The easiest way to create it
///   is using the `get_public_rooms_filtered::Request` itself.
///
/// # Examples
///
/// ```no_run
/// # use url::Url;
/// # use matrix_sdk::Client;
/// # async {
/// # let homeserver = Url::parse("http://example.com")?;
/// use matrix_sdk::ruma::{
///     api::client::directory::get_public_rooms_filtered, directory::Filter,
/// };
/// # let mut client = Client::new(homeserver).await?;
///
/// let mut filter = Filter::new();
/// filter.generic_search_term = Some("rust".to_owned());
/// let mut request = get_public_rooms_filtered::v3::Request::new();
/// request.filter = filter;
///
/// let response = client.public_rooms_filtered(request).await?;
///
/// for room in response.chunk {
///     println!("Found room {room:?}");
/// }
/// # anyhow::Ok(()) };
/// ```
pub async fn public_rooms_filtered(
    &self,
    request: get_public_rooms_filtered::v3::Request,
) -> HttpResult<get_public_rooms_filtered::v3::Response> {
    // The request is forwarded unchanged.
    self.send(request).await
}
1721
/// Send an arbitrary request to the server, without updating client state.
///
/// **Warning:** Because this method *does not* update the client state, it
/// is important to make sure that you account for this yourself, and
/// use wrapper methods where available. This method should *only* be
/// used if a wrapper method for the endpoint you'd like to use is not
/// available.
///
/// This method itself only builds a [`SendRequest`]; as the example below
/// shows, the returned value is awaited to obtain the response.
///
/// # Arguments
///
/// * `request` - A filled out and valid request for the endpoint to be hit
///
/// No request configuration override is set by this method: the returned
/// [`SendRequest`] starts out with its `config` set to `None`.
///
/// # Examples
///
/// ```no_run
/// # use matrix_sdk::{Client, config::SyncSettings};
/// # use url::Url;
/// # async {
/// # let homeserver = Url::parse("http://localhost:8080")?;
/// # let mut client = Client::new(homeserver).await?;
/// use matrix_sdk::ruma::{api::client::profile, user_id};
///
/// // First construct the request you want to make
/// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
/// // for all available Endpoints
/// let user_id = user_id!("@example:localhost").to_owned();
/// let request = profile::get_profile::v3::Request::new(user_id);
///
/// // Start the request using Client::send()
/// let response = client.send(request).await?;
///
/// // Check the corresponding Response struct to find out what types are
/// // returned
/// # anyhow::Ok(()) };
/// ```
pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
where
    Request: OutgoingRequest + Clone + Debug,
    HttpError: From<FromHttpResponseError<Request::EndpointError>>,
{
    SendRequest {
        // The request carries its own clone of the client.
        client: self.clone(),
        request,
        // No per-request configuration override by default.
        config: None,
        send_progress: Default::default(),
    }
}
1772
1773 pub(crate) async fn send_inner<Request>(
1774 &self,
1775 request: Request,
1776 config: Option<RequestConfig>,
1777 send_progress: SharedObservable<TransmissionProgress>,
1778 ) -> HttpResult<Request::IncomingResponse>
1779 where
1780 Request: OutgoingRequest + Debug,
1781 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1782 {
1783 let homeserver = self.homeserver().to_string();
1784 let access_token = self.access_token();
1785
1786 self.inner
1787 .http_client
1788 .send(
1789 request,
1790 config,
1791 homeserver,
1792 access_token.as_deref(),
1793 &self.server_versions().await?,
1794 send_progress,
1795 )
1796 .await
1797 }
1798
1799 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1800 _ = self
1801 .inner
1802 .auth_ctx
1803 .session_change_sender
1804 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1805 }
1806
1807 /// Fetches server versions from network; no caching.
1808 pub async fn fetch_server_versions(
1809 &self,
1810 request_config: Option<RequestConfig>,
1811 ) -> HttpResult<get_supported_versions::Response> {
1812 let server_versions = self
1813 .inner
1814 .http_client
1815 .send(
1816 get_supported_versions::Request::new(),
1817 request_config,
1818 self.homeserver().to_string(),
1819 None,
1820 &[MatrixVersion::V1_0],
1821 Default::default(),
1822 )
1823 .await?;
1824
1825 Ok(server_versions)
1826 }
1827
1828 /// Fetches client well_known from network; no caching.
1829 pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1830 let server_url_string = self
1831 .server()
1832 .unwrap_or(
1833 // Sometimes people configure their well-known directly on the homeserver so use
1834 // this as a fallback when the server name is unknown.
1835 &self.homeserver(),
1836 )
1837 .to_string();
1838
1839 let well_known = self
1840 .inner
1841 .http_client
1842 .send(
1843 discover_homeserver::Request::new(),
1844 Some(RequestConfig::short_retry()),
1845 server_url_string,
1846 None,
1847 &[MatrixVersion::V1_0],
1848 Default::default(),
1849 )
1850 .await;
1851
1852 match well_known {
1853 Ok(well_known) => Some(well_known),
1854 Err(http_error) => {
1855 // It is perfectly valid to not have a well-known file.
1856 // Maybe we should check for a specific error code to be sure?
1857 warn!("Failed to fetch client well-known: {http_error}");
1858 None
1859 }
1860 }
1861 }
1862
1863 /// Load server info from storage, or fetch them from network and cache
1864 /// them.
1865 async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1866 match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1867 Ok(Some(stored)) => {
1868 if let Some(server_info) =
1869 stored.into_server_info().and_then(|info| info.maybe_decode())
1870 {
1871 return Ok(server_info);
1872 }
1873 }
1874 Ok(None) => {
1875 // fallthrough: cache is empty
1876 }
1877 Err(err) => {
1878 warn!("error when loading cached server info: {err}");
1879 // fallthrough to network.
1880 }
1881 }
1882
1883 let server_versions = self.fetch_server_versions(None).await?;
1884 let well_known = self.fetch_client_well_known().await;
1885 let server_info = ServerInfo::new(
1886 server_versions.versions.clone(),
1887 server_versions.unstable_features.clone(),
1888 well_known.map(Into::into),
1889 );
1890
1891 // Attempt to cache the result in storage.
1892 {
1893 if let Err(err) = self
1894 .state_store()
1895 .set_kv_data(
1896 StateStoreDataKey::ServerInfo,
1897 StateStoreDataValue::ServerInfo(server_info.clone()),
1898 )
1899 .await
1900 {
1901 warn!("error when caching server info: {err}");
1902 }
1903 }
1904
1905 Ok(server_info)
1906 }
1907
1908 async fn get_or_load_and_cache_server_info<
1909 Value,
1910 MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
1911 >(
1912 &self,
1913 map: MapFunction,
1914 ) -> HttpResult<Value> {
1915 let server_info = &self.inner.caches.server_info;
1916 if let CachedValue::Cached(val) = map(&*server_info.read().await) {
1917 return Ok(val);
1918 }
1919
1920 let mut guarded_server_info = server_info.write().await;
1921 if let CachedValue::Cached(val) = map(&guarded_server_info) {
1922 return Ok(val);
1923 }
1924
1925 let server_info = self.load_or_fetch_server_info().await?;
1926
1927 // Fill both unstable features and server versions at once.
1928 let mut supported = server_info.supported_versions();
1929 if supported.versions.is_empty() {
1930 supported.versions = [MatrixVersion::V1_0].into();
1931 }
1932
1933 guarded_server_info.supported_versions = CachedValue::Cached(supported);
1934 guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
1935
1936 // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
1937 // fetch an optional property), the function will always return some.
1938 Ok(map(&guarded_server_info).unwrap_cached_value())
1939 }
1940
1941 /// Get the Matrix versions and features supported by the homeserver by
1942 /// fetching them from the server or the cache.
1943 ///
1944 /// This is equivalent to calling both [`Client::server_versions()`] and
1945 /// [`Client::unstable_features()`]. To always fetch the result from the
1946 /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
1947 /// and then `.as_supported_versions()` on the response.
1948 ///
1949 /// # Examples
1950 ///
1951 /// ```no_run
1952 /// use ruma::api::{FeatureFlag, MatrixVersion};
1953 /// # use matrix_sdk::{Client, config::SyncSettings};
1954 /// # use url::Url;
1955 /// # async {
1956 /// # let homeserver = Url::parse("http://localhost:8080")?;
1957 /// # let mut client = Client::new(homeserver).await?;
1958 ///
1959 /// let supported = client.supported_versions().await?;
1960 /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
1961 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1962 ///
1963 /// let msc_x_feature = FeatureFlag::from("msc_x");
1964 /// let supports_msc_x = supported.features.contains(&msc_x_feature);
1965 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
1966 /// # anyhow::Ok(()) };
1967 /// ```
1968 pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
1969 self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
1970 .await
1971 }
1972
1973 /// Get the Matrix versions supported by the homeserver by fetching them
1974 /// from the server or the cache.
1975 ///
1976 /// # Examples
1977 ///
1978 /// ```no_run
1979 /// use ruma::api::MatrixVersion;
1980 /// # use matrix_sdk::{Client, config::SyncSettings};
1981 /// # use url::Url;
1982 /// # async {
1983 /// # let homeserver = Url::parse("http://localhost:8080")?;
1984 /// # let mut client = Client::new(homeserver).await?;
1985 ///
1986 /// let server_versions = client.server_versions().await?;
1987 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
1988 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1989 /// # anyhow::Ok(()) };
1990 /// ```
1991 pub async fn server_versions(&self) -> HttpResult<Box<[MatrixVersion]>> {
1992 self.get_or_load_and_cache_server_info(|server_info| {
1993 server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
1994 })
1995 .await
1996 }
1997
1998 /// Get the unstable features supported by the homeserver by fetching them
1999 /// from the server or the cache.
2000 ///
2001 /// # Examples
2002 ///
2003 /// ```no_run
2004 /// use matrix_sdk::ruma::api::FeatureFlag;
2005 /// # use matrix_sdk::{Client, config::SyncSettings};
2006 /// # use url::Url;
2007 /// # async {
2008 /// # let homeserver = Url::parse("http://localhost:8080")?;
2009 /// # let mut client = Client::new(homeserver).await?;
2010 ///
2011 /// let msc_x_feature = FeatureFlag::from("msc_x");
2012 /// let unstable_features = client.unstable_features().await?;
2013 /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2014 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2015 /// # anyhow::Ok(()) };
2016 /// ```
2017 pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2018 self.get_or_load_and_cache_server_info(|server_info| {
2019 server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2020 })
2021 .await
2022 }
2023
2024 /// Get information about the homeserver's advertised RTC foci by fetching
2025 /// the well-known file from the server or the cache.
2026 ///
2027 /// # Examples
2028 /// ```no_run
2029 /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2030 /// # use url::Url;
2031 /// # async {
2032 /// # let homeserver = Url::parse("http://localhost:8080")?;
2033 /// # let mut client = Client::new(homeserver).await?;
2034 /// let rtc_foci = client.rtc_foci().await?;
2035 /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2036 /// RtcFocusInfo::LiveKit(info) => Some(info),
2037 /// _ => None,
2038 /// });
2039 /// if let Some(info) = default_livekit_focus_info {
2040 /// println!("Default LiveKit service URL: {}", info.service_url);
2041 /// }
2042 /// # anyhow::Ok(()) };
2043 /// ```
2044 pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2045 let well_known = self
2046 .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2047 .await?;
2048
2049 Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2050 }
2051
2052 /// Empty the server version and unstable features cache.
2053 ///
2054 /// Since the SDK caches server info (versions, unstable features,
2055 /// well-known etc), it's possible to have a stale entry in the cache. This
2056 /// functions makes it possible to force reset it.
2057 pub async fn reset_server_info(&self) -> Result<()> {
2058 // Empty the in-memory caches.
2059 let mut guard = self.inner.caches.server_info.write().await;
2060 guard.supported_versions = CachedValue::NotSet;
2061
2062 // Empty the store cache.
2063 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2064 }
2065
2066 /// Check whether MSC 4028 is enabled on the homeserver.
2067 ///
2068 /// # Examples
2069 ///
2070 /// ```no_run
2071 /// # use matrix_sdk::{Client, config::SyncSettings};
2072 /// # use url::Url;
2073 /// # async {
2074 /// # let homeserver = Url::parse("http://localhost:8080")?;
2075 /// # let mut client = Client::new(homeserver).await?;
2076 /// let msc4028_enabled =
2077 /// client.can_homeserver_push_encrypted_event_to_device().await?;
2078 /// # anyhow::Ok(()) };
2079 /// ```
2080 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2081 Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2082 }
2083
2084 /// Get information of all our own devices.
2085 ///
2086 /// # Examples
2087 ///
2088 /// ```no_run
2089 /// # use matrix_sdk::{Client, config::SyncSettings};
2090 /// # use url::Url;
2091 /// # async {
2092 /// # let homeserver = Url::parse("http://localhost:8080")?;
2093 /// # let mut client = Client::new(homeserver).await?;
2094 /// let response = client.devices().await?;
2095 ///
2096 /// for device in response.devices {
2097 /// println!(
2098 /// "Device: {} {}",
2099 /// device.device_id,
2100 /// device.display_name.as_deref().unwrap_or("")
2101 /// );
2102 /// }
2103 /// # anyhow::Ok(()) };
2104 /// ```
2105 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2106 let request = get_devices::v3::Request::new();
2107
2108 self.send(request).await
2109 }
2110
2111 /// Delete the given devices from the server.
2112 ///
2113 /// # Arguments
2114 ///
2115 /// * `devices` - The list of devices that should be deleted from the
2116 /// server.
2117 ///
2118 /// * `auth_data` - This request requires user interactive auth, the first
2119 /// request needs to set this to `None` and will always fail with an
2120 /// `UiaaResponse`. The response will contain information for the
2121 /// interactive auth and the same request needs to be made but this time
2122 /// with some `auth_data` provided.
2123 ///
2124 /// ```no_run
2125 /// # use matrix_sdk::{
2126 /// # ruma::{api::client::uiaa, device_id},
2127 /// # Client, Error, config::SyncSettings,
2128 /// # };
2129 /// # use serde_json::json;
2130 /// # use url::Url;
2131 /// # use std::collections::BTreeMap;
2132 /// # async {
2133 /// # let homeserver = Url::parse("http://localhost:8080")?;
2134 /// # let mut client = Client::new(homeserver).await?;
2135 /// let devices = &[device_id!("DEVICEID").to_owned()];
2136 ///
2137 /// if let Err(e) = client.delete_devices(devices, None).await {
2138 /// if let Some(info) = e.as_uiaa_response() {
2139 /// let mut password = uiaa::Password::new(
2140 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2141 /// "wordpass".to_owned(),
2142 /// );
2143 /// password.session = info.session.clone();
2144 ///
2145 /// client
2146 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2147 /// .await?;
2148 /// }
2149 /// }
2150 /// # anyhow::Ok(()) };
2151 pub async fn delete_devices(
2152 &self,
2153 devices: &[OwnedDeviceId],
2154 auth_data: Option<uiaa::AuthData>,
2155 ) -> HttpResult<delete_devices::v3::Response> {
2156 let mut request = delete_devices::v3::Request::new(devices.to_owned());
2157 request.auth = auth_data;
2158
2159 self.send(request).await
2160 }
2161
2162 /// Change the display name of a device owned by the current user.
2163 ///
2164 /// Returns a `update_device::Response` which specifies the result
2165 /// of the operation.
2166 ///
2167 /// # Arguments
2168 ///
2169 /// * `device_id` - The ID of the device to change the display name of.
2170 /// * `display_name` - The new display name to set.
2171 pub async fn rename_device(
2172 &self,
2173 device_id: &DeviceId,
2174 display_name: &str,
2175 ) -> HttpResult<update_device::v3::Response> {
2176 let mut request = update_device::v3::Request::new(device_id.to_owned());
2177 request.display_name = Some(display_name.to_owned());
2178
2179 self.send(request).await
2180 }
2181
2182 /// Synchronize the client's state with the latest state on the server.
2183 ///
2184 /// ## Syncing Events
2185 ///
2186 /// Messages or any other type of event need to be periodically fetched from
2187 /// the server, this is achieved by sending a `/sync` request to the server.
2188 ///
2189 /// The first sync is sent out without a [`token`]. The response of the
2190 /// first sync will contain a [`next_batch`] field which should then be
2191 /// used in the subsequent sync calls as the [`token`]. This ensures that we
2192 /// don't receive the same events multiple times.
2193 ///
2194 /// ## Long Polling
2195 ///
2196 /// A sync should in the usual case always be in flight. The
2197 /// [`SyncSettings`] have a [`timeout`] option, which controls how
2198 /// long the server will wait for new events before it will respond.
2199 /// The server will respond immediately if some new events arrive before the
2200 /// timeout has expired. If no changes arrive and the timeout expires an
2201 /// empty sync response will be sent to the client.
2202 ///
2203 /// This method of sending a request that may not receive a response
2204 /// immediately is called long polling.
2205 ///
2206 /// ## Filtering Events
2207 ///
2208 /// The number or type of messages and events that the client should receive
2209 /// from the server can be altered using a [`Filter`].
2210 ///
2211 /// Filters can be non-trivial and, since they will be sent with every sync
2212 /// request, they may take up a bunch of unnecessary bandwidth.
2213 ///
2214 /// Luckily filters can be uploaded to the server and reused using an unique
2215 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2216 /// method.
2217 ///
2218 /// # Arguments
2219 ///
2220 /// * `sync_settings` - Settings for the sync call, this allows us to set
2221 /// various options to configure the sync:
2222 /// * [`filter`] - To configure which events we receive and which get
2223 /// [filtered] by the server
2224 /// * [`timeout`] - To configure our [long polling] setup.
2225 /// * [`token`] - To tell the server which events we already received
2226 /// and where we wish to continue syncing.
2227 /// * [`full_state`] - To tell the server that we wish to receive all
2228 /// state events, regardless of our configured [`token`].
2229 /// * [`set_presence`] - To tell the server to set the presence and to
2230 /// which state.
2231 ///
2232 /// # Examples
2233 ///
2234 /// ```no_run
2235 /// # use url::Url;
2236 /// # async {
2237 /// # let homeserver = Url::parse("http://localhost:8080")?;
2238 /// # let username = "";
2239 /// # let password = "";
2240 /// use matrix_sdk::{
2241 /// config::SyncSettings,
2242 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2243 /// };
2244 ///
2245 /// let client = Client::new(homeserver).await?;
2246 /// client.matrix_auth().login_username(username, password).send().await?;
2247 ///
2248 /// // Sync once so we receive the client state and old messages.
2249 /// client.sync_once(SyncSettings::default()).await?;
2250 ///
2251 /// // Register our handler so we start responding once we receive a new
2252 /// // event.
2253 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2254 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2255 /// });
2256 ///
2257 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2258 /// // from our `sync_once()` call automatically.
2259 /// client.sync(SyncSettings::default()).await;
2260 /// # anyhow::Ok(()) };
2261 /// ```
2262 ///
2263 /// [`sync`]: #method.sync
2264 /// [`SyncSettings`]: crate::config::SyncSettings
2265 /// [`token`]: crate::config::SyncSettings#method.token
2266 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2267 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2268 /// [`set_presence`]: ruma::presence::PresenceState
2269 /// [`filter`]: crate::config::SyncSettings#method.filter
2270 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2271 /// [`next_batch`]: SyncResponse#structfield.next_batch
2272 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2273 /// [long polling]: #long-polling
2274 /// [filtered]: #filtering-events
2275 #[instrument(skip(self))]
2276 pub async fn sync_once(
2277 &self,
2278 sync_settings: crate::config::SyncSettings,
2279 ) -> Result<SyncResponse> {
2280 // The sync might not return for quite a while due to the timeout.
2281 // We'll see if there's anything crypto related to send out before we
2282 // sync, i.e. if we closed our client after a sync but before the
2283 // crypto requests were sent out.
2284 //
2285 // This will mostly be a no-op.
2286 #[cfg(feature = "e2e-encryption")]
2287 if let Err(e) = self.send_outgoing_requests().await {
2288 error!(error = ?e, "Error while sending outgoing E2EE requests");
2289 }
2290
2291 let request = assign!(sync_events::v3::Request::new(), {
2292 filter: sync_settings.filter.map(|f| *f),
2293 since: sync_settings.token,
2294 full_state: sync_settings.full_state,
2295 set_presence: sync_settings.set_presence,
2296 timeout: sync_settings.timeout,
2297 });
2298 let mut request_config = self.request_config();
2299 if let Some(timeout) = sync_settings.timeout {
2300 request_config.timeout += timeout;
2301 }
2302
2303 let response = self.send(request).with_request_config(request_config).await?;
2304 let next_batch = response.next_batch.clone();
2305 let response = self.process_sync(response).await?;
2306
2307 #[cfg(feature = "e2e-encryption")]
2308 if let Err(e) = self.send_outgoing_requests().await {
2309 error!(error = ?e, "Error while sending outgoing E2EE requests");
2310 }
2311
2312 self.inner.sync_beat.notify(usize::MAX);
2313
2314 Ok(SyncResponse::new(next_batch, response))
2315 }
2316
2317 /// Repeatedly synchronize the client state with the server.
2318 ///
2319 /// This method will only return on error, if cancellation is needed
2320 /// the method should be wrapped in a cancelable task or the
2321 /// [`Client::sync_with_callback`] method can be used or
2322 /// [`Client::sync_with_result_callback`] if you want to handle error
2323 /// cases in the loop, too.
2324 ///
2325 /// This method will internally call [`Client::sync_once`] in a loop.
2326 ///
2327 /// This method can be used with the [`Client::add_event_handler`]
2328 /// method to react to individual events. If you instead wish to handle
2329 /// events in a bulk manner the [`Client::sync_with_callback`],
2330 /// [`Client::sync_with_result_callback`] and
2331 /// [`Client::sync_stream`] methods can be used instead. Those methods
2332 /// repeatedly return the whole sync response.
2333 ///
2334 /// # Arguments
2335 ///
2336 /// * `sync_settings` - Settings for the sync call. *Note* that those
2337 /// settings will be only used for the first sync call. See the argument
2338 /// docs for [`Client::sync_once`] for more info.
2339 ///
2340 /// # Return
2341 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2342 /// up to the user of the API to check the error and decide whether the sync
2343 /// should continue or not.
2344 ///
2345 /// # Examples
2346 ///
2347 /// ```no_run
2348 /// # use url::Url;
2349 /// # async {
2350 /// # let homeserver = Url::parse("http://localhost:8080")?;
2351 /// # let username = "";
2352 /// # let password = "";
2353 /// use matrix_sdk::{
2354 /// config::SyncSettings,
2355 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2356 /// };
2357 ///
2358 /// let client = Client::new(homeserver).await?;
2359 /// client.matrix_auth().login_username(&username, &password).send().await?;
2360 ///
2361 /// // Register our handler so we start responding once we receive a new
2362 /// // event.
2363 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2364 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2365 /// });
2366 ///
2367 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2368 /// // automatically.
2369 /// client.sync(SyncSettings::default()).await?;
2370 /// # anyhow::Ok(()) };
2371 /// ```
2372 ///
2373 /// [argument docs]: #method.sync_once
2374 /// [`sync_with_callback`]: #method.sync_with_callback
2375 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2376 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2377 }
2378
2379 /// Repeatedly call sync to synchronize the client state with the server.
2380 ///
2381 /// # Arguments
2382 ///
2383 /// * `sync_settings` - Settings for the sync call. *Note* that those
2384 /// settings will be only used for the first sync call. See the argument
2385 /// docs for [`Client::sync_once`] for more info.
2386 ///
2387 /// * `callback` - A callback that will be called every time a successful
2388 /// response has been fetched from the server. The callback must return a
2389 /// boolean which signalizes if the method should stop syncing. If the
2390 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2391 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2392 ///
2393 /// # Return
2394 /// The sync runs until an error occurs or the
2395 /// callback indicates that the Loop should stop. If the callback asked for
2396 /// a regular stop, the result will be `Ok(())` otherwise the
2397 /// `Err(Error)` is returned.
2398 ///
2399 /// # Examples
2400 ///
2401 /// The following example demonstrates how to sync forever while sending all
2402 /// the interesting events through a mpsc channel to another thread e.g. a
2403 /// UI thread.
2404 ///
2405 /// ```no_run
2406 /// # use std::time::Duration;
2407 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2408 /// # use url::Url;
2409 /// # async {
2410 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2411 /// # let mut client = Client::new(homeserver).await.unwrap();
2412 ///
2413 /// use tokio::sync::mpsc::channel;
2414 ///
2415 /// let (tx, rx) = channel(100);
2416 ///
2417 /// let sync_channel = &tx;
2418 /// let sync_settings = SyncSettings::new()
2419 /// .timeout(Duration::from_secs(30));
2420 ///
2421 /// client
2422 /// .sync_with_callback(sync_settings, |response| async move {
2423 /// let channel = sync_channel;
2424 /// for (room_id, room) in response.rooms.joined {
2425 /// for event in room.timeline.events {
2426 /// channel.send(event).await.unwrap();
2427 /// }
2428 /// }
2429 ///
2430 /// LoopCtrl::Continue
2431 /// })
2432 /// .await;
2433 /// };
2434 /// ```
2435 #[instrument(skip_all)]
2436 pub async fn sync_with_callback<C>(
2437 &self,
2438 sync_settings: crate::config::SyncSettings,
2439 callback: impl Fn(SyncResponse) -> C,
2440 ) -> Result<(), Error>
2441 where
2442 C: Future<Output = LoopCtrl>,
2443 {
2444 self.sync_with_result_callback(sync_settings, |result| async {
2445 Ok(callback(result?).await)
2446 })
2447 .await
2448 }
2449
2450 /// Repeatedly call sync to synchronize the client state with the server.
2451 ///
2452 /// # Arguments
2453 ///
2454 /// * `sync_settings` - Settings for the sync call. *Note* that those
2455 /// settings will be only used for the first sync call. See the argument
2456 /// docs for [`Client::sync_once`] for more info.
2457 ///
2458 /// * `callback` - A callback that will be called every time after a
2459 /// response has been received, failure or not. The callback returns a
2460 /// `Result<LoopCtrl, Error>`, too. When returning
2461 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2462 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2463 /// function returns `Ok(())`. In case the callback can't handle the
2464 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2465 /// which results in the sync ending and the `Err(Error)` being returned.
2466 ///
2467 /// # Return
2468 /// The sync runs until an error occurs that the callback can't handle or
2469 /// the callback indicates that the Loop should stop. If the callback
2470 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2471 /// `Err(Error)` is returned.
2472 ///
2473 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2474 /// this, and are handled first without sending the result to the
2475 /// callback. Only after they have exceeded is the `Result` handed to
2476 /// the callback.
2477 ///
2478 /// # Examples
2479 ///
2480 /// The following example demonstrates how to sync forever while sending all
2481 /// the interesting events through a mpsc channel to another thread e.g. a
2482 /// UI thread.
2483 ///
2484 /// ```no_run
2485 /// # use std::time::Duration;
2486 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2487 /// # use url::Url;
2488 /// # async {
2489 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2490 /// # let mut client = Client::new(homeserver).await.unwrap();
2491 /// #
2492 /// use tokio::sync::mpsc::channel;
2493 ///
2494 /// let (tx, rx) = channel(100);
2495 ///
2496 /// let sync_channel = &tx;
2497 /// let sync_settings = SyncSettings::new()
2498 /// .timeout(Duration::from_secs(30));
2499 ///
2500 /// client
2501 /// .sync_with_result_callback(sync_settings, |response| async move {
2502 /// let channel = sync_channel;
2503 /// let sync_response = response?;
2504 /// for (room_id, room) in sync_response.rooms.joined {
2505 /// for event in room.timeline.events {
2506 /// channel.send(event).await.unwrap();
2507 /// }
2508 /// }
2509 ///
2510 /// Ok(LoopCtrl::Continue)
2511 /// })
2512 /// .await;
2513 /// };
2514 /// ```
2515 #[instrument(skip(self, callback))]
2516 pub async fn sync_with_result_callback<C>(
2517 &self,
2518 mut sync_settings: crate::config::SyncSettings,
2519 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2520 ) -> Result<(), Error>
2521 where
2522 C: Future<Output = Result<LoopCtrl, Error>>,
2523 {
2524 let mut last_sync_time: Option<Instant> = None;
2525
2526 if sync_settings.token.is_none() {
2527 sync_settings.token = self.sync_token().await;
2528 }
2529
2530 loop {
2531 trace!("Syncing");
2532 let result = self.sync_loop_helper(&mut sync_settings).await;
2533
2534 trace!("Running callback");
2535 if callback(result).await? == LoopCtrl::Break {
2536 trace!("Callback told us to stop");
2537 break;
2538 }
2539 trace!("Done running callback");
2540
2541 Client::delay_sync(&mut last_sync_time).await
2542 }
2543
2544 Ok(())
2545 }
2546
2547 //// Repeatedly synchronize the client state with the server.
2548 ///
2549 /// This method will internally call [`Client::sync_once`] in a loop and is
2550 /// equivalent to the [`Client::sync`] method but the responses are provided
2551 /// as an async stream.
2552 ///
2553 /// # Arguments
2554 ///
2555 /// * `sync_settings` - Settings for the sync call. *Note* that those
2556 /// settings will be only used for the first sync call. See the argument
2557 /// docs for [`Client::sync_once`] for more info.
2558 ///
2559 /// # Examples
2560 ///
2561 /// ```no_run
2562 /// # use url::Url;
2563 /// # async {
2564 /// # let homeserver = Url::parse("http://localhost:8080")?;
2565 /// # let username = "";
2566 /// # let password = "";
2567 /// use futures_util::StreamExt;
2568 /// use matrix_sdk::{config::SyncSettings, Client};
2569 ///
2570 /// let client = Client::new(homeserver).await?;
2571 /// client.matrix_auth().login_username(&username, &password).send().await?;
2572 ///
2573 /// let mut sync_stream =
2574 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2575 ///
2576 /// while let Some(Ok(response)) = sync_stream.next().await {
2577 /// for room in response.rooms.joined.values() {
2578 /// for e in &room.timeline.events {
2579 /// if let Ok(event) = e.raw().deserialize() {
2580 /// println!("Received event {:?}", event);
2581 /// }
2582 /// }
2583 /// }
2584 /// }
2585 ///
2586 /// # anyhow::Ok(()) };
2587 /// ```
2588 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2589 #[instrument(skip(self))]
2590 pub async fn sync_stream(
2591 &self,
2592 mut sync_settings: crate::config::SyncSettings,
2593 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2594 let mut last_sync_time: Option<Instant> = None;
2595
2596 if sync_settings.token.is_none() {
2597 sync_settings.token = self.sync_token().await;
2598 }
2599
2600 let parent_span = Span::current();
2601
2602 async_stream::stream! {
2603 loop {
2604 yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
2605
2606 Client::delay_sync(&mut last_sync_time).await
2607 }
2608 }
2609 }
2610
2611 /// Get the current, if any, sync token of the client.
2612 /// This will be None if the client didn't sync at least once.
2613 pub(crate) async fn sync_token(&self) -> Option<String> {
2614 self.inner.base_client.sync_token().await
2615 }
2616
2617 /// Gets information about the owner of a given access token.
2618 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2619 let request = whoami::v3::Request::new();
2620 self.send(request).await
2621 }
2622
2623 /// Subscribes a new receiver to client SessionChange broadcasts.
2624 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2625 let broadcast = &self.auth_ctx().session_change_sender;
2626 broadcast.subscribe()
2627 }
2628
2629 /// Sets the save/restore session callbacks.
2630 ///
2631 /// This is another mechanism to get synchronous updates to session tokens,
2632 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2633 pub fn set_session_callbacks(
2634 &self,
2635 reload_session_callback: Box<ReloadSessionCallback>,
2636 save_session_callback: Box<SaveSessionCallback>,
2637 ) -> Result<()> {
2638 self.inner
2639 .auth_ctx
2640 .reload_session_callback
2641 .set(reload_session_callback)
2642 .map_err(|_| Error::MultipleSessionCallbacks)?;
2643
2644 self.inner
2645 .auth_ctx
2646 .save_session_callback
2647 .set(save_session_callback)
2648 .map_err(|_| Error::MultipleSessionCallbacks)?;
2649
2650 Ok(())
2651 }
2652
2653 /// Get the notification settings of the current owner of the client.
2654 pub async fn notification_settings(&self) -> NotificationSettings {
2655 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2656 NotificationSettings::new(self.clone(), ruleset)
2657 }
2658
2659 /// Create a new specialized `Client` that can process notifications.
2660 ///
2661 /// See [`CrossProcessStoreLock::new`] to learn more about
2662 /// `cross_process_store_locks_holder_name`.
2663 ///
2664 /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2665 pub async fn notification_client(
2666 &self,
2667 cross_process_store_locks_holder_name: String,
2668 ) -> Result<Client> {
2669 let client = Client {
2670 inner: ClientInner::new(
2671 self.inner.auth_ctx.clone(),
2672 self.server().cloned(),
2673 self.homeserver(),
2674 self.sliding_sync_version(),
2675 self.inner.http_client.clone(),
2676 self.inner
2677 .base_client
2678 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2679 .await?,
2680 self.inner.caches.server_info.read().await.clone(),
2681 self.inner.respect_login_well_known,
2682 self.inner.event_cache.clone(),
2683 self.inner.send_queue_data.clone(),
2684 self.inner.latest_events.clone(),
2685 #[cfg(feature = "e2e-encryption")]
2686 self.inner.e2ee.encryption_settings,
2687 #[cfg(feature = "e2e-encryption")]
2688 self.inner.enable_share_history_on_invite,
2689 cross_process_store_locks_holder_name,
2690 )
2691 .await,
2692 };
2693
2694 Ok(client)
2695 }
2696
2697 /// The [`EventCache`] instance for this [`Client`].
2698 pub fn event_cache(&self) -> &EventCache {
2699 // SAFETY: always initialized in the `Client` ctor.
2700 self.inner.event_cache.get().unwrap()
2701 }
2702
2703 /// The [`LatestEvents`] instance for this [`Client`].
2704 pub async fn latest_events(&self) -> &LatestEvents {
2705 self.inner
2706 .latest_events
2707 .get_or_init(|| async {
2708 LatestEvents::new(
2709 WeakClient::from_client(self),
2710 self.event_cache().clone(),
2711 SendQueue::new(self.clone()),
2712 )
2713 })
2714 .await
2715 }
2716
2717 /// Waits until an at least partially synced room is received, and returns
2718 /// it.
2719 ///
2720 /// **Note: this function will loop endlessly until either it finds the room
2721 /// or an externally set timeout happens.**
2722 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2723 loop {
2724 if let Some(room) = self.get_room(room_id) {
2725 if room.is_state_partially_or_fully_synced() {
2726 debug!("Found just created room!");
2727 return room;
2728 }
2729 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2730 } else {
2731 debug!("Room wasn't found, waiting for sync beat to try again");
2732 }
2733 self.inner.sync_beat.listen().await;
2734 }
2735 }
2736
2737 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2738 /// join it.
2739 pub async fn knock(
2740 &self,
2741 room_id_or_alias: OwnedRoomOrAliasId,
2742 reason: Option<String>,
2743 server_names: Vec<OwnedServerName>,
2744 ) -> Result<Room> {
2745 let request =
2746 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2747 let response = self.send(request).await?;
2748 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2749 Ok(Room::new(self.clone(), base_room))
2750 }
2751
2752 /// Checks whether the provided `user_id` belongs to an ignored user.
2753 pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2754 self.base_client().is_user_ignored(user_id).await
2755 }
2756
2757 /// Gets the `max_upload_size` value from the homeserver, getting either a
2758 /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2759 /// missing.
2760 ///
2761 /// Check the spec for more info:
2762 /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2763 pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2764 let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2765 if let Some(data) = max_upload_size_lock.get() {
2766 return Ok(data.to_owned());
2767 }
2768
2769 let response = self
2770 .send(ruma::api::client::authenticated_media::get_media_config::v1::Request::default())
2771 .await?;
2772
2773 match max_upload_size_lock.set(response.upload_size) {
2774 Ok(_) => Ok(response.upload_size),
2775 Err(error) => {
2776 Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2777 }
2778 }
2779 }
2780
/// Returns the settings applied when decrypting events, as held by the
/// base client.
#[cfg(feature = "e2e-encryption")]
pub fn decryption_settings(&self) -> &DecryptionSettings {
    let base_client = self.base_client();
    &base_client.decryption_settings
}
2786}
2787
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test-only helper asking the crypto layer to track the given users.
    ///
    /// # Panics
    ///
    /// Panics if there is no Olm machine, or if updating the tracked users
    /// fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let machine_guard = self.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();
        machine.update_tracked_users(user_ids).await.unwrap();
    }
}
2801
2802/// A weak reference to the inner client, useful when trying to get a handle
2803/// on the owning client.
2804#[derive(Clone, Debug)]
2805pub(crate) struct WeakClient {
2806 client: Weak<ClientInner>,
2807}
2808
2809impl WeakClient {
2810 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2811 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2812 Self { client: Arc::downgrade(client) }
2813 }
2814
2815 /// Construct a [`WeakClient`] from a [`Client`].
2816 pub fn from_client(client: &Client) -> Self {
2817 Self::from_inner(&client.inner)
2818 }
2819
2820 /// Attempts to get a [`Client`] from this [`WeakClient`].
2821 pub fn get(&self) -> Option<Client> {
2822 self.client.upgrade().map(|inner| Client { inner })
2823 }
2824
2825 /// Gets the number of strong (`Arc`) pointers still pointing to this
2826 /// client.
2827 #[allow(dead_code)]
2828 pub fn strong_count(&self) -> usize {
2829 self.client.strong_count()
2830 }
2831}
2832
2833#[derive(Clone)]
2834struct ClientServerInfo {
2835 /// The Matrix versions and unstable features the server supports (known
2836 /// ones only).
2837 supported_versions: CachedValue<SupportedVersions>,
2838
2839 /// The server's well-known file, if any.
2840 well_known: CachedValue<Option<WellKnownResponse>>,
2841}
2842
/// A value that may or may not have been cached yet.
///
/// This is kept distinct from an `Option` so that "the value is known to be
/// `None` (it doesn't exist)" can't be mixed up with "nothing has been cached
/// so far".
#[derive(Clone)]
enum CachedValue<Value> {
    /// The value is present in the cache.
    Cached(Value),
    /// The cache hasn't been filled yet.
    NotSet,
}

impl<Value> CachedValue<Value> {
    /// Consumes `self` and returns the cached value.
    ///
    /// # Panics
    ///
    /// Panics when no value has been cached (`NotSet`).
    fn unwrap_cached_value(self) -> Value {
        if let Self::Cached(value) = self {
            value
        } else {
            panic!("Tried to unwrap a cached value that wasn't set")
        }
    }

    /// Borrows the content: turns a `&CachedValue<Value>` into a
    /// `CachedValue<&Value>`.
    fn as_ref(&self) -> CachedValue<&Value> {
        match *self {
            Self::Cached(ref value) => CachedValue::Cached(value),
            Self::NotSet => CachedValue::NotSet,
        }
    }

    /// Transforms the cached value with `f`, producing a
    /// `CachedValue<Other>`; a `NotSet` stays `NotSet` and `f` isn't called.
    fn map<Other, F>(self, f: F) -> CachedValue<Other>
    where
        F: FnOnce(Value) -> Other,
    {
        if let Self::Cached(value) = self {
            CachedValue::Cached(f(value))
        } else {
            CachedValue::NotSet
        }
    }
}
2888
2889/// Information about the state of a room before we joined it.
2890#[derive(Debug, Clone, Default)]
2891struct PreJoinRoomInfo {
2892 /// The user who invited us to the room, if any.
2893 pub inviter: Option<RoomMember>,
2894}
2895
2896// The http mocking library is not supported for wasm32
2897#[cfg(all(test, not(target_family = "wasm")))]
2898pub(crate) mod tests {
2899 use std::{sync::Arc, time::Duration};
2900
2901 use assert_matches::assert_matches;
2902 use assert_matches2::assert_let;
2903 use eyeball::SharedObservable;
2904 use futures_util::{pin_mut, FutureExt};
2905 use js_int::{uint, UInt};
2906 use matrix_sdk_base::{
2907 store::{MemoryStore, StoreConfig},
2908 RoomState,
2909 };
2910 use matrix_sdk_test::{
2911 async_test, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
2912 SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
2913 };
2914 #[cfg(target_family = "wasm")]
2915 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2916
2917 use ruma::{
2918 api::{
2919 client::{
2920 discovery::discover_homeserver::RtcFocusInfo,
2921 room::create_room::v3::Request as CreateRoomRequest,
2922 },
2923 FeatureFlag, MatrixVersion,
2924 },
2925 assign,
2926 events::{
2927 ignored_user_list::IgnoredUserListEventContent,
2928 media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
2929 },
2930 owned_room_id, room_alias_id, room_id, RoomId, ServerName, UserId,
2931 };
2932 use serde_json::json;
2933 use stream_assert::{assert_next_matches, assert_pending};
2934 use tokio::{
2935 spawn,
2936 time::{sleep, timeout},
2937 };
2938 use url::Url;
2939
2940 use super::Client;
2941 use crate::{
2942 client::{futures::SendMediaUploadRequest, WeakClient},
2943 config::RequestConfig,
2944 futures::SendRequest,
2945 media::MediaError,
2946 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
2947 Error, TransmissionProgress,
2948 };
2949
2950 #[async_test]
2951 async fn test_account_data() {
2952 let server = MatrixMockServer::new().await;
2953 let client = server.client_builder().build().await;
2954
2955 server
2956 .mock_sync()
2957 .ok_and_run(&client, |builder| {
2958 builder.add_global_account_data_event(GlobalAccountDataTestEvent::IgnoredUserList);
2959 })
2960 .await;
2961
2962 let content = client
2963 .account()
2964 .account_data::<IgnoredUserListEventContent>()
2965 .await
2966 .unwrap()
2967 .unwrap()
2968 .deserialize()
2969 .unwrap();
2970
2971 assert_eq!(content.ignored_users.len(), 1);
2972 }
2973
2974 #[async_test]
2975 async fn test_successful_discovery() {
2976 // Imagine this is `matrix.org`.
2977 let server = MatrixMockServer::new().await;
2978 let server_url = server.uri();
2979
2980 // Imagine this is `matrix-client.matrix.org`.
2981 let homeserver = MatrixMockServer::new().await;
2982 let homeserver_url = homeserver.uri();
2983
2984 // Imagine Alice has the user ID `@alice:matrix.org`.
2985 let domain = server_url.strip_prefix("http://").unwrap();
2986 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2987
2988 // The `.well-known` is on the server (e.g. `matrix.org`).
2989 server
2990 .mock_well_known()
2991 .ok_with_homeserver_url(&homeserver_url)
2992 .mock_once()
2993 .named("well-known")
2994 .mount()
2995 .await;
2996
2997 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
2998 homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
2999
3000 let client = Client::builder()
3001 .insecure_server_name_no_tls(alice.server_name())
3002 .build()
3003 .await
3004 .unwrap();
3005
3006 assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3007 assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3008 client.server_versions().await.unwrap();
3009 }
3010
3011 #[async_test]
3012 async fn test_discovery_broken_server() {
3013 let server = MatrixMockServer::new().await;
3014 let server_url = server.uri();
3015 let domain = server_url.strip_prefix("http://").unwrap();
3016 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3017
3018 server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3019
3020 assert!(
3021 Client::builder()
3022 .insecure_server_name_no_tls(alice.server_name())
3023 .build()
3024 .await
3025 .is_err(),
3026 "Creating a client from a user ID should fail when the .well-known request fails."
3027 );
3028 }
3029
3030 #[async_test]
3031 async fn test_room_creation() {
3032 let server = MatrixMockServer::new().await;
3033 let client = server.client_builder().build().await;
3034
3035 server
3036 .mock_sync()
3037 .ok_and_run(&client, |builder| {
3038 builder.add_joined_room(
3039 JoinedRoomBuilder::default()
3040 .add_state_event(StateTestEvent::Member)
3041 .add_state_event(StateTestEvent::PowerLevels),
3042 );
3043 })
3044 .await;
3045
3046 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3047 assert_eq!(room.state(), RoomState::Joined);
3048 }
3049
3050 #[async_test]
3051 async fn test_retry_limit_http_requests() {
3052 let server = MatrixMockServer::new().await;
3053 let client = server
3054 .client_builder()
3055 .request_config(RequestConfig::new().retry_limit(3))
3056 .build()
3057 .await;
3058
3059 assert!(client.request_config().retry_limit.unwrap() == 3);
3060
3061 server.mock_login().error500().expect(3).mount().await;
3062
3063 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3064 }
3065
3066 #[async_test]
3067 async fn test_retry_timeout_http_requests() {
3068 // Keep this timeout small so that the test doesn't take long
3069 let retry_timeout = Duration::from_secs(5);
3070 let server = MatrixMockServer::new().await;
3071 let client = server
3072 .client_builder()
3073 .request_config(RequestConfig::new().max_retry_time(retry_timeout))
3074 .build()
3075 .await;
3076
3077 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3078
3079 server.mock_login().error500().expect(2..).mount().await;
3080
3081 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3082 }
3083
3084 #[async_test]
3085 async fn test_short_retry_initial_http_requests() {
3086 let server = MatrixMockServer::new().await;
3087 let client =
3088 server.client_builder().request_config(RequestConfig::short_retry()).build().await;
3089
3090 server.mock_login().error500().expect(3..).mount().await;
3091
3092 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3093 }
3094
3095 #[async_test]
3096 async fn test_no_retry_http_requests() {
3097 let server = MatrixMockServer::new().await;
3098 let client = server.client_builder().build().await;
3099
3100 server.mock_devices().error500().mock_once().mount().await;
3101
3102 client.devices().await.unwrap_err();
3103 }
3104
3105 #[async_test]
3106 async fn test_set_homeserver() {
3107 let client = MockClientBuilder::new(None).build().await;
3108 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3109
3110 let homeserver = Url::parse("http://example.com/").unwrap();
3111 client.set_homeserver(homeserver.clone());
3112 assert_eq!(client.homeserver(), homeserver);
3113 }
3114
3115 #[async_test]
3116 async fn test_search_user_request() {
3117 let server = MatrixMockServer::new().await;
3118 let client = server.client_builder().build().await;
3119
3120 server.mock_user_directory().ok().mock_once().mount().await;
3121
3122 let response = client.search_users("test", 50).await.unwrap();
3123 assert_eq!(response.results.len(), 1);
3124 let result = &response.results[0];
3125 assert_eq!(result.user_id.to_string(), "@test:example.me");
3126 assert_eq!(result.display_name.clone().unwrap(), "Test");
3127 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3128 assert!(!response.limited);
3129 }
3130
3131 #[async_test]
3132 async fn test_request_unstable_features() {
3133 let server = MatrixMockServer::new().await;
3134 let client = server.client_builder().no_server_versions().build().await;
3135
3136 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3137
3138 let unstable_features = client.unstable_features().await.unwrap();
3139 assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3140 assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3141 }
3142
3143 #[async_test]
3144 async fn test_can_homeserver_push_encrypted_event_to_device() {
3145 let server = MatrixMockServer::new().await;
3146 let client = server.client_builder().no_server_versions().build().await;
3147
3148 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3149
3150 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3151 assert!(msc4028_enabled);
3152 }
3153
3154 #[async_test]
3155 async fn test_recently_visited_rooms() {
3156 // Tracking recently visited rooms requires authentication
3157 let client = MockClientBuilder::new(None).unlogged().build().await;
3158 assert_matches!(
3159 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3160 Err(Error::AuthenticationRequired)
3161 );
3162
3163 let client = MockClientBuilder::new(None).build().await;
3164 let account = client.account();
3165
3166 // We should start off with an empty list
3167 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3168
3169 // Tracking a valid room id should add it to the list
3170 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3171 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3172 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3173
3174 // And the existing list shouldn't be changed
3175 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3176 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3177
3178 // Tracking the same room again shouldn't change the list
3179 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3180 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3181 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3182
3183 // Tracking a second room should add it to the front of the list
3184 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3185 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3186 assert_eq!(
3187 account.get_recently_visited_rooms().await.unwrap(),
3188 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3189 );
3190
3191 // Tracking the first room yet again should move it to the front of the list
3192 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3193 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3194 assert_eq!(
3195 account.get_recently_visited_rooms().await.unwrap(),
3196 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3197 );
3198
3199 // Tracking should be capped at 20
3200 for n in 0..20 {
3201 account
3202 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3203 .await
3204 .unwrap();
3205 }
3206
3207 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3208
3209 // And the initial rooms should've been pushed out
3210 let rooms = account.get_recently_visited_rooms().await.unwrap();
3211 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3212 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3213
3214 // And the last tracked room should be the first
3215 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3216 }
3217
3218 #[async_test]
3219 async fn test_client_no_cycle_with_event_cache() {
3220 let client = MockClientBuilder::new(None).build().await;
3221
3222 // Wait for the init tasks to die.
3223 sleep(Duration::from_secs(1)).await;
3224
3225 let weak_client = WeakClient::from_client(&client);
3226 assert_eq!(weak_client.strong_count(), 1);
3227
3228 {
3229 let room_id = room_id!("!room:example.org");
3230
3231 // Have the client know the room.
3232 let response = SyncResponseBuilder::default()
3233 .add_joined_room(JoinedRoomBuilder::new(room_id))
3234 .build_sync_response();
3235 client.inner.base_client.receive_sync_response(response).await.unwrap();
3236
3237 client.event_cache().subscribe().unwrap();
3238
3239 let (_room_event_cache, _drop_handles) =
3240 client.get_room(room_id).unwrap().event_cache().await.unwrap();
3241 }
3242
3243 drop(client);
3244
3245 // Give a bit of time for background tasks to die.
3246 sleep(Duration::from_secs(1)).await;
3247
3248 // The weak client must be the last reference to the client now.
3249 assert_eq!(weak_client.strong_count(), 0);
3250 let client = weak_client.get();
3251 assert!(
3252 client.is_none(),
3253 "too many strong references to the client: {}",
3254 Arc::strong_count(&client.unwrap().inner)
3255 );
3256 }
3257
3258 #[async_test]
3259 async fn test_server_info_caching() {
3260 let server = MatrixMockServer::new().await;
3261 let server_url = server.uri();
3262 let domain = server_url.strip_prefix("http://").unwrap();
3263 let server_name = <&ServerName>::try_from(domain).unwrap();
3264 let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3265
3266 let well_known_mock = server
3267 .mock_well_known()
3268 .ok()
3269 .named("well known mock")
3270 .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3271 .mount_as_scoped()
3272 .await;
3273
3274 let versions_mock = server
3275 .mock_versions()
3276 .ok_with_unstable_features()
3277 .named("first versions mock")
3278 .expect(1)
3279 .mount_as_scoped()
3280 .await;
3281
3282 let memory_store = Arc::new(MemoryStore::new());
3283 let client = Client::builder()
3284 .insecure_server_name_no_tls(server_name)
3285 .store_config(
3286 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3287 .state_store(memory_store.clone()),
3288 )
3289 .build()
3290 .await
3291 .unwrap();
3292
3293 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3294
3295 // These subsequent calls hit the in-memory cache.
3296 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3297 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3298
3299 drop(client);
3300
3301 let client = server
3302 .client_builder()
3303 .no_server_versions()
3304 .store_config(
3305 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3306 .state_store(memory_store.clone()),
3307 )
3308 .build()
3309 .await;
3310
3311 // These calls to the new client hit the on-disk cache.
3312 assert!(client
3313 .unstable_features()
3314 .await
3315 .unwrap()
3316 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3317 let supported = client.supported_versions().await.unwrap();
3318 assert!(supported.versions.contains(&MatrixVersion::V1_0));
3319 assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3320
3321 // Then this call hits the in-memory cache.
3322 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3323
3324 drop(versions_mock);
3325 drop(well_known_mock);
3326
3327 // Now, reset the cache, and observe the endpoints being called again once.
3328 client.reset_server_info().await.unwrap();
3329
3330 server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3331
3332 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3333
3334 // Hits network again.
3335 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3336 // Hits in-memory cache again.
3337 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3338 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3339 }
3340
3341 #[async_test]
3342 async fn test_server_info_without_a_well_known() {
3343 let server = MatrixMockServer::new().await;
3344 let rtc_foci: Vec<RtcFocusInfo> = vec![];
3345
3346 let versions_mock = server
3347 .mock_versions()
3348 .ok_with_unstable_features()
3349 .named("first versions mock")
3350 .expect(1)
3351 .mount_as_scoped()
3352 .await;
3353
3354 let memory_store = Arc::new(MemoryStore::new());
3355 let client = server
3356 .client_builder()
3357 .no_server_versions()
3358 .store_config(
3359 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3360 .state_store(memory_store.clone()),
3361 )
3362 .build()
3363 .await;
3364
3365 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3366
3367 // These subsequent calls hit the in-memory cache.
3368 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3369 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3370
3371 drop(client);
3372
3373 let client = server
3374 .client_builder()
3375 .no_server_versions()
3376 .store_config(
3377 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3378 .state_store(memory_store.clone()),
3379 )
3380 .build()
3381 .await;
3382
3383 // This call to the new client hits the on-disk cache.
3384 assert!(client
3385 .unstable_features()
3386 .await
3387 .unwrap()
3388 .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3389
3390 // Then this call hits the in-memory cache.
3391 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3392
3393 drop(versions_mock);
3394
3395 // Now, reset the cache, and observe the endpoints being called again once.
3396 client.reset_server_info().await.unwrap();
3397
3398 server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3399
3400 // Hits network again.
3401 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3402 // Hits in-memory cache again.
3403 assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3404 assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3405 }
3406
3407 #[async_test]
3408 async fn test_no_network_doesnt_cause_infinite_retries() {
3409 // We want infinite retries for transient errors.
3410 let client =
3411 MockClientBuilder::new(None).request_config(RequestConfig::new()).build().await;
3412
3413 // We don't define a mock server on purpose here, so that the error is really a
3414 // network error.
3415 client.whoami().await.unwrap_err();
3416 }
3417
3418 #[async_test]
3419 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3420 let server = MatrixMockServer::new().await;
3421 let client = server.client_builder().build().await;
3422
3423 let room_id = room_id!("!room:example.org");
3424
3425 server
3426 .mock_sync()
3427 .ok_and_run(&client, |builder| {
3428 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3429 })
3430 .await;
3431
3432 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3433 assert_eq!(room.room_id(), room_id);
3434 }
3435
3436 #[async_test]
3437 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3438 let server = MatrixMockServer::new().await;
3439 let client = server.client_builder().build().await;
3440
3441 let room_id = room_id!("!room:example.org");
3442
3443 let client = Arc::new(client);
3444
3445 // Perform the /sync request with a delay so it starts after the
3446 // `await_room_remote_echo` call has happened
3447 spawn({
3448 let client = client.clone();
3449 async move {
3450 sleep(Duration::from_millis(100)).await;
3451
3452 server
3453 .mock_sync()
3454 .ok_and_run(&client, |builder| {
3455 builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3456 })
3457 .await;
3458 }
3459 });
3460
3461 let room =
3462 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3463 assert_eq!(room.room_id(), room_id);
3464 }
3465
3466 #[async_test]
3467 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3468 let client = MockClientBuilder::new(None).build().await;
3469
3470 let room_id = room_id!("!room:example.org");
3471 // Room is not present so the client won't be able to find it. The call will
3472 // timeout.
3473 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3474 }
3475
3476 #[async_test]
3477 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3478 let server = MatrixMockServer::new().await;
3479 let client = server.client_builder().build().await;
3480
3481 server.mock_create_room().ok().mount().await;
3482
3483 // Create a room in the internal store
3484 let room = client
3485 .create_room(assign!(CreateRoomRequest::new(), {
3486 invite: vec![],
3487 is_direct: false,
3488 }))
3489 .await
3490 .unwrap();
3491
3492 // Room is locally present, but not synced, the call will timeout
3493 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3494 .await
3495 .unwrap_err();
3496 }
3497
3498 #[async_test]
3499 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3500 let server = MatrixMockServer::new().await;
3501 let client = server.client_builder().build().await;
3502
3503 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3504
3505 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3506 assert_matches!(ret, Ok(true));
3507 }
3508
3509 #[async_test]
3510 async fn test_is_room_alias_available_if_alias_is_resolved() {
3511 let server = MatrixMockServer::new().await;
3512 let client = server.client_builder().build().await;
3513
3514 server
3515 .mock_room_directory_resolve_alias()
3516 .ok("!some_room_id:matrix.org", Vec::new())
3517 .expect(1)
3518 .mount()
3519 .await;
3520
3521 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3522 assert_matches!(ret, Ok(false));
3523 }
3524
3525 #[async_test]
3526 async fn test_is_room_alias_available_if_error_found() {
3527 let server = MatrixMockServer::new().await;
3528 let client = server.client_builder().build().await;
3529
3530 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3531
3532 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3533 assert_matches!(ret, Err(_));
3534 }
3535
3536 #[async_test]
3537 async fn test_create_room_alias() {
3538 let server = MatrixMockServer::new().await;
3539 let client = server.client_builder().build().await;
3540
3541 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3542
3543 let ret = client
3544 .create_room_alias(
3545 room_alias_id!("#some_alias:matrix.org"),
3546 room_id!("!some_room:matrix.org"),
3547 )
3548 .await;
3549 assert_matches!(ret, Ok(()));
3550 }
3551
3552 #[async_test]
3553 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3554 let server = MatrixMockServer::new().await;
3555 let client = server.client_builder().build().await;
3556
3557 let room_id = room_id!("!a-room:matrix.org");
3558
3559 // Make sure the summary endpoint is called once
3560 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3561
3562 // We create a locally cached invited room
3563 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3564
3565 // And we get a preview, the server endpoint was reached
3566 let preview = client
3567 .get_room_preview(room_id.into(), Vec::new())
3568 .await
3569 .expect("Room preview should be retrieved");
3570
3571 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3572 }
3573
3574 #[async_test]
3575 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3576 let server = MatrixMockServer::new().await;
3577 let client = server.client_builder().build().await;
3578
3579 let room_id = room_id!("!a-room:matrix.org");
3580
3581 // Make sure the summary endpoint is called once
3582 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3583
3584 // We create a locally cached left room
3585 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3586
3587 // And we get a preview, the server endpoint was reached
3588 let preview = client
3589 .get_room_preview(room_id.into(), Vec::new())
3590 .await
3591 .expect("Room preview should be retrieved");
3592
3593 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3594 }
3595
3596 #[async_test]
3597 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3598 let server = MatrixMockServer::new().await;
3599 let client = server.client_builder().build().await;
3600
3601 let room_id = room_id!("!a-room:matrix.org");
3602
3603 // Make sure the summary endpoint is called once
3604 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3605
3606 // We create a locally cached knocked room
3607 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3608
3609 // And we get a preview, the server endpoint was reached
3610 let preview = client
3611 .get_room_preview(room_id.into(), Vec::new())
3612 .await
3613 .expect("Room preview should be retrieved");
3614
3615 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3616 }
3617
3618 #[async_test]
3619 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3620 let server = MatrixMockServer::new().await;
3621 let client = server.client_builder().build().await;
3622
3623 let room_id = room_id!("!a-room:matrix.org");
3624
3625 // Make sure the summary endpoint is not called
3626 server.mock_room_summary().ok(room_id).never().mount().await;
3627
3628 // We create a locally cached joined room
3629 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3630
3631 // And we get a preview, no server endpoint was reached
3632 let preview = client
3633 .get_room_preview(room_id.into(), Vec::new())
3634 .await
3635 .expect("Room preview should be retrieved");
3636
3637 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3638 }
3639
3640 #[async_test]
3641 async fn test_media_preview_config() {
3642 let server = MatrixMockServer::new().await;
3643 let client = server.client_builder().build().await;
3644
3645 server
3646 .mock_sync()
3647 .ok_and_run(&client, |builder| {
3648 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3649 "content": {
3650 "media_previews": "private",
3651 "invite_avatars": "off"
3652 },
3653 "type": "m.media_preview_config"
3654 })));
3655 })
3656 .await;
3657
3658 let (initial_value, stream) =
3659 client.account().observe_media_preview_config().await.unwrap();
3660
3661 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3662 assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3663 assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3664 pin_mut!(stream);
3665 assert_pending!(stream);
3666
3667 server
3668 .mock_sync()
3669 .ok_and_run(&client, |builder| {
3670 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3671 "content": {
3672 "media_previews": "off",
3673 "invite_avatars": "on"
3674 },
3675 "type": "m.media_preview_config"
3676 })));
3677 })
3678 .await;
3679
3680 assert_next_matches!(
3681 stream,
3682 MediaPreviewConfigEventContent {
3683 media_previews: MediaPreviews::Off,
3684 invite_avatars: InviteAvatars::On,
3685 ..
3686 }
3687 );
3688 assert_pending!(stream);
3689 }
3690
3691 #[async_test]
3692 async fn test_unstable_media_preview_config() {
3693 let server = MatrixMockServer::new().await;
3694 let client = server.client_builder().build().await;
3695
3696 server
3697 .mock_sync()
3698 .ok_and_run(&client, |builder| {
3699 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3700 "content": {
3701 "media_previews": "private",
3702 "invite_avatars": "off"
3703 },
3704 "type": "io.element.msc4278.media_preview_config"
3705 })));
3706 })
3707 .await;
3708
3709 let (initial_value, stream) =
3710 client.account().observe_media_preview_config().await.unwrap();
3711
3712 let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3713 assert_eq!(initial_value.invite_avatars, InviteAvatars::Off);
3714 assert_eq!(initial_value.media_previews, MediaPreviews::Private);
3715 pin_mut!(stream);
3716 assert_pending!(stream);
3717
3718 server
3719 .mock_sync()
3720 .ok_and_run(&client, |builder| {
3721 builder.add_global_account_data_event(GlobalAccountDataTestEvent::Custom(json!({
3722 "content": {
3723 "media_previews": "off",
3724 "invite_avatars": "on"
3725 },
3726 "type": "io.element.msc4278.media_preview_config"
3727 })));
3728 })
3729 .await;
3730
3731 assert_next_matches!(
3732 stream,
3733 MediaPreviewConfigEventContent {
3734 media_previews: MediaPreviews::Off,
3735 invite_avatars: InviteAvatars::On,
3736 ..
3737 }
3738 );
3739 assert_pending!(stream);
3740 }
3741
3742 #[async_test]
3743 async fn test_media_preview_config_not_found() {
3744 let server = MatrixMockServer::new().await;
3745 let client = server.client_builder().build().await;
3746
3747 let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3748
3749 assert!(initial_value.is_none());
3750 }
3751
3752 #[async_test]
3753 async fn test_load_or_fetch_max_upload_size() {
3754 let server = MatrixMockServer::new().await;
3755 let client = server.client_builder().build().await;
3756
3757 assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3758
3759 server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3760 client.load_or_fetch_max_upload_size().await.unwrap();
3761
3762 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3763 }
3764
3765 #[async_test]
3766 async fn test_uploading_a_too_large_media_file() {
3767 let server = MatrixMockServer::new().await;
3768 let client = server.client_builder().build().await;
3769
3770 server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
3771 client.load_or_fetch_max_upload_size().await.unwrap();
3772 assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
3773
3774 let data = vec![1, 2];
3775 let upload_request =
3776 ruma::api::client::media::create_content::v3::Request::new(data.clone());
3777 let request = SendRequest {
3778 client: client.clone(),
3779 request: upload_request,
3780 config: None,
3781 send_progress: SharedObservable::new(TransmissionProgress::default()),
3782 };
3783 let media_request = SendMediaUploadRequest::new(request);
3784
3785 let error = media_request.await.err();
3786 assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
3787 assert_eq!(max, uint!(1));
3788 assert_eq!(current, UInt::new_wrapping(data.len() as u64));
3789 }
3790}