matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{btree_map, BTreeMap},
19 fmt::{self, Debug},
20 future::{ready, Future},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23};
24
25use eyeball::{SharedObservable, Subscriber};
26use eyeball_im::{Vector, VectorDiff};
27use futures_core::Stream;
28use futures_util::StreamExt;
29#[cfg(feature = "e2e-encryption")]
30use matrix_sdk_base::crypto::store::LockableCryptoStore;
31use matrix_sdk_base::{
32 event_cache::store::EventCacheStoreLock,
33 store::{DynStateStore, ServerCapabilities},
34 sync::{Notification, RoomUpdates},
35 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
36 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm,
37};
38#[cfg(feature = "e2e-encryption")]
39use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
40use ruma::{
41 api::{
42 client::{
43 account::whoami,
44 alias::{create_alias, delete_alias, get_alias},
45 device::{delete_devices, get_devices, update_device},
46 directory::{get_public_rooms, get_public_rooms_filtered},
47 discovery::{
48 get_capabilities::{self, Capabilities},
49 get_supported_versions,
50 },
51 error::ErrorKind,
52 filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
53 knock::knock_room,
54 membership::{join_room_by_id, join_room_by_id_or_alias},
55 room::create_room,
56 session::login::v3::DiscoveryInfo,
57 sync::sync_events,
58 uiaa,
59 user_directory::search_users,
60 },
61 error::FromHttpResponseError,
62 MatrixVersion, OutgoingRequest,
63 },
64 assign,
65 push::Ruleset,
66 time::Instant,
67 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
68 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
69};
70use serde::de::DeserializeOwned;
71use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
72use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
73use url::Url;
74
75use self::futures::SendRequest;
76#[cfg(feature = "experimental-oidc")]
77use crate::authentication::oidc::Oidc;
78use crate::{
79 authentication::{
80 matrix::MatrixAuth, AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback,
81 },
82 config::RequestConfig,
83 deduplicating_handler::DeduplicatingHandler,
84 error::HttpResult,
85 event_cache::EventCache,
86 event_handler::{
87 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
88 EventHandlerStore, ObservableEventHandler, SyncEvent,
89 },
90 http_client::HttpClient,
91 notification_settings::NotificationSettings,
92 room_preview::RoomPreview,
93 send_queue::SendQueueData,
94 sliding_sync::Version as SlidingSyncVersion,
95 sync::{RoomUpdate, SyncResponse},
96 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
97 Room, TransmissionProgress,
98};
99#[cfg(feature = "e2e-encryption")]
100use crate::{
101 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
102 store_locks::CrossProcessStoreLock,
103};
104
105mod builder;
106pub(crate) mod futures;
107
108pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
109
/// Boxed, pinned future produced by a notification handler.
///
/// Outside of `wasm32` the future is additionally required to be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future produced by a notification handler.
///
/// On `wasm32` no `Send` bound is imposed on the future.
#[cfg(target_arch = "wasm32")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

/// Boxed notification handler: a callable taking the [`Notification`], the
/// [`Room`] and the [`Client`], and returning a [`NotificationHandlerFut`].
///
/// Outside of `wasm32` the handler itself must be `Send + Sync`.
#[cfg(not(target_arch = "wasm32"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler: a callable taking the [`Notification`], the
/// [`Room`] and the [`Client`], and returning a [`NotificationHandlerFut`].
///
/// On `wasm32` no `Send`/`Sync` bounds are imposed on the handler.
#[cfg(target_arch = "wasm32")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
120
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method, the return value
/// of the provided callback controls if the sync loop should be exited.
///
/// [`sync_with_callback`]: Client::sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
134
/// Represents changes that can occur to a `Client`'s `Session`.
///
/// The type is `Eq` (in addition to `PartialEq`) since all of its fields
/// are, which allows it to be used wherever total equality is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
146
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
#[derive(Clone)]
pub struct Client {
    /// The shared state; cloning a `Client` only clones this `Arc`.
    pub(crate) inner: Arc<ClientInner>,
}
154
/// Locks and request-deduplicating handlers shared by the methods of a
/// [`Client`].
///
/// Every field starts out in its `Default` state.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    // NOTE(review): `Self` is `ClientLocks` here, which defines no
    // `send_single_receipt` method in this file; the link below presumably
    // means `Room::send_single_receipt` — confirm and retarget.
    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// The cross-process lock wrapping the crypto store.
    ///
    /// Held in a `OnceCell`, so it starts out unset and can be set at most
    /// once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock:
        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
234
/// The state shared by every clone of a [`Client`].
///
/// A `Client` only holds an `Arc<ClientInner>`, so everything in here is
/// shared between clones.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    ///
    /// Behind a lock because it can be replaced after construction (see
    /// `Client::set_homeserver`).
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    ///
    /// Behind a lock because it can be overridden after construction (see
    /// `Client::set_sliding_sync_version`).
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Server capabilities, either prefilled during building or fetched from
    /// the server.
    server_capabilities: RwLock<ClientServerCapabilities>,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates.
    ///
    /// One broadcast channel per room, keyed by room ID.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,
}
332
333impl ClientInner {
334 /// Create a new `ClientInner`.
335 ///
336 /// All the fields passed as parameters here are those that must be cloned
337 /// upon instantiation of a sub-client, e.g. a client specialized for
338 /// notifications.
339 #[allow(clippy::too_many_arguments)]
340 async fn new(
341 auth_ctx: Arc<AuthCtx>,
342 server: Option<Url>,
343 homeserver: Url,
344 sliding_sync_version: SlidingSyncVersion,
345 http_client: HttpClient,
346 base_client: BaseClient,
347 server_capabilities: ClientServerCapabilities,
348 respect_login_well_known: bool,
349 event_cache: OnceCell<EventCache>,
350 send_queue: Arc<SendQueueData>,
351 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
352 cross_process_store_locks_holder_name: String,
353 ) -> Arc<Self> {
354 let client = Self {
355 server,
356 homeserver: StdRwLock::new(homeserver),
357 auth_ctx,
358 sliding_sync_version: StdRwLock::new(sliding_sync_version),
359 http_client,
360 base_client,
361 locks: Default::default(),
362 cross_process_store_locks_holder_name,
363 server_capabilities: RwLock::new(server_capabilities),
364 typing_notice_times: Default::default(),
365 event_handlers: Default::default(),
366 notification_handlers: Default::default(),
367 room_update_channels: Default::default(),
368 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
369 // ballast for all observers to catch up.
370 room_updates_sender: broadcast::Sender::new(32),
371 respect_login_well_known,
372 sync_beat: event_listener::Event::new(),
373 event_cache,
374 send_queue_data: send_queue,
375 #[cfg(feature = "e2e-encryption")]
376 e2ee: EncryptionData::new(encryption_settings),
377 #[cfg(feature = "e2e-encryption")]
378 verification_state: SharedObservable::new(VerificationState::Unknown),
379 };
380
381 #[allow(clippy::let_and_return)]
382 let client = Arc::new(client);
383
384 #[cfg(feature = "e2e-encryption")]
385 client.e2ee.initialize_room_key_tasks(&client);
386
387 let _ = client
388 .event_cache
389 .get_or_init(|| async { EventCache::new(WeakClient::from_inner(&client)) })
390 .await;
391
392 client
393 }
394}
395
396#[cfg(not(tarpaulin_include))]
397impl Debug for Client {
398 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
399 write!(fmt, "Client")
400 }
401}
402
403impl Client {
404 /// Create a new [`Client`] that will use the given homeserver.
405 ///
406 /// # Arguments
407 ///
408 /// * `homeserver_url` - The homeserver that the client should connect to.
409 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
410 Self::builder().homeserver_url(homeserver_url).build().await
411 }
412
413 /// Returns a subscriber that publishes an event every time the ignore user
414 /// list changes.
415 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
416 self.inner.base_client.subscribe_to_ignore_user_list_changes()
417 }
418
    /// Create a new [`ClientBuilder`].
    ///
    /// The builder is used to configure a `Client` before it is built.
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
423
424 pub(crate) fn base_client(&self) -> &BaseClient {
425 &self.inner.base_client
426 }
427
428 /// The underlying HTTP client.
429 pub fn http_client(&self) -> &reqwest::Client {
430 &self.inner.http_client.inner
431 }
432
433 pub(crate) fn locks(&self) -> &ClientLocks {
434 &self.inner.locks
435 }
436
437 /// The cross-process store locks holder name.
438 ///
439 /// The SDK provides cross-process store locks (see
440 /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
441 /// `holder_name` is the value used for all cross-process store locks
442 /// used by this `Client`.
443 pub fn cross_process_store_locks_holder_name(&self) -> &str {
444 &self.inner.cross_process_store_locks_holder_name
445 }
446
447 /// Change the homeserver URL used by this client.
448 ///
449 /// # Arguments
450 ///
451 /// * `homeserver_url` - The new URL to use.
452 fn set_homeserver(&self, homeserver_url: Url) {
453 *self.inner.homeserver.write().unwrap() = homeserver_url;
454 }
455
456 /// Get the capabilities of the homeserver.
457 ///
458 /// This method should be used to check what features are supported by the
459 /// homeserver.
460 ///
461 /// # Examples
462 ///
463 /// ```no_run
464 /// # use matrix_sdk::Client;
465 /// # use url::Url;
466 /// # async {
467 /// # let homeserver = Url::parse("http://example.com")?;
468 /// let client = Client::new(homeserver).await?;
469 ///
470 /// let capabilities = client.get_capabilities().await?;
471 ///
472 /// if capabilities.change_password.enabled {
473 /// // Change password
474 /// }
475 /// # anyhow::Ok(()) };
476 /// ```
477 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
478 let res = self.send(get_capabilities::v3::Request::new()).await?;
479 Ok(res.capabilities)
480 }
481
482 /// Get a copy of the default request config.
483 ///
484 /// The default request config is what's used when sending requests if no
485 /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
486 /// function with such a parameter.
487 ///
488 /// If the default request config was not customized through
489 /// [`ClientBuilder`] when creating this `Client`, the returned value will
490 /// be equivalent to [`RequestConfig::default()`].
491 pub fn request_config(&self) -> RequestConfig {
492 self.inner.http_client.request_config
493 }
494
495 /// Is the client logged in.
496 pub fn logged_in(&self) -> bool {
497 self.inner.base_client.logged_in()
498 }
499
500 /// The server used by the client.
501 ///
502 /// See `Self::server` to learn more.
503 pub fn server(&self) -> Option<&Url> {
504 self.inner.server.as_ref()
505 }
506
507 /// The homeserver of the client.
508 pub fn homeserver(&self) -> Url {
509 self.inner.homeserver.read().unwrap().clone()
510 }
511
512 /// Get the sliding sync version.
513 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
514 self.inner.sliding_sync_version.read().unwrap().clone()
515 }
516
517 /// Override the sliding sync version.
518 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
519 let mut lock = self.inner.sliding_sync_version.write().unwrap();
520 *lock = version;
521 }
522
523 /// Get the Matrix user session meta information.
524 ///
525 /// If the client is currently logged in, this will return a
526 /// [`SessionMeta`] object which contains the user ID and device ID.
527 /// Otherwise it returns `None`.
528 pub fn session_meta(&self) -> Option<&SessionMeta> {
529 self.base_client().session_meta()
530 }
531
532 /// Returns a receiver that gets events for each room info update. To watch
533 /// for new events, use `receiver.resubscribe()`. Each event contains the
534 /// room and a boolean whether this event should trigger a room list update.
535 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
536 self.base_client().room_info_notable_update_receiver()
537 }
538
539 /// Performs a search for users.
540 /// The search is performed case-insensitively on user IDs and display names
541 ///
542 /// # Arguments
543 ///
544 /// * `search_term` - The search term for the search
545 /// * `limit` - The maximum number of results to return. Defaults to 10.
546 ///
547 /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
548 pub async fn search_users(
549 &self,
550 search_term: &str,
551 limit: u64,
552 ) -> HttpResult<search_users::v3::Response> {
553 let mut request = search_users::v3::Request::new(search_term.to_owned());
554
555 if let Some(limit) = UInt::new(limit) {
556 request.limit = limit;
557 }
558
559 self.send(request).await
560 }
561
562 /// Get the user id of the current owner of the client.
563 pub fn user_id(&self) -> Option<&UserId> {
564 self.session_meta().map(|s| s.user_id.as_ref())
565 }
566
567 /// Get the device ID that identifies the current session.
568 pub fn device_id(&self) -> Option<&DeviceId> {
569 self.session_meta().map(|s| s.device_id.as_ref())
570 }
571
572 /// Get the current access token for this session, regardless of the
573 /// authentication API used to log in.
574 ///
575 /// Will be `None` if the client has not been logged in.
576 pub fn access_token(&self) -> Option<String> {
577 self.inner.auth_ctx.auth_data.get()?.access_token()
578 }
579
580 /// Access the authentication API used to log in this client.
581 ///
582 /// Will be `None` if the client has not been logged in.
583 pub fn auth_api(&self) -> Option<AuthApi> {
584 match self.inner.auth_ctx.auth_data.get()? {
585 AuthData::Matrix(_) => Some(AuthApi::Matrix(self.matrix_auth())),
586 #[cfg(feature = "experimental-oidc")]
587 AuthData::Oidc(_) => Some(AuthApi::Oidc(self.oidc())),
588 }
589 }
590
591 /// Get the whole session info of this client.
592 ///
593 /// Will be `None` if the client has not been logged in.
594 ///
595 /// Can be used with [`Client::restore_session`] to restore a previously
596 /// logged-in session.
597 pub fn session(&self) -> Option<AuthSession> {
598 match self.auth_api()? {
599 AuthApi::Matrix(api) => api.session().map(Into::into),
600 #[cfg(feature = "experimental-oidc")]
601 AuthApi::Oidc(api) => api.full_session().map(Into::into),
602 }
603 }
604
605 /// Get a reference to the state store.
606 pub fn store(&self) -> &DynStateStore {
607 self.base_client().store()
608 }
609
610 /// Get a reference to the event cache store.
611 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
612 self.base_client().event_cache_store()
613 }
614
615 /// Access the native Matrix authentication API with this client.
616 pub fn matrix_auth(&self) -> MatrixAuth {
617 MatrixAuth::new(self.clone())
618 }
619
620 /// Get the account of the current owner of the client.
621 pub fn account(&self) -> Account {
622 Account::new(self.clone())
623 }
624
625 /// Get the encryption manager of the client.
626 #[cfg(feature = "e2e-encryption")]
627 pub fn encryption(&self) -> Encryption {
628 Encryption::new(self.clone())
629 }
630
631 /// Get the media manager of the client.
632 pub fn media(&self) -> Media {
633 Media::new(self.clone())
634 }
635
636 /// Get the pusher manager of the client.
637 pub fn pusher(&self) -> Pusher {
638 Pusher::new(self.clone())
639 }
640
641 /// Access the OpenID Connect API of the client.
642 #[cfg(feature = "experimental-oidc")]
643 pub fn oidc(&self) -> Oidc {
644 Oidc::new(self.clone())
645 }
646
647 /// Register a handler for a specific event type.
648 ///
649 /// The handler is a function or closure with one or more arguments. The
650 /// first argument is the event itself. All additional arguments are
651 /// "context" arguments: They have to implement [`EventHandlerContext`].
652 /// This trait is named that way because most of the types implementing it
653 /// give additional context about an event: The room it was in, its raw form
654 /// and other similar things. As two exceptions to this,
655 /// [`Client`] and [`EventHandlerHandle`] also implement the
656 /// `EventHandlerContext` trait so you don't have to clone your client
657 /// into the event handler manually and a handler can decide to remove
658 /// itself.
659 ///
660 /// Some context arguments are not universally applicable. A context
661 /// argument that isn't available for the given event type will result in
662 /// the event handler being skipped and an error being logged. The following
663 /// context argument types are only available for a subset of event types:
664 ///
665 /// * [`Room`] is only available for room-specific events, i.e. not for
666 /// events like global account data events or presence events.
667 ///
668 /// You can provide custom context via
669 /// [`add_event_handler_context`](Client::add_event_handler_context) and
670 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
671 /// into the event handler.
672 ///
673 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
674 ///
675 /// # Examples
676 ///
677 /// ```no_run
678 /// use matrix_sdk::{
679 /// deserialized_responses::EncryptionInfo,
680 /// event_handler::Ctx,
681 /// ruma::{
682 /// events::{
683 /// macros::EventContent,
684 /// push_rules::PushRulesEvent,
685 /// room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
686 /// },
687 /// push::Action,
688 /// Int, MilliSecondsSinceUnixEpoch,
689 /// },
690 /// Client, Room,
691 /// };
692 /// use serde::{Deserialize, Serialize};
693 ///
694 /// # async fn example(client: Client) {
695 /// client.add_event_handler(
696 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
697 /// // Common usage: Room event plus room and client.
698 /// },
699 /// );
700 /// client.add_event_handler(
701 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
702 /// async move {
703 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
704 /// // unencrypted events and events that were decrypted by the SDK.
705 /// }
706 /// },
707 /// );
708 /// client.add_event_handler(
709 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
710 /// async move {
711 /// // A `Vec<Action>` parameter allows you to know which push actions
712 /// // are applicable for an event. For example, an event with
713 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
714 /// // in the timeline.
715 /// }
716 /// },
717 /// );
718 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
719 /// // You can omit any or all arguments after the first.
720 /// });
721 ///
722 /// // Registering a temporary event handler:
723 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
724 /// /* Event handler */
725 /// });
726 /// client.remove_event_handler(handle);
727 ///
728 /// // Registering custom event handler context:
729 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
730 /// struct MyContext {
731 /// number: usize,
732 /// }
733 /// client.add_event_handler_context(MyContext { number: 5 });
734 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
735 /// // Use the context
736 /// });
737 ///
738 /// // Custom events work exactly the same way, you just need to declare
739 /// // the content struct and use the EventContent derive macro on it.
740 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
741 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
742 /// struct TokenEventContent {
743 /// token: String,
744 /// #[serde(rename = "exp")]
745 /// expires_at: MilliSecondsSinceUnixEpoch,
746 /// }
747 ///
748 /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
749 /// todo!("Display the token");
750 /// });
751 ///
752 /// // Event handler closures can also capture local variables.
753 /// // Make sure they are cheap to clone though, because they will be cloned
754 /// // every time the closure is called.
755 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
756 ///
757 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
758 /// println!("Calling the handler with identifier {data}");
759 /// });
760 /// # }
761 /// ```
762 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
763 where
764 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
765 H: EventHandler<Ev, Ctx>,
766 {
767 self.add_event_handler_impl(handler, None)
768 }
769
770 /// Register a handler for a specific room, and event type.
771 ///
772 /// This method works the same way as
773 /// [`add_event_handler`][Self::add_event_handler], except that the handler
774 /// will only be called for events in the room with the specified ID. See
775 /// that method for more details on event handler functions.
776 ///
777 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
778 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
779 /// your use case.
780 pub fn add_room_event_handler<Ev, Ctx, H>(
781 &self,
782 room_id: &RoomId,
783 handler: H,
784 ) -> EventHandlerHandle
785 where
786 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
787 H: EventHandler<Ev, Ctx>,
788 {
789 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
790 }
791
792 /// Observe a specific event type.
793 ///
794 /// `Ev` represents the kind of event that will be observed. `Ctx`
795 /// represents the context that will come with the event. It relies on the
796 /// same mechanism as [`Client::add_event_handler`]. The main difference is
797 /// that it returns an [`ObservableEventHandler`] and doesn't require a
798 /// user-defined closure. It is possible to subscribe to the
799 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
800 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
801 /// Ctx)`.
802 ///
803 /// Be careful that only the most recent value can be observed. Subscribers
804 /// are notified when a new value is sent, but there is no guarantee
805 /// that they will see all values.
806 ///
807 /// # Example
808 ///
809 /// Let's see a classical usage:
810 ///
811 /// ```
812 /// use futures_util::StreamExt as _;
813 /// use matrix_sdk::{
814 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
815 /// Client, Room,
816 /// };
817 ///
818 /// # async fn example(client: Client) -> Option<()> {
819 /// let observer =
820 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
821 ///
822 /// let mut subscriber = observer.subscribe();
823 ///
824 /// let (event, (room, push_actions)) = subscriber.next().await?;
825 /// # Some(())
826 /// # }
827 /// ```
828 ///
829 /// Now let's see how to get several contexts that can be useful for you:
830 ///
831 /// ```
832 /// use matrix_sdk::{
833 /// deserialized_responses::EncryptionInfo,
834 /// ruma::{
835 /// events::room::{
836 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
837 /// },
838 /// push::Action,
839 /// },
840 /// Client, Room,
841 /// };
842 ///
843 /// # async fn example(client: Client) {
844 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
845 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
846 ///
847 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
848 /// // to distinguish between unencrypted events and events that were decrypted
849 /// // by the SDK.
850 /// let _ = client
851 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
852 /// );
853 ///
854 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
855 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
856 /// // should be highlighted in the timeline.
857 /// let _ =
858 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
859 ///
860 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
861 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
862 /// # }
863 /// ```
864 ///
865 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
866 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
867 where
868 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
869 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
870 {
871 self.observe_room_events_impl(None)
872 }
873
874 /// Observe a specific room, and event type.
875 ///
876 /// This method works the same way as [`Client::observe_events`], except
877 /// that the observability will only be applied for events in the room with
878 /// the specified ID. See that method for more details.
879 ///
880 /// Be careful that only the most recent value can be observed. Subscribers
881 /// are notified when a new value is sent, but there is no guarantee
882 /// that they will see all values.
883 pub fn observe_room_events<Ev, Ctx>(
884 &self,
885 room_id: &RoomId,
886 ) -> ObservableEventHandler<(Ev, Ctx)>
887 where
888 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
889 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
890 {
891 self.observe_room_events_impl(Some(room_id.to_owned()))
892 }
893
894 /// Shared implementation for `Client::observe_events` and
895 /// `Client::observe_room_events`.
896 fn observe_room_events_impl<Ev, Ctx>(
897 &self,
898 room_id: Option<OwnedRoomId>,
899 ) -> ObservableEventHandler<(Ev, Ctx)>
900 where
901 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
902 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
903 {
904 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
905 // new value.
906 let shared_observable = SharedObservable::new(None);
907
908 ObservableEventHandler::new(
909 shared_observable.clone(),
910 self.event_handler_drop_guard(self.add_event_handler_impl(
911 move |event: Ev, context: Ctx| {
912 shared_observable.set(Some((event, context)));
913
914 ready(())
915 },
916 room_id,
917 )),
918 )
919 }
920
921 /// Remove the event handler associated with the handle.
922 ///
923 /// Note that you **must not** call `remove_event_handler` from the
924 /// non-async part of an event handler, that is:
925 ///
926 /// ```ignore
927 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
928 /// // ⚠ this will cause a deadlock ⚠
929 /// client.remove_event_handler(handle);
930 ///
931 /// async move {
932 /// // removing the event handler here is fine
933 /// client.remove_event_handler(handle);
934 /// }
935 /// })
936 /// ```
937 ///
938 /// Note also that handlers that remove themselves will still execute with
939 /// events received in the same sync cycle.
940 ///
941 /// # Arguments
942 ///
943 /// `handle` - The [`EventHandlerHandle`] that is returned when
944 /// registering the event handler with [`Client::add_event_handler`].
945 ///
946 /// # Examples
947 ///
948 /// ```no_run
949 /// # use url::Url;
950 /// # use tokio::sync::mpsc;
951 /// #
952 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
953 /// #
954 /// use matrix_sdk::{
955 /// event_handler::EventHandlerHandle,
956 /// ruma::events::room::member::SyncRoomMemberEvent, Client,
957 /// };
958 /// #
959 /// # futures_executor::block_on(async {
960 /// # let client = matrix_sdk::Client::builder()
961 /// # .homeserver_url(homeserver)
962 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
963 /// # .build()
964 /// # .await
965 /// # .unwrap();
966 ///
967 /// client.add_event_handler(
968 /// |ev: SyncRoomMemberEvent,
969 /// client: Client,
970 /// handle: EventHandlerHandle| async move {
971 /// // Common usage: Check arriving Event is the expected one
972 /// println!("Expected RoomMemberEvent received!");
973 /// client.remove_event_handler(handle);
974 /// },
975 /// );
976 /// # });
977 /// ```
978 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
979 self.inner.event_handlers.remove(handle);
980 }
981
982 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
983 /// the given handle.
984 ///
985 /// When the returned value is dropped, the event handler will be removed.
986 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
987 EventHandlerDropGuard::new(handle, self.clone())
988 }
989
990 /// Add an arbitrary value for use as event handler context.
991 ///
992 /// The value can be obtained in an event handler by adding an argument of
993 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
994 ///
995 /// If a value of the same type has been added before, it will be
996 /// overwritten.
997 ///
998 /// # Examples
999 ///
1000 /// ```no_run
1001 /// use matrix_sdk::{
1002 /// event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1003 /// Room,
1004 /// };
1005 /// # #[derive(Clone)]
1006 /// # struct SomeType;
1007 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1008 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1009 /// # futures_executor::block_on(async {
1010 /// # let client = matrix_sdk::Client::builder()
1011 /// # .homeserver_url(homeserver)
1012 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1013 /// # .build()
1014 /// # .await
1015 /// # .unwrap();
1016 ///
1017 /// // Handle used to send messages to the UI part of the app
1018 /// let my_gui_handle: SomeType = obtain_gui_handle();
1019 ///
1020 /// client.add_event_handler_context(my_gui_handle.clone());
1021 /// client.add_event_handler(
1022 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1023 /// async move {
1024 /// // gui_handle.send(DisplayMessage { message: ev });
1025 /// }
1026 /// },
1027 /// );
1028 /// # });
1029 /// ```
1030 pub fn add_event_handler_context<T>(&self, ctx: T)
1031 where
1032 T: Clone + Send + Sync + 'static,
1033 {
1034 self.inner.event_handlers.add_context(ctx);
1035 }
1036
1037 /// Register a handler for a notification.
1038 ///
1039 /// Similar to [`Client::add_event_handler`], but only allows functions
1040 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1041 /// [`Client`] for now.
1042 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1043 where
1044 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1045 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1046 {
1047 self.inner.notification_handlers.write().await.push(Box::new(
1048 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1049 ));
1050
1051 self
1052 }
1053
1054 /// Subscribe to all updates for the room with the given ID.
1055 ///
1056 /// The returned receiver will receive a new message for each sync response
1057 /// that contains updates for that room.
1058 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1059 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1060 btree_map::Entry::Vacant(entry) => {
1061 let (tx, rx) = broadcast::channel(8);
1062 entry.insert(tx);
1063 rx
1064 }
1065 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1066 }
1067 }
1068
1069 /// Subscribe to all updates to all rooms, whenever any has been received in
1070 /// a sync response.
1071 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1072 self.inner.room_updates_sender.subscribe()
1073 }
1074
1075 pub(crate) async fn notification_handlers(
1076 &self,
1077 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1078 self.inner.notification_handlers.read().await
1079 }
1080
1081 /// Get all the rooms the client knows about.
1082 ///
1083 /// This will return the list of joined, invited, and left rooms.
1084 pub fn rooms(&self) -> Vec<Room> {
1085 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1086 }
1087
1088 /// Get all the rooms the client knows about, filtered by room state.
1089 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1090 self.base_client()
1091 .rooms_filtered(filter)
1092 .into_iter()
1093 .map(|room| Room::new(self.clone(), room))
1094 .collect()
1095 }
1096
1097 /// Get a stream of all the rooms, in addition to the existing rooms.
1098 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1099 let (rooms, stream) = self.base_client().rooms_stream();
1100
1101 let map_room = |room| Room::new(self.clone(), room);
1102
1103 (
1104 rooms.into_iter().map(map_room).collect(),
1105 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1106 )
1107 }
1108
1109 /// Returns the joined rooms this client knows about.
1110 pub fn joined_rooms(&self) -> Vec<Room> {
1111 self.base_client()
1112 .rooms_filtered(RoomStateFilter::JOINED)
1113 .into_iter()
1114 .map(|room| Room::new(self.clone(), room))
1115 .collect()
1116 }
1117
1118 /// Returns the invited rooms this client knows about.
1119 pub fn invited_rooms(&self) -> Vec<Room> {
1120 self.base_client()
1121 .rooms_filtered(RoomStateFilter::INVITED)
1122 .into_iter()
1123 .map(|room| Room::new(self.clone(), room))
1124 .collect()
1125 }
1126
1127 /// Returns the left rooms this client knows about.
1128 pub fn left_rooms(&self) -> Vec<Room> {
1129 self.base_client()
1130 .rooms_filtered(RoomStateFilter::LEFT)
1131 .into_iter()
1132 .map(|room| Room::new(self.clone(), room))
1133 .collect()
1134 }
1135
1136 /// Get a room with the given room id.
1137 ///
1138 /// # Arguments
1139 ///
1140 /// `room_id` - The unique id of the room that should be fetched.
1141 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1142 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1143 }
1144
1145 /// Gets the preview of a room, whether the current user has joined it or
1146 /// not.
1147 pub async fn get_room_preview(
1148 &self,
1149 room_or_alias_id: &RoomOrAliasId,
1150 via: Vec<OwnedServerName>,
1151 ) -> Result<RoomPreview> {
1152 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1153 Ok(room_id) => room_id.to_owned(),
1154 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1155 };
1156
1157 if let Some(room) = self.get_room(&room_id) {
1158 // The cached data can only be trusted if the room state is joined or
1159 // banned: for invite and knock rooms, no updates will be received
1160 // for the rooms after the invite/knock action took place so we may
1161 // have very out to date data for important fields such as
1162 // `join_rule`. For left rooms, the homeserver should return the latest info.
1163 match room.state() {
1164 RoomState::Joined | RoomState::Banned => {
1165 return Ok(RoomPreview::from_known_room(&room).await);
1166 }
1167 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1168 }
1169 }
1170
1171 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1172 }
1173
1174 /// Resolve a room alias to a room id and a list of servers which know
1175 /// about it.
1176 ///
1177 /// # Arguments
1178 ///
1179 /// `room_alias` - The room alias to be resolved.
1180 pub async fn resolve_room_alias(
1181 &self,
1182 room_alias: &RoomAliasId,
1183 ) -> HttpResult<get_alias::v3::Response> {
1184 let request = get_alias::v3::Request::new(room_alias.to_owned());
1185 self.send(request).await
1186 }
1187
1188 /// Checks if a room alias is not in use yet.
1189 ///
1190 /// Returns:
1191 /// - `Ok(true)` if the room alias is available.
1192 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1193 /// status code).
1194 /// - An `Err` otherwise.
1195 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1196 match self.resolve_room_alias(alias).await {
1197 // The room alias was resolved, so it's already in use.
1198 Ok(_) => Ok(false),
1199 Err(error) => {
1200 match error.client_api_error_kind() {
1201 // The room alias wasn't found, so it's available.
1202 Some(ErrorKind::NotFound) => Ok(true),
1203 _ => Err(error),
1204 }
1205 }
1206 }
1207 }
1208
1209 /// Adds a new room alias associated with a room to the room directory.
1210 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1211 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1212 self.send(request).await?;
1213 Ok(())
1214 }
1215
1216 /// Removes a room alias from the room directory.
1217 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1218 let request = delete_alias::v3::Request::new(alias.to_owned());
1219 self.send(request).await?;
1220 Ok(())
1221 }
1222
1223 /// Update the homeserver from the login response well-known if needed.
1224 ///
1225 /// # Arguments
1226 ///
1227 /// * `login_well_known` - The `well_known` field from a successful login
1228 /// response.
1229 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1230 if self.inner.respect_login_well_known {
1231 if let Some(well_known) = login_well_known {
1232 if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1233 self.set_homeserver(homeserver);
1234 }
1235 }
1236 }
1237 }
1238
1239 /// Restore a session previously logged-in using one of the available
1240 /// authentication APIs.
1241 ///
1242 /// See the documentation of the corresponding authentication API's
1243 /// `restore_session` method for more information.
1244 ///
1245 /// # Panics
1246 ///
1247 /// Panics if a session was already restored or logged in.
1248 #[instrument(skip_all)]
1249 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1250 let session = session.into();
1251 match session {
1252 AuthSession::Matrix(s) => Box::pin(self.matrix_auth().restore_session(s)).await,
1253 #[cfg(feature = "experimental-oidc")]
1254 AuthSession::Oidc(s) => Box::pin(self.oidc().restore_session(*s)).await,
1255 }
1256 }
1257
1258 pub(crate) async fn set_session_meta(
1259 &self,
1260 session_meta: SessionMeta,
1261 #[cfg(feature = "e2e-encryption")] custom_account: Option<vodozemac::olm::Account>,
1262 ) -> Result<()> {
1263 self.base_client()
1264 .set_session_meta(
1265 session_meta,
1266 #[cfg(feature = "e2e-encryption")]
1267 custom_account,
1268 )
1269 .await?;
1270
1271 Ok(())
1272 }
1273
1274 /// Refresh the access token using the authentication API used to log into
1275 /// this session.
1276 ///
1277 /// See the documentation of the authentication API's `refresh_access_token`
1278 /// method for more information.
1279 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1280 let Some(auth_api) = self.auth_api() else {
1281 return Err(RefreshTokenError::RefreshTokenRequired);
1282 };
1283
1284 match auth_api {
1285 AuthApi::Matrix(api) => {
1286 trace!("Token refresh: Using the homeserver.");
1287 Box::pin(api.refresh_access_token()).await?;
1288 }
1289 #[cfg(feature = "experimental-oidc")]
1290 AuthApi::Oidc(api) => {
1291 trace!("Token refresh: Using OIDC.");
1292 Box::pin(api.refresh_access_token()).await?;
1293 }
1294 }
1295
1296 Ok(())
1297 }
1298
1299 /// Get or upload a sync filter.
1300 ///
1301 /// This method will either get a filter ID from the store or upload the
1302 /// filter definition to the homeserver and return the new filter ID.
1303 ///
1304 /// # Arguments
1305 ///
1306 /// * `filter_name` - The unique name of the filter, this name will be used
1307 /// locally to store and identify the filter ID returned by the server.
1308 ///
1309 /// * `definition` - The filter definition that should be uploaded to the
1310 /// server if no filter ID can be found in the store.
1311 ///
1312 /// # Examples
1313 ///
1314 /// ```no_run
1315 /// # use matrix_sdk::{
1316 /// # Client, config::SyncSettings,
1317 /// # ruma::api::client::{
1318 /// # filter::{
1319 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1320 /// # },
1321 /// # sync::sync_events::v3::Filter,
1322 /// # }
1323 /// # };
1324 /// # use url::Url;
1325 /// # async {
1326 /// # let homeserver = Url::parse("http://example.com").unwrap();
1327 /// # let client = Client::new(homeserver).await.unwrap();
1328 /// let mut filter = FilterDefinition::default();
1329 ///
1330 /// // Let's enable member lazy loading.
1331 /// filter.room.state.lazy_load_options =
1332 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1333 ///
1334 /// let filter_id = client
1335 /// .get_or_upload_filter("sync", filter)
1336 /// .await
1337 /// .unwrap();
1338 ///
1339 /// let sync_settings = SyncSettings::new()
1340 /// .filter(Filter::FilterId(filter_id));
1341 ///
1342 /// let response = client.sync_once(sync_settings).await.unwrap();
/// # };
/// ```
1344 #[instrument(skip(self, definition))]
1345 pub async fn get_or_upload_filter(
1346 &self,
1347 filter_name: &str,
1348 definition: FilterDefinition,
1349 ) -> Result<String> {
1350 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1351 debug!("Found filter locally");
1352 Ok(filter)
1353 } else {
1354 debug!("Didn't find filter locally");
1355 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1356 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1357 let response = self.send(request).await?;
1358
1359 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1360
1361 Ok(response.filter_id)
1362 }
1363 }
1364
1365 /// Join a room by `RoomId`.
1366 ///
1367 /// Returns a `join_room_by_id::Response` consisting of the
1368 /// joined rooms `RoomId`.
1369 ///
1370 /// # Arguments
1371 ///
1372 /// * `room_id` - The `RoomId` of the room to be joined.
1373 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1374 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1375 let response = self.send(request).await?;
1376 let base_room = self.base_client().room_joined(&response.room_id).await?;
1377 Ok(Room::new(self.clone(), base_room))
1378 }
1379
1380 /// Join a room by `RoomId`.
1381 ///
1382 /// Returns a `join_room_by_id_or_alias::Response` consisting of the
1383 /// joined rooms `RoomId`.
1384 ///
1385 /// # Arguments
1386 ///
1387 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1388 /// alias looks like `#name:example.com`.
1389 pub async fn join_room_by_id_or_alias(
1390 &self,
1391 alias: &RoomOrAliasId,
1392 server_names: &[OwnedServerName],
1393 ) -> Result<Room> {
1394 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1395 via: server_names.to_owned(),
1396 });
1397 let response = self.send(request).await?;
1398 let base_room = self.base_client().room_joined(&response.room_id).await?;
1399 Ok(Room::new(self.clone(), base_room))
1400 }
1401
1402 /// Search the homeserver's directory of public rooms.
1403 ///
1404 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1405 /// a `get_public_rooms::Response`.
1406 ///
1407 /// # Arguments
1408 ///
1409 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1410 ///
1411 /// * `since` - Pagination token from a previous request.
1412 ///
1413 /// * `server` - The name of the server, if `None` the requested server is
1414 /// used.
1415 ///
1416 /// # Examples
1417 /// ```no_run
1418 /// use matrix_sdk::Client;
1419 /// # use url::Url;
1420 /// # let homeserver = Url::parse("http://example.com").unwrap();
1421 /// # let limit = Some(10);
1422 /// # let since = Some("since token");
1423 /// # let server = Some("servername.com".try_into().unwrap());
1424 /// # async {
1425 /// let mut client = Client::new(homeserver).await.unwrap();
1426 ///
1427 /// client.public_rooms(limit, since, server).await;
1428 /// # };
1429 /// ```
1430 #[cfg_attr(not(target_arch = "wasm32"), deny(clippy::future_not_send))]
1431 pub async fn public_rooms(
1432 &self,
1433 limit: Option<u32>,
1434 since: Option<&str>,
1435 server: Option<&ServerName>,
1436 ) -> HttpResult<get_public_rooms::v3::Response> {
1437 let limit = limit.map(UInt::from);
1438
1439 let request = assign!(get_public_rooms::v3::Request::new(), {
1440 limit,
1441 since: since.map(ToOwned::to_owned),
1442 server: server.map(ToOwned::to_owned),
1443 });
1444 self.send(request).await
1445 }
1446
1447 /// Create a room with the given parameters.
1448 ///
1449 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1450 /// created room.
1451 ///
1452 /// If you want to create a direct message with one specific user, you can
1453 /// use [`create_dm`][Self::create_dm], which is more convenient than
1454 /// assembling the [`create_room::v3::Request`] yourself.
1455 ///
1456 /// If the `is_direct` field of the request is set to `true` and at least
1457 /// one user is invited, the room will be automatically added to the direct
1458 /// rooms in the account data.
1459 ///
1460 /// # Examples
1461 ///
1462 /// ```no_run
1463 /// use matrix_sdk::{
1464 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1465 /// Client,
1466 /// };
1467 /// # use url::Url;
1468 /// #
1469 /// # async {
1470 /// # let homeserver = Url::parse("http://example.com").unwrap();
1471 /// let request = CreateRoomRequest::new();
1472 /// let client = Client::new(homeserver).await.unwrap();
1473 /// assert!(client.create_room(request).await.is_ok());
1474 /// # };
1475 /// ```
1476 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1477 let invite = request.invite.clone();
1478 let is_direct_room = request.is_direct;
1479 let response = self.send(request).await?;
1480
1481 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1482
1483 let joined_room = Room::new(self.clone(), base_room);
1484
1485 if is_direct_room && !invite.is_empty() {
1486 if let Err(error) =
1487 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1488 {
1489 // FIXME: Retry in the background
1490 error!("Failed to mark room as DM: {error}");
1491 }
1492 }
1493
1494 Ok(joined_room)
1495 }
1496
1497 /// Create a DM room.
1498 ///
1499 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1500 /// given user being invited, the room marked `is_direct` and both the
1501 /// creator and invitee getting the default maximum power level.
1502 ///
1503 /// If the `e2e-encryption` feature is enabled, the room will also be
1504 /// encrypted.
1505 ///
1506 /// # Arguments
1507 ///
1508 /// * `user_id` - The ID of the user to create a DM for.
1509 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1510 #[cfg(feature = "e2e-encryption")]
1511 let initial_state =
1512 vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1513 .to_raw_any()];
1514
1515 #[cfg(not(feature = "e2e-encryption"))]
1516 let initial_state = vec![];
1517
1518 let request = assign!(create_room::v3::Request::new(), {
1519 invite: vec![user_id.to_owned()],
1520 is_direct: true,
1521 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1522 initial_state,
1523 });
1524
1525 self.create_room(request).await
1526 }
1527
1528 /// Search the homeserver's directory for public rooms with a filter.
1529 ///
1530 /// # Arguments
1531 ///
1532 /// * `room_search` - The easiest way to create this request is using the
1533 /// `get_public_rooms_filtered::Request` itself.
1534 ///
1535 /// # Examples
1536 ///
1537 /// ```no_run
1538 /// # use url::Url;
1539 /// # use matrix_sdk::Client;
1540 /// # async {
1541 /// # let homeserver = Url::parse("http://example.com")?;
1542 /// use matrix_sdk::ruma::{
1543 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1544 /// };
1545 /// # let mut client = Client::new(homeserver).await?;
1546 ///
1547 /// let mut filter = Filter::new();
1548 /// filter.generic_search_term = Some("rust".to_owned());
1549 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1550 /// request.filter = filter;
1551 ///
1552 /// let response = client.public_rooms_filtered(request).await?;
1553 ///
1554 /// for room in response.chunk {
1555 /// println!("Found room {room:?}");
1556 /// }
1557 /// # anyhow::Ok(()) };
1558 /// ```
1559 pub async fn public_rooms_filtered(
1560 &self,
1561 request: get_public_rooms_filtered::v3::Request,
1562 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1563 self.send(request).await
1564 }
1565
1566 /// Send an arbitrary request to the server, without updating client state.
1567 ///
1568 /// **Warning:** Because this method *does not* update the client state, it
1569 /// is important to make sure that you account for this yourself, and
1570 /// use wrapper methods where available. This method should *only* be
1571 /// used if a wrapper method for the endpoint you'd like to use is not
1572 /// available.
1573 ///
1574 /// # Arguments
1575 ///
1576 /// * `request` - A filled out and valid request for the endpoint to be hit
1577 ///
1578 /// * `timeout` - An optional request timeout setting, this overrides the
1579 /// default request setting if one was set.
1580 ///
1581 /// # Examples
1582 ///
1583 /// ```no_run
1584 /// # use matrix_sdk::{Client, config::SyncSettings};
1585 /// # use url::Url;
1586 /// # async {
1587 /// # let homeserver = Url::parse("http://localhost:8080")?;
1588 /// # let mut client = Client::new(homeserver).await?;
1589 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1590 ///
1591 /// // First construct the request you want to make
1592 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1593 /// // for all available Endpoints
1594 /// let user_id = user_id!("@example:localhost").to_owned();
1595 /// let request = profile::get_profile::v3::Request::new(user_id);
1596 ///
1597 /// // Start the request using Client::send()
1598 /// let response = client.send(request).await?;
1599 ///
1600 /// // Check the corresponding Response struct to find out what types are
1601 /// // returned
1602 /// # anyhow::Ok(()) };
1603 /// ```
1604 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1605 where
1606 Request: OutgoingRequest + Clone + Debug,
1607 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1608 {
1609 SendRequest {
1610 client: self.clone(),
1611 request,
1612 config: None,
1613 send_progress: Default::default(),
1614 homeserver_override: None,
1615 }
1616 }
1617
1618 pub(crate) async fn send_inner<Request>(
1619 &self,
1620 request: Request,
1621 config: Option<RequestConfig>,
1622 homeserver_override: Option<String>,
1623 send_progress: SharedObservable<TransmissionProgress>,
1624 ) -> HttpResult<Request::IncomingResponse>
1625 where
1626 Request: OutgoingRequest + Debug,
1627 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1628 {
1629 let homeserver = match homeserver_override {
1630 Some(hs) => hs,
1631 None => self.homeserver().to_string(),
1632 };
1633
1634 let access_token = self.access_token();
1635
1636 self.inner
1637 .http_client
1638 .send(
1639 request,
1640 config,
1641 homeserver,
1642 access_token.as_deref(),
1643 &self.server_versions().await?,
1644 send_progress,
1645 )
1646 .await
1647 }
1648
1649 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1650 _ = self
1651 .inner
1652 .auth_ctx
1653 .session_change_sender
1654 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1655 }
1656
1657 /// Fetches server capabilities from network; no caching.
1658 pub async fn fetch_server_capabilities(
1659 &self,
1660 request_config: Option<RequestConfig>,
1661 ) -> HttpResult<(Box<[MatrixVersion]>, BTreeMap<String, bool>)> {
1662 let resp = self
1663 .inner
1664 .http_client
1665 .send(
1666 get_supported_versions::Request::new(),
1667 request_config,
1668 self.homeserver().to_string(),
1669 None,
1670 &[MatrixVersion::V1_0],
1671 Default::default(),
1672 )
1673 .await?;
1674
1675 // Fill both unstable features and server versions at once.
1676 let mut versions = resp.known_versions().collect::<Vec<_>>();
1677 if versions.is_empty() {
1678 versions.push(MatrixVersion::V1_0);
1679 }
1680
1681 Ok((versions.into(), resp.unstable_features))
1682 }
1683
1684 /// Load server capabilities from storage, or fetch them from network and
1685 /// cache them.
1686 async fn load_or_fetch_server_capabilities(
1687 &self,
1688 ) -> HttpResult<(Box<[MatrixVersion]>, BTreeMap<String, bool>)> {
1689 match self.store().get_kv_data(StateStoreDataKey::ServerCapabilities).await {
1690 Ok(Some(stored)) => {
1691 if let Some((versions, unstable_features)) =
1692 stored.into_server_capabilities().and_then(|cap| cap.maybe_decode())
1693 {
1694 return Ok((versions.into(), unstable_features));
1695 }
1696 }
1697 Ok(None) => {
1698 // fallthrough: cache is empty
1699 }
1700 Err(err) => {
1701 warn!("error when loading cached server capabilities: {err}");
1702 // fallthrough to network.
1703 }
1704 }
1705
1706 let (versions, unstable_features) = self.fetch_server_capabilities(None).await?;
1707
1708 // Attempt to cache the result in storage.
1709 {
1710 let encoded = ServerCapabilities::new(&versions, unstable_features.clone());
1711 if let Err(err) = self
1712 .store()
1713 .set_kv_data(
1714 StateStoreDataKey::ServerCapabilities,
1715 StateStoreDataValue::ServerCapabilities(encoded),
1716 )
1717 .await
1718 {
1719 warn!("error when caching server capabilities: {err}");
1720 }
1721 }
1722
1723 Ok((versions, unstable_features))
1724 }
1725
1726 async fn get_or_load_and_cache_server_capabilities<
1727 T,
1728 F: Fn(&ClientServerCapabilities) -> Option<T>,
1729 >(
1730 &self,
1731 f: F,
1732 ) -> HttpResult<T> {
1733 let caps = &self.inner.server_capabilities;
1734 if let Some(val) = f(&*caps.read().await) {
1735 return Ok(val);
1736 }
1737
1738 let mut guard = caps.write().await;
1739 if let Some(val) = f(&guard) {
1740 return Ok(val);
1741 }
1742
1743 let (versions, unstable_features) = self.load_or_fetch_server_capabilities().await?;
1744
1745 guard.server_versions = Some(versions);
1746 guard.unstable_features = Some(unstable_features);
1747
1748 // SAFETY: both fields were set above, so the function will always return some.
1749 Ok(f(&guard).unwrap())
1750 }
1751
1752 /// Get the Matrix versions supported by the homeserver by fetching them
1753 /// from the server or the cache.
1754 ///
1755 /// # Examples
1756 ///
1757 /// ```no_run
1758 /// use ruma::api::MatrixVersion;
1759 /// # use matrix_sdk::{Client, config::SyncSettings};
1760 /// # use url::Url;
1761 /// # async {
1762 /// # let homeserver = Url::parse("http://localhost:8080")?;
1763 /// # let mut client = Client::new(homeserver).await?;
1764 ///
1765 /// let server_versions = client.server_versions().await?;
1766 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
1767 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1768 /// # anyhow::Ok(()) };
1769 /// ```
1770 pub async fn server_versions(&self) -> HttpResult<Box<[MatrixVersion]>> {
1771 self.get_or_load_and_cache_server_capabilities(|caps| caps.server_versions.clone()).await
1772 }
1773
1774 /// Get the unstable features supported by the homeserver by fetching them
1775 /// from the server or the cache.
1776 ///
1777 /// # Examples
1778 ///
1779 /// ```no_run
1780 /// # use matrix_sdk::{Client, config::SyncSettings};
1781 /// # use url::Url;
1782 /// # async {
1783 /// # let homeserver = Url::parse("http://localhost:8080")?;
1784 /// # let mut client = Client::new(homeserver).await?;
1785 /// let unstable_features = client.unstable_features().await?;
1786 /// let supports_msc_x =
1787 /// unstable_features.get("msc_x").copied().unwrap_or(false);
1788 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
1789 /// # anyhow::Ok(()) };
1790 /// ```
1791 pub async fn unstable_features(&self) -> HttpResult<BTreeMap<String, bool>> {
1792 self.get_or_load_and_cache_server_capabilities(|caps| caps.unstable_features.clone()).await
1793 }
1794
1795 /// Empty the server version and unstable features cache.
1796 ///
1797 /// Since the SDK caches server capabilities (versions and unstable
1798 /// features), it's possible to have a stale entry in the cache. This
1799 /// functions makes it possible to force reset it.
1800 pub async fn reset_server_capabilities(&self) -> Result<()> {
1801 // Empty the in-memory caches.
1802 let mut guard = self.inner.server_capabilities.write().await;
1803 guard.server_versions = None;
1804 guard.unstable_features = None;
1805
1806 // Empty the store cache.
1807 Ok(self.store().remove_kv_data(StateStoreDataKey::ServerCapabilities).await?)
1808 }
1809
1810 /// Check whether MSC 4028 is enabled on the homeserver.
1811 ///
1812 /// # Examples
1813 ///
1814 /// ```no_run
1815 /// # use matrix_sdk::{Client, config::SyncSettings};
1816 /// # use url::Url;
1817 /// # async {
1818 /// # let homeserver = Url::parse("http://localhost:8080")?;
1819 /// # let mut client = Client::new(homeserver).await?;
1820 /// let msc4028_enabled =
1821 /// client.can_homeserver_push_encrypted_event_to_device().await?;
1822 /// # anyhow::Ok(()) };
1823 /// ```
1824 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
1825 Ok(self.unstable_features().await?.get("org.matrix.msc4028").copied().unwrap_or(false))
1826 }
1827
1828 /// Get information of all our own devices.
1829 ///
1830 /// # Examples
1831 ///
1832 /// ```no_run
1833 /// # use matrix_sdk::{Client, config::SyncSettings};
1834 /// # use url::Url;
1835 /// # async {
1836 /// # let homeserver = Url::parse("http://localhost:8080")?;
1837 /// # let mut client = Client::new(homeserver).await?;
1838 /// let response = client.devices().await?;
1839 ///
1840 /// for device in response.devices {
1841 /// println!(
1842 /// "Device: {} {}",
1843 /// device.device_id,
1844 /// device.display_name.as_deref().unwrap_or("")
1845 /// );
1846 /// }
1847 /// # anyhow::Ok(()) };
1848 /// ```
1849 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
1850 let request = get_devices::v3::Request::new();
1851
1852 self.send(request).await
1853 }
1854
1855 /// Delete the given devices from the server.
1856 ///
1857 /// # Arguments
1858 ///
1859 /// * `devices` - The list of devices that should be deleted from the
1860 /// server.
1861 ///
1862 /// * `auth_data` - This request requires user interactive auth, the first
1863 /// request needs to set this to `None` and will always fail with an
1864 /// `UiaaResponse`. The response will contain information for the
1865 /// interactive auth and the same request needs to be made but this time
1866 /// with some `auth_data` provided.
1867 ///
1868 /// ```no_run
1869 /// # use matrix_sdk::{
1870 /// # ruma::{api::client::uiaa, device_id},
1871 /// # Client, Error, config::SyncSettings,
1872 /// # };
1873 /// # use serde_json::json;
1874 /// # use url::Url;
1875 /// # use std::collections::BTreeMap;
1876 /// # async {
1877 /// # let homeserver = Url::parse("http://localhost:8080")?;
1878 /// # let mut client = Client::new(homeserver).await?;
1879 /// let devices = &[device_id!("DEVICEID").to_owned()];
1880 ///
1881 /// if let Err(e) = client.delete_devices(devices, None).await {
1882 /// if let Some(info) = e.as_uiaa_response() {
1883 /// let mut password = uiaa::Password::new(
1884 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1885 /// "wordpass".to_owned(),
1886 /// );
1887 /// password.session = info.session.clone();
1888 ///
1889 /// client
1890 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
1891 /// .await?;
1892 /// }
1893 /// }
1894 /// # anyhow::Ok(()) };
1895 pub async fn delete_devices(
1896 &self,
1897 devices: &[OwnedDeviceId],
1898 auth_data: Option<uiaa::AuthData>,
1899 ) -> HttpResult<delete_devices::v3::Response> {
1900 let mut request = delete_devices::v3::Request::new(devices.to_owned());
1901 request.auth = auth_data;
1902
1903 self.send(request).await
1904 }
1905
1906 /// Change the display name of a device owned by the current user.
1907 ///
1908 /// Returns a `update_device::Response` which specifies the result
1909 /// of the operation.
1910 ///
1911 /// # Arguments
1912 ///
1913 /// * `device_id` - The ID of the device to change the display name of.
1914 /// * `display_name` - The new display name to set.
1915 pub async fn rename_device(
1916 &self,
1917 device_id: &DeviceId,
1918 display_name: &str,
1919 ) -> HttpResult<update_device::v3::Response> {
1920 let mut request = update_device::v3::Request::new(device_id.to_owned());
1921 request.display_name = Some(display_name.to_owned());
1922
1923 self.send(request).await
1924 }
1925
1926 /// Synchronize the client's state with the latest state on the server.
1927 ///
1928 /// ## Syncing Events
1929 ///
1930 /// Messages or any other type of event need to be periodically fetched from
1931 /// the server, this is achieved by sending a `/sync` request to the server.
1932 ///
1933 /// The first sync is sent out without a [`token`]. The response of the
1934 /// first sync will contain a [`next_batch`] field which should then be
1935 /// used in the subsequent sync calls as the [`token`]. This ensures that we
1936 /// don't receive the same events multiple times.
1937 ///
1938 /// ## Long Polling
1939 ///
1940 /// A sync should in the usual case always be in flight. The
1941 /// [`SyncSettings`] have a [`timeout`] option, which controls how
1942 /// long the server will wait for new events before it will respond.
1943 /// The server will respond immediately if some new events arrive before the
1944 /// timeout has expired. If no changes arrive and the timeout expires an
1945 /// empty sync response will be sent to the client.
1946 ///
1947 /// This method of sending a request that may not receive a response
1948 /// immediately is called long polling.
1949 ///
1950 /// ## Filtering Events
1951 ///
1952 /// The number or type of messages and events that the client should receive
1953 /// from the server can be altered using a [`Filter`].
1954 ///
1955 /// Filters can be non-trivial and, since they will be sent with every sync
1956 /// request, they may take up a bunch of unnecessary bandwidth.
1957 ///
1958 /// Luckily filters can be uploaded to the server and reused using an unique
1959 /// identifier, this can be achieved using the [`get_or_upload_filter()`]
1960 /// method.
1961 ///
1962 /// # Arguments
1963 ///
1964 /// * `sync_settings` - Settings for the sync call, this allows us to set
1965 /// various options to configure the sync:
1966 /// * [`filter`] - To configure which events we receive and which get
1967 /// [filtered] by the server
1968 /// * [`timeout`] - To configure our [long polling] setup.
1969 /// * [`token`] - To tell the server which events we already received
1970 /// and where we wish to continue syncing.
1971 /// * [`full_state`] - To tell the server that we wish to receive all
1972 /// state events, regardless of our configured [`token`].
1973 /// * [`set_presence`] - To tell the server to set the presence and to
1974 /// which state.
1975 ///
1976 /// # Examples
1977 ///
1978 /// ```no_run
1979 /// # use url::Url;
1980 /// # async {
1981 /// # let homeserver = Url::parse("http://localhost:8080")?;
1982 /// # let username = "";
1983 /// # let password = "";
1984 /// use matrix_sdk::{
1985 /// config::SyncSettings,
1986 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
1987 /// };
1988 ///
1989 /// let client = Client::new(homeserver).await?;
1990 /// client.matrix_auth().login_username(username, password).send().await?;
1991 ///
1992 /// // Sync once so we receive the client state and old messages.
1993 /// client.sync_once(SyncSettings::default()).await?;
1994 ///
1995 /// // Register our handler so we start responding once we receive a new
1996 /// // event.
1997 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
1998 /// println!("Received event {}: {:?}", ev.sender, ev.content);
1999 /// });
2000 ///
2001 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2002 /// // from our `sync_once()` call automatically.
2003 /// client.sync(SyncSettings::default()).await;
2004 /// # anyhow::Ok(()) };
2005 /// ```
2006 ///
2007 /// [`sync`]: #method.sync
2008 /// [`SyncSettings`]: crate::config::SyncSettings
2009 /// [`token`]: crate::config::SyncSettings#method.token
2010 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2011 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2012 /// [`set_presence`]: ruma::presence::PresenceState
2013 /// [`filter`]: crate::config::SyncSettings#method.filter
2014 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2015 /// [`next_batch`]: SyncResponse#structfield.next_batch
2016 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2017 /// [long polling]: #long-polling
2018 /// [filtered]: #filtering-events
2019 #[instrument(skip(self))]
2020 pub async fn sync_once(
2021 &self,
2022 sync_settings: crate::config::SyncSettings,
2023 ) -> Result<SyncResponse> {
2024 // The sync might not return for quite a while due to the timeout.
2025 // We'll see if there's anything crypto related to send out before we
2026 // sync, i.e. if we closed our client after a sync but before the
2027 // crypto requests were sent out.
2028 //
2029 // This will mostly be a no-op.
2030 #[cfg(feature = "e2e-encryption")]
2031 if let Err(e) = self.send_outgoing_requests().await {
2032 error!(error = ?e, "Error while sending outgoing E2EE requests");
2033 }
2034
2035 let request = assign!(sync_events::v3::Request::new(), {
2036 filter: sync_settings.filter.map(|f| *f),
2037 since: sync_settings.token,
2038 full_state: sync_settings.full_state,
2039 set_presence: sync_settings.set_presence,
2040 timeout: sync_settings.timeout,
2041 });
2042 let mut request_config = self.request_config();
2043 if let Some(timeout) = sync_settings.timeout {
2044 request_config.timeout += timeout;
2045 }
2046
2047 let response = self.send(request).with_request_config(request_config).await?;
2048 let next_batch = response.next_batch.clone();
2049 let response = self.process_sync(response).await?;
2050
2051 #[cfg(feature = "e2e-encryption")]
2052 if let Err(e) = self.send_outgoing_requests().await {
2053 error!(error = ?e, "Error while sending outgoing E2EE requests");
2054 }
2055
2056 self.inner.sync_beat.notify(usize::MAX);
2057
2058 Ok(SyncResponse::new(next_batch, response))
2059 }
2060
2061 /// Repeatedly synchronize the client state with the server.
2062 ///
2063 /// This method will only return on error, if cancellation is needed
2064 /// the method should be wrapped in a cancelable task or the
2065 /// [`Client::sync_with_callback`] method can be used or
2066 /// [`Client::sync_with_result_callback`] if you want to handle error
2067 /// cases in the loop, too.
2068 ///
2069 /// This method will internally call [`Client::sync_once`] in a loop.
2070 ///
2071 /// This method can be used with the [`Client::add_event_handler`]
2072 /// method to react to individual events. If you instead wish to handle
2073 /// events in a bulk manner the [`Client::sync_with_callback`],
2074 /// [`Client::sync_with_result_callback`] and
2075 /// [`Client::sync_stream`] methods can be used instead. Those methods
2076 /// repeatedly return the whole sync response.
2077 ///
2078 /// # Arguments
2079 ///
2080 /// * `sync_settings` - Settings for the sync call. *Note* that those
2081 /// settings will be only used for the first sync call. See the argument
2082 /// docs for [`Client::sync_once`] for more info.
2083 ///
2084 /// # Return
2085 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2086 /// up to the user of the API to check the error and decide whether the sync
2087 /// should continue or not.
2088 ///
2089 /// # Examples
2090 ///
2091 /// ```no_run
2092 /// # use url::Url;
2093 /// # async {
2094 /// # let homeserver = Url::parse("http://localhost:8080")?;
2095 /// # let username = "";
2096 /// # let password = "";
2097 /// use matrix_sdk::{
2098 /// config::SyncSettings,
2099 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2100 /// };
2101 ///
2102 /// let client = Client::new(homeserver).await?;
2103 /// client.matrix_auth().login_username(&username, &password).send().await?;
2104 ///
2105 /// // Register our handler so we start responding once we receive a new
2106 /// // event.
2107 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2108 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2109 /// });
2110 ///
2111 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2112 /// // automatically.
2113 /// client.sync(SyncSettings::default()).await?;
2114 /// # anyhow::Ok(()) };
2115 /// ```
2116 ///
2117 /// [argument docs]: #method.sync_once
2118 /// [`sync_with_callback`]: #method.sync_with_callback
2119 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2120 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2121 }
2122
2123 /// Repeatedly call sync to synchronize the client state with the server.
2124 ///
2125 /// # Arguments
2126 ///
2127 /// * `sync_settings` - Settings for the sync call. *Note* that those
2128 /// settings will be only used for the first sync call. See the argument
2129 /// docs for [`Client::sync_once`] for more info.
2130 ///
2131 /// * `callback` - A callback that will be called every time a successful
2132 /// response has been fetched from the server. The callback must return a
2133 /// boolean which signalizes if the method should stop syncing. If the
2134 /// callback returns `LoopCtrl::Continue` the sync will continue, if the
2135 /// callback returns `LoopCtrl::Break` the sync will be stopped.
2136 ///
2137 /// # Return
2138 /// The sync runs until an error occurs or the
2139 /// callback indicates that the Loop should stop. If the callback asked for
2140 /// a regular stop, the result will be `Ok(())` otherwise the
2141 /// `Err(Error)` is returned.
2142 ///
2143 /// # Examples
2144 ///
2145 /// The following example demonstrates how to sync forever while sending all
2146 /// the interesting events through a mpsc channel to another thread e.g. a
2147 /// UI thread.
2148 ///
2149 /// ```no_run
2150 /// # use std::time::Duration;
2151 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2152 /// # use url::Url;
2153 /// # async {
2154 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2155 /// # let mut client = Client::new(homeserver).await.unwrap();
2156 ///
2157 /// use tokio::sync::mpsc::channel;
2158 ///
2159 /// let (tx, rx) = channel(100);
2160 ///
2161 /// let sync_channel = &tx;
2162 /// let sync_settings = SyncSettings::new()
2163 /// .timeout(Duration::from_secs(30));
2164 ///
2165 /// client
2166 /// .sync_with_callback(sync_settings, |response| async move {
2167 /// let channel = sync_channel;
2168 /// for (room_id, room) in response.rooms.join {
2169 /// for event in room.timeline.events {
2170 /// channel.send(event).await.unwrap();
2171 /// }
2172 /// }
2173 ///
2174 /// LoopCtrl::Continue
2175 /// })
2176 /// .await;
2177 /// };
2178 /// ```
2179 #[instrument(skip_all)]
2180 pub async fn sync_with_callback<C>(
2181 &self,
2182 sync_settings: crate::config::SyncSettings,
2183 callback: impl Fn(SyncResponse) -> C,
2184 ) -> Result<(), Error>
2185 where
2186 C: Future<Output = LoopCtrl>,
2187 {
2188 self.sync_with_result_callback(sync_settings, |result| async {
2189 Ok(callback(result?).await)
2190 })
2191 .await
2192 }
2193
2194 /// Repeatedly call sync to synchronize the client state with the server.
2195 ///
2196 /// # Arguments
2197 ///
2198 /// * `sync_settings` - Settings for the sync call. *Note* that those
2199 /// settings will be only used for the first sync call. See the argument
2200 /// docs for [`Client::sync_once`] for more info.
2201 ///
2202 /// * `callback` - A callback that will be called every time after a
2203 /// response has been received, failure or not. The callback returns a
2204 /// `Result<LoopCtrl, Error>`, too. When returning
2205 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2206 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2207 /// function returns `Ok(())`. In case the callback can't handle the
2208 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2209 /// which results in the sync ending and the `Err(Error)` being returned.
2210 ///
2211 /// # Return
2212 /// The sync runs until an error occurs that the callback can't handle or
2213 /// the callback indicates that the Loop should stop. If the callback
2214 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2215 /// `Err(Error)` is returned.
2216 ///
2217 /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2218 /// this, and are handled first without sending the result to the
2219 /// callback. Only after they have exceeded is the `Result` handed to
2220 /// the callback.
2221 ///
2222 /// # Examples
2223 ///
2224 /// The following example demonstrates how to sync forever while sending all
2225 /// the interesting events through a mpsc channel to another thread e.g. a
2226 /// UI thread.
2227 ///
2228 /// ```no_run
2229 /// # use std::time::Duration;
2230 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2231 /// # use url::Url;
2232 /// # async {
2233 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2234 /// # let mut client = Client::new(homeserver).await.unwrap();
2235 /// #
2236 /// use tokio::sync::mpsc::channel;
2237 ///
2238 /// let (tx, rx) = channel(100);
2239 ///
2240 /// let sync_channel = &tx;
2241 /// let sync_settings = SyncSettings::new()
2242 /// .timeout(Duration::from_secs(30));
2243 ///
2244 /// client
2245 /// .sync_with_result_callback(sync_settings, |response| async move {
2246 /// let channel = sync_channel;
2247 /// let sync_response = response?;
2248 /// for (room_id, room) in sync_response.rooms.join {
2249 /// for event in room.timeline.events {
2250 /// channel.send(event).await.unwrap();
2251 /// }
2252 /// }
2253 ///
2254 /// Ok(LoopCtrl::Continue)
2255 /// })
2256 /// .await;
2257 /// };
2258 /// ```
2259 #[instrument(skip(self, callback))]
2260 pub async fn sync_with_result_callback<C>(
2261 &self,
2262 mut sync_settings: crate::config::SyncSettings,
2263 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2264 ) -> Result<(), Error>
2265 where
2266 C: Future<Output = Result<LoopCtrl, Error>>,
2267 {
2268 let mut last_sync_time: Option<Instant> = None;
2269
2270 if sync_settings.token.is_none() {
2271 sync_settings.token = self.sync_token().await;
2272 }
2273
2274 loop {
2275 trace!("Syncing");
2276 let result = self.sync_loop_helper(&mut sync_settings).await;
2277
2278 trace!("Running callback");
2279 if callback(result).await? == LoopCtrl::Break {
2280 trace!("Callback told us to stop");
2281 break;
2282 }
2283 trace!("Done running callback");
2284
2285 Client::delay_sync(&mut last_sync_time).await
2286 }
2287
2288 Ok(())
2289 }
2290
2291 //// Repeatedly synchronize the client state with the server.
2292 ///
2293 /// This method will internally call [`Client::sync_once`] in a loop and is
2294 /// equivalent to the [`Client::sync`] method but the responses are provided
2295 /// as an async stream.
2296 ///
2297 /// # Arguments
2298 ///
2299 /// * `sync_settings` - Settings for the sync call. *Note* that those
2300 /// settings will be only used for the first sync call. See the argument
2301 /// docs for [`Client::sync_once`] for more info.
2302 ///
2303 /// # Examples
2304 ///
2305 /// ```no_run
2306 /// # use url::Url;
2307 /// # async {
2308 /// # let homeserver = Url::parse("http://localhost:8080")?;
2309 /// # let username = "";
2310 /// # let password = "";
2311 /// use futures_util::StreamExt;
2312 /// use matrix_sdk::{config::SyncSettings, Client};
2313 ///
2314 /// let client = Client::new(homeserver).await?;
2315 /// client.matrix_auth().login_username(&username, &password).send().await?;
2316 ///
2317 /// let mut sync_stream =
2318 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2319 ///
2320 /// while let Some(Ok(response)) = sync_stream.next().await {
2321 /// for room in response.rooms.join.values() {
2322 /// for e in &room.timeline.events {
2323 /// if let Ok(event) = e.raw().deserialize() {
2324 /// println!("Received event {:?}", event);
2325 /// }
2326 /// }
2327 /// }
2328 /// }
2329 ///
2330 /// # anyhow::Ok(()) };
2331 /// ```
2332 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2333 #[instrument(skip(self))]
2334 pub async fn sync_stream(
2335 &self,
2336 mut sync_settings: crate::config::SyncSettings,
2337 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2338 let mut last_sync_time: Option<Instant> = None;
2339
2340 if sync_settings.token.is_none() {
2341 sync_settings.token = self.sync_token().await;
2342 }
2343
2344 let parent_span = Span::current();
2345
2346 async_stream::stream! {
2347 loop {
2348 yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
2349
2350 Client::delay_sync(&mut last_sync_time).await
2351 }
2352 }
2353 }
2354
2355 /// Get the current, if any, sync token of the client.
2356 /// This will be None if the client didn't sync at least once.
2357 pub(crate) async fn sync_token(&self) -> Option<String> {
2358 self.inner.base_client.sync_token().await
2359 }
2360
2361 /// Gets information about the owner of a given access token.
2362 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2363 let request = whoami::v3::Request::new();
2364 self.send(request).await
2365 }
2366
2367 /// Subscribes a new receiver to client SessionChange broadcasts.
2368 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2369 let broadcast = &self.inner.auth_ctx.session_change_sender;
2370 broadcast.subscribe()
2371 }
2372
2373 /// Sets the save/restore session callbacks.
2374 ///
2375 /// This is another mechanism to get synchronous updates to session tokens,
2376 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2377 pub fn set_session_callbacks(
2378 &self,
2379 reload_session_callback: Box<ReloadSessionCallback>,
2380 save_session_callback: Box<SaveSessionCallback>,
2381 ) -> Result<()> {
2382 self.inner
2383 .auth_ctx
2384 .reload_session_callback
2385 .set(reload_session_callback)
2386 .map_err(|_| Error::MultipleSessionCallbacks)?;
2387
2388 self.inner
2389 .auth_ctx
2390 .save_session_callback
2391 .set(save_session_callback)
2392 .map_err(|_| Error::MultipleSessionCallbacks)?;
2393
2394 Ok(())
2395 }
2396
2397 /// Get the notification settings of the current owner of the client.
2398 pub async fn notification_settings(&self) -> NotificationSettings {
2399 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2400 NotificationSettings::new(self.clone(), ruleset)
2401 }
2402
2403 /// Create a new specialized `Client` that can process notifications.
2404 ///
2405 /// See [`CrossProcessStoreLock::new`] to learn more about
2406 /// `cross_process_store_locks_holder_name`.
2407 ///
2408 /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2409 pub async fn notification_client(
2410 &self,
2411 cross_process_store_locks_holder_name: String,
2412 ) -> Result<Client> {
2413 let client = Client {
2414 inner: ClientInner::new(
2415 self.inner.auth_ctx.clone(),
2416 self.server().cloned(),
2417 self.homeserver(),
2418 self.sliding_sync_version(),
2419 self.inner.http_client.clone(),
2420 self.inner
2421 .base_client
2422 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name)
2423 .await?,
2424 self.inner.server_capabilities.read().await.clone(),
2425 self.inner.respect_login_well_known,
2426 self.inner.event_cache.clone(),
2427 self.inner.send_queue_data.clone(),
2428 #[cfg(feature = "e2e-encryption")]
2429 self.inner.e2ee.encryption_settings,
2430 cross_process_store_locks_holder_name,
2431 )
2432 .await,
2433 };
2434
2435 Ok(client)
2436 }
2437
2438 /// The [`EventCache`] instance for this [`Client`].
2439 pub fn event_cache(&self) -> &EventCache {
2440 // SAFETY: always initialized in the `Client` ctor.
2441 self.inner.event_cache.get().unwrap()
2442 }
2443
2444 /// Waits until an at least partially synced room is received, and returns
2445 /// it.
2446 ///
2447 /// **Note: this function will loop endlessly until either it finds the room
2448 /// or an externally set timeout happens.**
2449 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2450 loop {
2451 if let Some(room) = self.get_room(room_id) {
2452 if room.is_state_partially_or_fully_synced() {
2453 debug!("Found just created room!");
2454 return room;
2455 }
2456 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2457 } else {
2458 debug!("Room wasn't found, waiting for sync beat to try again");
2459 }
2460 self.inner.sync_beat.listen().await;
2461 }
2462 }
2463
2464 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2465 /// join it.
2466 pub async fn knock(
2467 &self,
2468 room_id_or_alias: OwnedRoomOrAliasId,
2469 reason: Option<String>,
2470 server_names: Vec<OwnedServerName>,
2471 ) -> Result<Room> {
2472 let request =
2473 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2474 let response = self.send(request).await?;
2475 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2476 Ok(Room::new(self.clone(), base_room))
2477 }
2478}
2479
2480/// A weak reference to the inner client, useful when trying to get a handle
2481/// on the owning client.
2482#[derive(Clone)]
2483pub(crate) struct WeakClient {
2484 client: Weak<ClientInner>,
2485}
2486
2487impl WeakClient {
2488 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2489 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2490 Self { client: Arc::downgrade(client) }
2491 }
2492
2493 /// Construct a [`WeakClient`] from a [`Client`].
2494 pub fn from_client(client: &Client) -> Self {
2495 Self::from_inner(&client.inner)
2496 }
2497
2498 /// Attempts to get a [`Client`] from this [`WeakClient`].
2499 pub fn get(&self) -> Option<Client> {
2500 self.client.upgrade().map(|inner| Client { inner })
2501 }
2502
2503 /// Gets the number of strong (`Arc`) pointers still pointing to this
2504 /// client.
2505 #[allow(dead_code)]
2506 pub fn strong_count(&self) -> usize {
2507 self.client.strong_count()
2508 }
2509}
2510
2511#[derive(Clone)]
2512struct ClientServerCapabilities {
2513 /// The Matrix versions the server supports (well-known ones only).
2514 server_versions: Option<Box<[MatrixVersion]>>,
2515
2516 /// The unstable features and their on/off state on the server.
2517 unstable_features: Option<BTreeMap<String, bool>>,
2518}
2519
2520// The http mocking library is not supported for wasm32
2521#[cfg(all(test, not(target_arch = "wasm32")))]
2522pub(crate) mod tests {
2523 use std::{sync::Arc, time::Duration};
2524
2525 use assert_matches::assert_matches;
2526 use futures_util::FutureExt;
2527 use matrix_sdk_base::{
2528 store::{MemoryStore, StoreConfig},
2529 RoomState,
2530 };
2531 use matrix_sdk_test::{
2532 async_test, test_json, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder,
2533 DEFAULT_TEST_ROOM_ID,
2534 };
2535 #[cfg(target_arch = "wasm32")]
2536 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2537
2538 use ruma::{
2539 api::{client::room::create_room::v3::Request as CreateRoomRequest, MatrixVersion},
2540 assign,
2541 events::ignored_user_list::IgnoredUserListEventContent,
2542 owned_room_id, room_alias_id, room_id, RoomId, ServerName, UserId,
2543 };
2544 use serde_json::json;
2545 use tokio::{
2546 spawn,
2547 time::{sleep, timeout},
2548 };
2549 use url::Url;
2550 use wiremock::{
2551 matchers::{body_json, header, method, path, query_param_is_missing},
2552 Mock, MockServer, ResponseTemplate,
2553 };
2554
2555 use super::Client;
2556 use crate::{
2557 client::WeakClient,
2558 config::{RequestConfig, SyncSettings},
2559 test_utils::{
2560 logged_in_client, mocks::MatrixMockServer, no_retry_test_client, set_client_session,
2561 test_client_builder, test_client_builder_with_server,
2562 },
2563 Error,
2564 };
2565
2566 #[async_test]
2567 async fn test_account_data() {
2568 let server = MockServer::start().await;
2569 let client = logged_in_client(Some(server.uri())).await;
2570
2571 Mock::given(method("GET"))
2572 .and(path("/_matrix/client/r0/sync".to_owned()))
2573 .and(header("authorization", "Bearer 1234"))
2574 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::SYNC))
2575 .mount(&server)
2576 .await;
2577
2578 let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
2579 let _response = client.sync_once(sync_settings).await.unwrap();
2580
2581 let content = client
2582 .account()
2583 .account_data::<IgnoredUserListEventContent>()
2584 .await
2585 .unwrap()
2586 .unwrap()
2587 .deserialize()
2588 .unwrap();
2589
2590 assert_eq!(content.ignored_users.len(), 1);
2591 }
2592
2593 #[async_test]
2594 async fn test_successful_discovery() {
2595 // Imagine this is `matrix.org`.
2596 let server = MockServer::start().await;
2597
2598 // Imagine this is `matrix-client.matrix.org`.
2599 let homeserver = MockServer::start().await;
2600
2601 // Imagine Alice has the user ID `@alice:matrix.org`.
2602 let server_url = server.uri();
2603 let domain = server_url.strip_prefix("http://").unwrap();
2604 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2605
2606 // The `.well-known` is on the server (e.g. `matrix.org`).
2607 Mock::given(method("GET"))
2608 .and(path("/.well-known/matrix/client"))
2609 .respond_with(ResponseTemplate::new(200).set_body_raw(
2610 test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", &homeserver.uri()),
2611 "application/json",
2612 ))
2613 .mount(&server)
2614 .await;
2615
2616 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
2617 Mock::given(method("GET"))
2618 .and(path("/_matrix/client/versions"))
2619 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2620 .mount(&homeserver)
2621 .await;
2622
2623 let client = Client::builder()
2624 .insecure_server_name_no_tls(alice.server_name())
2625 .build()
2626 .await
2627 .unwrap();
2628
2629 assert_eq!(client.server().unwrap(), &Url::parse(&server.uri()).unwrap());
2630 assert_eq!(client.homeserver(), Url::parse(&homeserver.uri()).unwrap());
2631 }
2632
2633 #[async_test]
2634 async fn test_discovery_broken_server() {
2635 let server = MockServer::start().await;
2636 let server_url = server.uri();
2637 let domain = server_url.strip_prefix("http://").unwrap();
2638 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2639
2640 Mock::given(method("GET"))
2641 .and(path("/.well-known/matrix/client"))
2642 .respond_with(ResponseTemplate::new(404))
2643 .mount(&server)
2644 .await;
2645
2646 assert!(
2647 Client::builder()
2648 .insecure_server_name_no_tls(alice.server_name())
2649 .build()
2650 .await
2651 .is_err(),
2652 "Creating a client from a user ID should fail when the .well-known request fails."
2653 );
2654 }
2655
2656 #[async_test]
2657 async fn test_room_creation() {
2658 let server = MockServer::start().await;
2659 let client = logged_in_client(Some(server.uri())).await;
2660
2661 let response = SyncResponseBuilder::default()
2662 .add_joined_room(
2663 JoinedRoomBuilder::default()
2664 .add_state_event(StateTestEvent::Member)
2665 .add_state_event(StateTestEvent::PowerLevels),
2666 )
2667 .build_sync_response();
2668
2669 client.inner.base_client.receive_sync_response(response).await.unwrap();
2670
2671 assert_eq!(client.homeserver(), Url::parse(&server.uri()).unwrap());
2672
2673 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
2674 assert_eq!(room.state(), RoomState::Joined);
2675 }
2676
2677 #[async_test]
2678 async fn test_retry_limit_http_requests() {
2679 let server = MockServer::start().await;
2680 let client = test_client_builder(Some(server.uri()))
2681 .request_config(RequestConfig::new().retry_limit(3))
2682 .build()
2683 .await
2684 .unwrap();
2685
2686 assert!(client.request_config().retry_limit.unwrap() == 3);
2687
2688 Mock::given(method("POST"))
2689 .and(path("/_matrix/client/r0/login"))
2690 .respond_with(ResponseTemplate::new(501))
2691 .expect(3)
2692 .mount(&server)
2693 .await;
2694
2695 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2696 }
2697
2698 #[async_test]
2699 async fn test_retry_timeout_http_requests() {
2700 // Keep this timeout small so that the test doesn't take long
2701 let retry_timeout = Duration::from_secs(5);
2702 let server = MockServer::start().await;
2703 let client = test_client_builder(Some(server.uri()))
2704 .request_config(RequestConfig::new().retry_timeout(retry_timeout))
2705 .build()
2706 .await
2707 .unwrap();
2708
2709 assert!(client.request_config().retry_timeout.unwrap() == retry_timeout);
2710
2711 Mock::given(method("POST"))
2712 .and(path("/_matrix/client/r0/login"))
2713 .respond_with(ResponseTemplate::new(501))
2714 .expect(2..)
2715 .mount(&server)
2716 .await;
2717
2718 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2719 }
2720
2721 #[async_test]
2722 async fn test_short_retry_initial_http_requests() {
2723 let server = MockServer::start().await;
2724 let client = test_client_builder(Some(server.uri())).build().await.unwrap();
2725
2726 Mock::given(method("POST"))
2727 .and(path("/_matrix/client/r0/login"))
2728 .respond_with(ResponseTemplate::new(501))
2729 .expect(3..)
2730 .mount(&server)
2731 .await;
2732
2733 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2734 }
2735
2736 #[async_test]
2737 async fn test_no_retry_http_requests() {
2738 let server = MockServer::start().await;
2739 let client = logged_in_client(Some(server.uri())).await;
2740
2741 Mock::given(method("GET"))
2742 .and(path("/_matrix/client/r0/devices"))
2743 .respond_with(ResponseTemplate::new(501))
2744 .expect(1)
2745 .mount(&server)
2746 .await;
2747
2748 client.devices().await.unwrap_err();
2749 }
2750
2751 #[async_test]
2752 async fn test_set_homeserver() {
2753 let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2754 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
2755
2756 let homeserver = Url::parse("http://example.com/").unwrap();
2757 client.set_homeserver(homeserver.clone());
2758 assert_eq!(client.homeserver(), homeserver);
2759 }
2760
2761 #[async_test]
2762 async fn test_search_user_request() {
2763 let server = MockServer::start().await;
2764 let client = logged_in_client(Some(server.uri())).await;
2765
2766 Mock::given(method("POST"))
2767 .and(path("_matrix/client/r0/user_directory/search"))
2768 .and(body_json(&*test_json::search_users::SEARCH_USERS_REQUEST))
2769 .respond_with(
2770 ResponseTemplate::new(200)
2771 .set_body_json(&*test_json::search_users::SEARCH_USERS_RESPONSE),
2772 )
2773 .mount(&server)
2774 .await;
2775
2776 let response = client.search_users("test", 50).await.unwrap();
2777 assert_eq!(response.results.len(), 1);
2778 let result = &response.results[0];
2779 assert_eq!(result.user_id.to_string(), "@test:example.me");
2780 assert_eq!(result.display_name.clone().unwrap(), "Test");
2781 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
2782 assert!(!response.limited);
2783 }
2784
2785 #[async_test]
2786 async fn test_request_unstable_features() {
2787 let server = MockServer::start().await;
2788 let client = logged_in_client(Some(server.uri())).await;
2789
2790 Mock::given(method("GET"))
2791 .and(path("_matrix/client/versions"))
2792 .respond_with(
2793 ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
2794 )
2795 .mount(&server)
2796 .await;
2797
2798 let unstable_features = client.unstable_features().await.unwrap();
2799 assert_eq!(unstable_features.get("org.matrix.e2e_cross_signing"), Some(&true));
2800 assert_eq!(unstable_features.get("you.shall.pass"), None);
2801 }
2802
2803 #[async_test]
2804 async fn test_can_homeserver_push_encrypted_event_to_device() {
2805 let server = MockServer::start().await;
2806 let client = logged_in_client(Some(server.uri())).await;
2807
2808 Mock::given(method("GET"))
2809 .and(path("_matrix/client/versions"))
2810 .respond_with(
2811 ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
2812 )
2813 .mount(&server)
2814 .await;
2815
2816 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
2817 assert!(msc4028_enabled);
2818 }
2819
2820 #[async_test]
2821 async fn test_recently_visited_rooms() {
2822 // Tracking recently visited rooms requires authentication
2823 let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2824 assert_matches!(
2825 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
2826 Err(Error::AuthenticationRequired)
2827 );
2828
2829 let client = logged_in_client(None).await;
2830 let account = client.account();
2831
2832 // We should start off with an empty list
2833 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
2834
2835 // Tracking a valid room id should add it to the list
2836 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2837 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2838 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2839
2840 // And the existing list shouldn't be changed
2841 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2842 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2843
2844 // Tracking the same room again shouldn't change the list
2845 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2846 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2847 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2848
2849 // Tracking a second room should add it to the front of the list
2850 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
2851 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
2852 assert_eq!(
2853 account.get_recently_visited_rooms().await.unwrap(),
2854 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
2855 );
2856
2857 // Tracking the first room yet again should move it to the front of the list
2858 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2859 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
2860 assert_eq!(
2861 account.get_recently_visited_rooms().await.unwrap(),
2862 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
2863 );
2864
2865 // Tracking should be capped at 20
2866 for n in 0..20 {
2867 account
2868 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
2869 .await
2870 .unwrap();
2871 }
2872
2873 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
2874
2875 // And the initial rooms should've been pushed out
2876 let rooms = account.get_recently_visited_rooms().await.unwrap();
2877 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
2878 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
2879
2880 // And the last tracked room should be the first
2881 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
2882 }
2883
2884 #[async_test]
2885 async fn test_client_no_cycle_with_event_cache() {
2886 let client = logged_in_client(None).await;
2887
2888 // Wait for the init tasks to die.
2889 sleep(Duration::from_secs(1)).await;
2890
2891 let weak_client = WeakClient::from_client(&client);
2892 assert_eq!(weak_client.strong_count(), 1);
2893
2894 {
2895 let room_id = room_id!("!room:example.org");
2896
2897 // Have the client know the room.
2898 let response = SyncResponseBuilder::default()
2899 .add_joined_room(JoinedRoomBuilder::new(room_id))
2900 .build_sync_response();
2901 client.inner.base_client.receive_sync_response(response).await.unwrap();
2902
2903 client.event_cache().subscribe().unwrap();
2904
2905 let (_room_event_cache, _drop_handles) =
2906 client.get_room(room_id).unwrap().event_cache().await.unwrap();
2907 }
2908
2909 drop(client);
2910
2911 // Give a bit of time for background tasks to die.
2912 sleep(Duration::from_secs(1)).await;
2913
2914 // The weak client must be the last reference to the client now.
2915 assert_eq!(weak_client.strong_count(), 0);
2916 let client = weak_client.get();
2917 assert!(
2918 client.is_none(),
2919 "too many strong references to the client: {}",
2920 Arc::strong_count(&client.unwrap().inner)
2921 );
2922 }
2923
2924 #[async_test]
2925 async fn test_server_capabilities_caching() {
2926 let server = MockServer::start().await;
2927 let server_url = server.uri();
2928 let domain = server_url.strip_prefix("http://").unwrap();
2929 let server_name = <&ServerName>::try_from(domain).unwrap();
2930
2931 Mock::given(method("GET"))
2932 .and(path("/.well-known/matrix/client"))
2933 .respond_with(ResponseTemplate::new(200).set_body_raw(
2934 test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", server_url.as_ref()),
2935 "application/json",
2936 ))
2937 .named("well known mock")
2938 .expect(2)
2939 .mount(&server)
2940 .await;
2941
2942 let versions_mock = Mock::given(method("GET"))
2943 .and(path("/_matrix/client/versions"))
2944 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2945 .named("first versions mock")
2946 .expect(1)
2947 .mount_as_scoped(&server)
2948 .await;
2949
2950 let memory_store = Arc::new(MemoryStore::new());
2951 let client = Client::builder()
2952 .insecure_server_name_no_tls(server_name)
2953 .store_config(
2954 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
2955 .state_store(memory_store.clone()),
2956 )
2957 .build()
2958 .await
2959 .unwrap();
2960
2961 assert_eq!(client.server_versions().await.unwrap().len(), 1);
2962
2963 // This second call hits the in-memory cache.
2964 assert!(client
2965 .server_versions()
2966 .await
2967 .unwrap()
2968 .iter()
2969 .any(|version| *version == MatrixVersion::V1_0));
2970
2971 drop(client);
2972
2973 let client = Client::builder()
2974 .insecure_server_name_no_tls(server_name)
2975 .store_config(
2976 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
2977 .state_store(memory_store.clone()),
2978 )
2979 .build()
2980 .await
2981 .unwrap();
2982
2983 // This third call hits the on-disk cache.
2984 assert_eq!(
2985 client.unstable_features().await.unwrap().get("org.matrix.e2e_cross_signing"),
2986 Some(&true)
2987 );
2988
2989 drop(versions_mock);
2990 server.verify().await;
2991
2992 // Now, reset the cache, and observe the endpoint being called again once.
2993 client.reset_server_capabilities().await.unwrap();
2994
2995 Mock::given(method("GET"))
2996 .and(path("/_matrix/client/versions"))
2997 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2998 .expect(1)
2999 .named("second versions mock")
3000 .mount(&server)
3001 .await;
3002
3003 // Hits network again.
3004 assert_eq!(client.server_versions().await.unwrap().len(), 1);
3005 // Hits in-memory cache again.
3006 assert!(client
3007 .server_versions()
3008 .await
3009 .unwrap()
3010 .iter()
3011 .any(|version| *version == MatrixVersion::V1_0));
3012 }
3013
3014 #[async_test]
3015 async fn test_no_network_doesnt_cause_infinite_retries() {
3016 // Note: not `no_retry_test_client` or `logged_in_client` which uses the former,
3017 // since we want infinite retries for transient errors.
3018 let client =
3019 test_client_builder(None).request_config(RequestConfig::new()).build().await.unwrap();
3020 set_client_session(&client).await;
3021
3022 // We don't define a mock server on purpose here, so that the error is really a
3023 // network error.
3024 client.whoami().await.unwrap_err();
3025 }
3026
3027 #[async_test]
3028 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3029 let (client_builder, server) = test_client_builder_with_server().await;
3030 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3031 set_client_session(&client).await;
3032
3033 let builder = Mock::given(method("GET"))
3034 .and(path("/_matrix/client/r0/sync"))
3035 .and(header("authorization", "Bearer 1234"))
3036 .and(query_param_is_missing("since"));
3037
3038 let room_id = room_id!("!room:example.org");
3039 let joined_room_builder = JoinedRoomBuilder::new(room_id);
3040 let mut sync_response_builder = SyncResponseBuilder::new();
3041 sync_response_builder.add_joined_room(joined_room_builder);
3042 let response_body = sync_response_builder.build_json_sync_response();
3043
3044 builder
3045 .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3046 .mount(&server)
3047 .await;
3048
3049 client.sync_once(SyncSettings::default()).await.unwrap();
3050
3051 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3052 assert_eq!(room.room_id(), room_id);
3053 }
3054
3055 #[async_test]
3056 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3057 let (client_builder, server) = test_client_builder_with_server().await;
3058 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3059 set_client_session(&client).await;
3060
3061 let builder = Mock::given(method("GET"))
3062 .and(path("/_matrix/client/r0/sync"))
3063 .and(header("authorization", "Bearer 1234"))
3064 .and(query_param_is_missing("since"));
3065
3066 let room_id = room_id!("!room:example.org");
3067 let joined_room_builder = JoinedRoomBuilder::new(room_id);
3068 let mut sync_response_builder = SyncResponseBuilder::new();
3069 sync_response_builder.add_joined_room(joined_room_builder);
3070 let response_body = sync_response_builder.build_json_sync_response();
3071
3072 builder
3073 .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3074 .mount(&server)
3075 .await;
3076
3077 let client = Arc::new(client);
3078
3079 // Perform the /sync request with a delay so it starts after the
3080 // `await_room_remote_echo` call has happened
3081 spawn({
3082 let client = client.clone();
3083 async move {
3084 sleep(Duration::from_millis(100)).await;
3085 client.sync_once(SyncSettings::default()).await.unwrap();
3086 }
3087 });
3088
3089 let room =
3090 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3091 assert_eq!(room.room_id(), room_id);
3092 }
3093
3094 #[async_test]
3095 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3096 let (client_builder, _) = test_client_builder_with_server().await;
3097 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3098 set_client_session(&client).await;
3099
3100 let room_id = room_id!("!room:example.org");
3101 // Room is not present so the client won't be able to find it. The call will
3102 // timeout.
3103 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3104 }
3105
3106 #[async_test]
3107 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3108 let (client_builder, server) = test_client_builder_with_server().await;
3109 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3110 set_client_session(&client).await;
3111
3112 Mock::given(method("POST"))
3113 .and(path("_matrix/client/r0/createRoom"))
3114 .and(header("authorization", "Bearer 1234"))
3115 .respond_with(
3116 ResponseTemplate::new(200).set_body_json(json!({ "room_id": "!room:example.org"})),
3117 )
3118 .mount(&server)
3119 .await;
3120
3121 // Create a room in the internal store
3122 let room = client
3123 .create_room(assign!(CreateRoomRequest::new(), {
3124 invite: vec![],
3125 is_direct: false,
3126 }))
3127 .await
3128 .unwrap();
3129
3130 // Room is locally present, but not synced, the call will timeout
3131 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3132 .await
3133 .unwrap_err();
3134 }
3135
3136 #[async_test]
3137 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3138 let server = MatrixMockServer::new().await;
3139 let client = server.client_builder().build().await;
3140
3141 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3142
3143 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3144 assert_matches!(ret, Ok(true));
3145 }
3146
3147 #[async_test]
3148 async fn test_is_room_alias_available_if_alias_is_resolved() {
3149 let server = MatrixMockServer::new().await;
3150 let client = server.client_builder().build().await;
3151
3152 server
3153 .mock_room_directory_resolve_alias()
3154 .ok("!some_room_id:matrix.org", Vec::new())
3155 .expect(1)
3156 .mount()
3157 .await;
3158
3159 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3160 assert_matches!(ret, Ok(false));
3161 }
3162
3163 #[async_test]
3164 async fn test_is_room_alias_available_if_error_found() {
3165 let server = MatrixMockServer::new().await;
3166 let client = server.client_builder().build().await;
3167
3168 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3169
3170 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3171 assert_matches!(ret, Err(_));
3172 }
3173
3174 #[async_test]
3175 async fn test_create_room_alias() {
3176 let server = MatrixMockServer::new().await;
3177 let client = server.client_builder().build().await;
3178
3179 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3180
3181 let ret = client
3182 .create_room_alias(
3183 room_alias_id!("#some_alias:matrix.org"),
3184 room_id!("!some_room:matrix.org"),
3185 )
3186 .await;
3187 assert_matches!(ret, Ok(()));
3188 }
3189
3190 #[async_test]
3191 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3192 let server = MatrixMockServer::new().await;
3193 let client = server.client_builder().build().await;
3194
3195 let room_id = room_id!("!a-room:matrix.org");
3196
3197 // Make sure the summary endpoint is called once
3198 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3199
3200 // We create a locally cached invited room
3201 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3202
3203 // And we get a preview, the server endpoint was reached
3204 let preview = client
3205 .get_room_preview(room_id.into(), Vec::new())
3206 .await
3207 .expect("Room preview should be retrieved");
3208
3209 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3210 }
3211
3212 #[async_test]
3213 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3214 let server = MatrixMockServer::new().await;
3215 let client = server.client_builder().build().await;
3216
3217 let room_id = room_id!("!a-room:matrix.org");
3218
3219 // Make sure the summary endpoint is called once
3220 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3221
3222 // We create a locally cached left room
3223 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3224
3225 // And we get a preview, the server endpoint was reached
3226 let preview = client
3227 .get_room_preview(room_id.into(), Vec::new())
3228 .await
3229 .expect("Room preview should be retrieved");
3230
3231 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3232 }
3233
3234 #[async_test]
3235 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3236 let server = MatrixMockServer::new().await;
3237 let client = server.client_builder().build().await;
3238
3239 let room_id = room_id!("!a-room:matrix.org");
3240
3241 // Make sure the summary endpoint is called once
3242 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3243
3244 // We create a locally cached knocked room
3245 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3246
3247 // And we get a preview, the server endpoint was reached
3248 let preview = client
3249 .get_room_preview(room_id.into(), Vec::new())
3250 .await
3251 .expect("Room preview should be retrieved");
3252
3253 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3254 }
3255
3256 #[async_test]
3257 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3258 let server = MatrixMockServer::new().await;
3259 let client = server.client_builder().build().await;
3260
3261 let room_id = room_id!("!a-room:matrix.org");
3262
3263 // Make sure the summary endpoint is not called
3264 server.mock_room_summary().ok(room_id).never().mount().await;
3265
3266 // We create a locally cached joined room
3267 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3268
3269 // And we get a preview, no server endpoint was reached
3270 let preview = client
3271 .get_room_preview(room_id.into(), Vec::new())
3272 .await
3273 .expect("Room preview should be retrieved");
3274
3275 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3276 }
3277
3278 #[async_test]
3279 async fn test_room_preview_for_banned_room_retrieves_local_room_info() {
3280 let server = MatrixMockServer::new().await;
3281 let client = server.client_builder().build().await;
3282
3283 let room_id = room_id!("!a-room:matrix.org");
3284
3285 // Make sure the summary endpoint is not called
3286 server.mock_room_summary().ok(room_id).never().mount().await;
3287
3288 // We create a locally cached banned room
3289 let banned_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Banned);
3290
3291 // And we get a preview, no server endpoint was reached
3292 let preview = client
3293 .get_room_preview(room_id.into(), Vec::new())
3294 .await
3295 .expect("Room preview should be retrieved");
3296
3297 assert_eq!(banned_room.room_id().to_owned(), preview.room_id);
3298 }
3299}