matrix_sdk/client/mod.rs
1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18 collections::{btree_map, BTreeMap},
19 fmt::{self, Debug},
20 future::{ready, Future},
21 pin::Pin,
22 sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23};
24
25use caches::ClientCaches;
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::store::LockableCryptoStore;
32use matrix_sdk_base::{
33 event_cache::store::EventCacheStoreLock,
34 store::{DynStateStore, RoomLoadSettings, ServerCapabilities},
35 sync::{Notification, RoomUpdates},
36 BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
37 StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm,
38};
39use matrix_sdk_common::ttl_cache::TtlCache;
40#[cfg(feature = "e2e-encryption")]
41use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
42use ruma::{
43 api::{
44 client::{
45 account::whoami,
46 alias::{create_alias, delete_alias, get_alias},
47 device::{delete_devices, get_devices, update_device},
48 directory::{get_public_rooms, get_public_rooms_filtered},
49 discovery::{
50 get_capabilities::{self, Capabilities},
51 get_supported_versions,
52 },
53 error::ErrorKind,
54 filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
55 knock::knock_room,
56 membership::{join_room_by_id, join_room_by_id_or_alias},
57 room::create_room,
58 session::login::v3::DiscoveryInfo,
59 sync::sync_events,
60 uiaa,
61 user_directory::search_users,
62 },
63 error::FromHttpResponseError,
64 MatrixVersion, OutgoingRequest,
65 },
66 assign,
67 push::Ruleset,
68 time::Instant,
69 DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
70 RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
71};
72use serde::de::DeserializeOwned;
73use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
74use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
75use url::Url;
76
77use self::futures::SendRequest;
78use crate::{
79 authentication::{
80 matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
81 SaveSessionCallback,
82 },
83 config::RequestConfig,
84 deduplicating_handler::DeduplicatingHandler,
85 error::HttpResult,
86 event_cache::EventCache,
87 event_handler::{
88 EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
89 EventHandlerStore, ObservableEventHandler, SyncEvent,
90 },
91 http_client::HttpClient,
92 notification_settings::NotificationSettings,
93 room_preview::RoomPreview,
94 send_queue::SendQueueData,
95 sliding_sync::Version as SlidingSyncVersion,
96 sync::{RoomUpdate, SyncResponse},
97 Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
98 Room, SessionTokens, TransmissionProgress,
99};
100#[cfg(feature = "e2e-encryption")]
101use crate::{
102 encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
103 store_locks::CrossProcessStoreLock,
104};
105
106mod builder;
107pub(crate) mod caches;
108pub(crate) mod futures;
109
110pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
111
/// Boxed, pinned future produced by a notification handler.
///
/// Outside of `wasm32` the future is additionally required to be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future produced by a notification handler (`wasm32`
/// variant, without the `Send` bound).
#[cfg(target_arch = "wasm32")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

/// Boxed callback taking a [`Notification`], a [`Room`] and a [`Client`], and
/// returning a [`NotificationHandlerFut`].
///
/// Outside of `wasm32` the callback is additionally required to be
/// `Send + Sync`. Values of this type are stored in
/// `ClientInner::notification_handlers`.
#[cfg(not(target_arch = "wasm32"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed callback taking a [`Notification`], a [`Room`] and a [`Client`], and
/// returning a [`NotificationHandlerFut`] (`wasm32` variant, without the
/// `Send + Sync` bounds).
#[cfg(target_arch = "wasm32")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
122
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: #method.sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
136
/// Represents changes that can occur to a `Client`'s `Session`.
///
/// Equality is total (the only payload is a `bool`), so the type implements
/// both `PartialEq` and `Eq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
148
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely:
/// a clone only copies the pointer to the shared `ClientInner`.
#[derive(Clone)]
pub struct Client {
    /// The shared state of the client.
    pub(crate) inner: Arc<ClientInner>,
}
156
/// Locks and request-deduplicating handlers used by individual client
/// methods.
///
/// The whole set is created through `Default` when a `ClientInner` is built.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see `send_single_receipt`.
    // NOTE(review): this used to link to `Self::send_single_receipt`, which
    // cannot resolve since `ClientLocks` has no such method; presumably
    // `Room::send_single_receipt` is meant — confirm and restore the link.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock for the crypto store.
    ///
    /// Kept in a `OnceCell`: it starts unset and can be initialized once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock:
        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
236
/// The state shared by all the clones of a [`Client`].
///
/// A `Client` only holds an `Arc<ClientInner>`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API. It sits behind a lock
    /// because it can be replaced after construction (see
    /// `Client::set_homeserver`).
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    ///
    /// It sits behind a lock because it can be overridden after construction
    /// (see `Client::set_sliding_sync_version`).
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates, keyed by
    /// room.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,
}
333
334impl ClientInner {
335 /// Create a new `ClientInner`.
336 ///
337 /// All the fields passed as parameters here are those that must be cloned
338 /// upon instantiation of a sub-client, e.g. a client specialized for
339 /// notifications.
340 #[allow(clippy::too_many_arguments)]
341 async fn new(
342 auth_ctx: Arc<AuthCtx>,
343 server: Option<Url>,
344 homeserver: Url,
345 sliding_sync_version: SlidingSyncVersion,
346 http_client: HttpClient,
347 base_client: BaseClient,
348 server_capabilities: ClientServerCapabilities,
349 respect_login_well_known: bool,
350 event_cache: OnceCell<EventCache>,
351 send_queue: Arc<SendQueueData>,
352 #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
353 cross_process_store_locks_holder_name: String,
354 ) -> Arc<Self> {
355 let caches = ClientCaches {
356 server_capabilities: server_capabilities.into(),
357 server_metadata: Mutex::new(TtlCache::new()),
358 };
359
360 let client = Self {
361 server,
362 homeserver: StdRwLock::new(homeserver),
363 auth_ctx,
364 sliding_sync_version: StdRwLock::new(sliding_sync_version),
365 http_client,
366 base_client,
367 caches,
368 locks: Default::default(),
369 cross_process_store_locks_holder_name,
370 typing_notice_times: Default::default(),
371 event_handlers: Default::default(),
372 notification_handlers: Default::default(),
373 room_update_channels: Default::default(),
374 // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
375 // ballast for all observers to catch up.
376 room_updates_sender: broadcast::Sender::new(32),
377 respect_login_well_known,
378 sync_beat: event_listener::Event::new(),
379 event_cache,
380 send_queue_data: send_queue,
381 #[cfg(feature = "e2e-encryption")]
382 e2ee: EncryptionData::new(encryption_settings),
383 #[cfg(feature = "e2e-encryption")]
384 verification_state: SharedObservable::new(VerificationState::Unknown),
385 };
386
387 #[allow(clippy::let_and_return)]
388 let client = Arc::new(client);
389
390 #[cfg(feature = "e2e-encryption")]
391 client.e2ee.initialize_room_key_tasks(&client);
392
393 let _ = client
394 .event_cache
395 .get_or_init(|| async { EventCache::new(WeakClient::from_inner(&client)) })
396 .await;
397
398 client
399 }
400}
401
402#[cfg(not(tarpaulin_include))]
403impl Debug for Client {
404 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
405 write!(fmt, "Client")
406 }
407}
408
409impl Client {
410 /// Create a new [`Client`] that will use the given homeserver.
411 ///
412 /// # Arguments
413 ///
414 /// * `homeserver_url` - The homeserver that the client should connect to.
415 pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
416 Self::builder().homeserver_url(homeserver_url).build().await
417 }
418
419 /// Returns a subscriber that publishes an event every time the ignore user
420 /// list changes.
421 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
422 self.inner.base_client.subscribe_to_ignore_user_list_changes()
423 }
424
    /// Create a new [`ClientBuilder`], used to configure and build a
    /// [`Client`].
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
429
    /// Get a reference to the [`BaseClient`] held by this client.
    pub(crate) fn base_client(&self) -> &BaseClient {
        &self.inner.base_client
    }
433
    /// The underlying HTTP client.
    ///
    /// This is the `reqwest::Client` wrapped by the SDK's own HTTP client.
    pub fn http_client(&self) -> &reqwest::Client {
        &self.inner.http_client.inner
    }
438
    /// Get the collection of locks and deduplicating handlers shared by the
    /// client methods.
    pub(crate) fn locks(&self) -> &ClientLocks {
        &self.inner.locks
    }
442
    /// Get the data related to authentication and authorization.
    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
        &self.inner.auth_ctx
    }
446
    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// The returned string borrows from the client.
    pub fn cross_process_store_locks_holder_name(&self) -> &str {
        &self.inner.cross_process_store_locks_holder_name
    }
456
457 /// Change the homeserver URL used by this client.
458 ///
459 /// # Arguments
460 ///
461 /// * `homeserver_url` - The new URL to use.
462 fn set_homeserver(&self, homeserver_url: Url) {
463 *self.inner.homeserver.write().unwrap() = homeserver_url;
464 }
465
466 /// Get the capabilities of the homeserver.
467 ///
468 /// This method should be used to check what features are supported by the
469 /// homeserver.
470 ///
471 /// # Examples
472 ///
473 /// ```no_run
474 /// # use matrix_sdk::Client;
475 /// # use url::Url;
476 /// # async {
477 /// # let homeserver = Url::parse("http://example.com")?;
478 /// let client = Client::new(homeserver).await?;
479 ///
480 /// let capabilities = client.get_capabilities().await?;
481 ///
482 /// if capabilities.change_password.enabled {
483 /// // Change password
484 /// }
485 /// # anyhow::Ok(()) };
486 /// ```
487 pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
488 let res = self.send(get_capabilities::v3::Request::new()).await?;
489 Ok(res.capabilities)
490 }
491
    /// Get a copy of the default request config.
    ///
    /// The default request config is what's used when sending requests if no
    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
    /// function with such a parameter.
    ///
    /// If the default request config was not customized through
    /// [`ClientBuilder`] when creating this `Client`, the returned value will
    /// be equivalent to [`RequestConfig::default()`].
    ///
    /// The value is returned by copy, so modifying it does not affect the
    /// client.
    pub fn request_config(&self) -> RequestConfig {
        self.inner.http_client.request_config
    }
504
505 /// Check whether the client has been activated.
506 ///
507 /// A client is considered active when:
508 ///
509 /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
510 /// is logged in,
511 /// 2. Has loaded cached data from storage,
512 /// 3. If encryption is enabled, it also initialized or restored its
513 /// `OlmMachine`.
514 pub fn is_active(&self) -> bool {
515 self.inner.base_client.is_active()
516 }
517
    /// The server used by the client, if known.
    ///
    /// The server is usually the server part of a user ID (e.g. `matrix.org`
    /// in `@mnt_io:matrix.org`), and is not to be confused with the
    /// [homeserver][Self::homeserver].
    ///
    /// Returns `None` when the `Client` has been built from a homeserver URL
    /// directly, since the server is not known in that case.
    pub fn server(&self) -> Option<&Url> {
        self.inner.server.as_ref()
    }
524
525 /// The homeserver of the client.
526 pub fn homeserver(&self) -> Url {
527 self.inner.homeserver.read().unwrap().clone()
528 }
529
530 /// Get the sliding sync version.
531 pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
532 self.inner.sliding_sync_version.read().unwrap().clone()
533 }
534
535 /// Override the sliding sync version.
536 pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
537 let mut lock = self.inner.sliding_sync_version.write().unwrap();
538 *lock = version;
539 }
540
    /// Get the Matrix user session meta information.
    ///
    /// If the client is currently logged in, this will return a
    /// [`SessionMeta`] object which contains the user ID and device ID.
    /// Otherwise it returns `None`.
    ///
    /// See also [`Client::user_id`] and [`Client::device_id`], which read
    /// their value from this object.
    pub fn session_meta(&self) -> Option<&SessionMeta> {
        self.base_client().session_meta()
    }
549
    /// Returns a receiver that gets events for each room info update.
    ///
    /// To watch for new events, use `receiver.resubscribe()`. Each event
    /// contains the room and a boolean whether this event should trigger a
    /// room list update.
    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
        self.base_client().room_info_notable_update_receiver()
    }
556
    /// Performs a search for users in the homeserver's [user directory].
    ///
    /// The search is performed case-insensitively on user IDs and display
    /// names.
    ///
    /// # Arguments
    ///
    /// * `search_term` - The search term for the search
    /// * `limit` - The maximum number of results to return. If the value
    ///   cannot be represented as a [`UInt`], it is ignored and the request
    ///   keeps its own default (10 according to the Matrix specification).
    ///
    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
    pub async fn search_users(
        &self,
        search_term: &str,
        limit: u64,
    ) -> HttpResult<search_users::v3::Response> {
        let mut request = search_users::v3::Request::new(search_term.to_owned());

        // `UInt::new` yields `None` for values it cannot hold; the limit set
        // by `Request::new` is then left untouched.
        if let Some(limit) = UInt::new(limit) {
            request.limit = limit;
        }

        self.send(request).await
    }
579
580 /// Get the user id of the current owner of the client.
581 pub fn user_id(&self) -> Option<&UserId> {
582 self.session_meta().map(|s| s.user_id.as_ref())
583 }
584
585 /// Get the device ID that identifies the current session.
586 pub fn device_id(&self) -> Option<&DeviceId> {
587 self.session_meta().map(|s| s.device_id.as_ref())
588 }
589
    /// Get the current access token for this session.
    ///
    /// The token is read from the authentication context of the client.
    ///
    /// Will be `None` if the client has not been logged in.
    pub fn access_token(&self) -> Option<String> {
        self.auth_ctx().access_token()
    }
596
    /// Get the current tokens for this session.
    ///
    /// The tokens are read from the authentication context of the client.
    ///
    /// To be notified of changes in the session tokens, use
    /// [`Client::subscribe_to_session_changes()`] or
    /// [`Client::set_session_callbacks()`].
    ///
    /// Returns `None` if the client has not been logged in.
    pub fn session_tokens(&self) -> Option<SessionTokens> {
        self.auth_ctx().session_tokens()
    }
607
608 /// Access the authentication API used to log in this client.
609 ///
610 /// Will be `None` if the client has not been logged in.
611 pub fn auth_api(&self) -> Option<AuthApi> {
612 match self.auth_ctx().auth_data.get()? {
613 AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
614 AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
615 }
616 }
617
618 /// Get the whole session info of this client.
619 ///
620 /// Will be `None` if the client has not been logged in.
621 ///
622 /// Can be used with [`Client::restore_session`] to restore a previously
623 /// logged-in session.
624 pub fn session(&self) -> Option<AuthSession> {
625 match self.auth_api()? {
626 AuthApi::Matrix(api) => api.session().map(Into::into),
627 AuthApi::OAuth(api) => api.full_session().map(Into::into),
628 }
629 }
630
    /// Get a reference to the state store.
    ///
    /// This is the state store of the underlying [`BaseClient`].
    pub fn state_store(&self) -> &DynStateStore {
        self.base_client().state_store()
    }
635
    /// Get a reference to the event cache store.
    ///
    /// This is the event cache store of the underlying [`BaseClient`]; it is
    /// handed out as an [`EventCacheStoreLock`].
    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
        self.base_client().event_cache_store()
    }
640
    /// Access the native Matrix authentication API with this client.
    ///
    /// The returned [`MatrixAuth`] is built from a clone of this `Client`.
    pub fn matrix_auth(&self) -> MatrixAuth {
        MatrixAuth::new(self.clone())
    }
645
    /// Get the account of the current owner of the client.
    ///
    /// The returned [`Account`] is built from a clone of this `Client`.
    pub fn account(&self) -> Account {
        Account::new(self.clone())
    }
650
    /// Get the encryption manager of the client.
    ///
    /// The returned [`Encryption`] is built from a clone of this `Client`.
    /// Only available with the `e2e-encryption` feature.
    #[cfg(feature = "e2e-encryption")]
    pub fn encryption(&self) -> Encryption {
        Encryption::new(self.clone())
    }
656
    /// Get the media manager of the client.
    ///
    /// The returned [`Media`] is built from a clone of this `Client`.
    pub fn media(&self) -> Media {
        Media::new(self.clone())
    }
661
    /// Get the pusher manager of the client.
    ///
    /// The returned [`Pusher`] is built from a clone of this `Client`.
    pub fn pusher(&self) -> Pusher {
        Pusher::new(self.clone())
    }
666
    /// Access the OAuth 2.0 API of the client.
    ///
    /// The returned [`OAuth`] is built from a clone of this `Client`.
    pub fn oauth(&self) -> OAuth {
        OAuth::new(self.clone())
    }
671
672 /// Register a handler for a specific event type.
673 ///
674 /// The handler is a function or closure with one or more arguments. The
675 /// first argument is the event itself. All additional arguments are
676 /// "context" arguments: They have to implement [`EventHandlerContext`].
677 /// This trait is named that way because most of the types implementing it
678 /// give additional context about an event: The room it was in, its raw form
679 /// and other similar things. As two exceptions to this,
680 /// [`Client`] and [`EventHandlerHandle`] also implement the
681 /// `EventHandlerContext` trait so you don't have to clone your client
682 /// into the event handler manually and a handler can decide to remove
683 /// itself.
684 ///
685 /// Some context arguments are not universally applicable. A context
686 /// argument that isn't available for the given event type will result in
687 /// the event handler being skipped and an error being logged. The following
688 /// context argument types are only available for a subset of event types:
689 ///
690 /// * [`Room`] is only available for room-specific events, i.e. not for
691 /// events like global account data events or presence events.
692 ///
693 /// You can provide custom context via
694 /// [`add_event_handler_context`](Client::add_event_handler_context) and
695 /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
696 /// into the event handler.
697 ///
698 /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
699 ///
700 /// # Examples
701 ///
702 /// ```no_run
703 /// use matrix_sdk::{
704 /// deserialized_responses::EncryptionInfo,
705 /// event_handler::Ctx,
706 /// ruma::{
707 /// events::{
708 /// macros::EventContent,
709 /// push_rules::PushRulesEvent,
710 /// room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
711 /// },
712 /// push::Action,
713 /// Int, MilliSecondsSinceUnixEpoch,
714 /// },
715 /// Client, Room,
716 /// };
717 /// use serde::{Deserialize, Serialize};
718 ///
719 /// # async fn example(client: Client) {
720 /// client.add_event_handler(
721 /// |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
722 /// // Common usage: Room event plus room and client.
723 /// },
724 /// );
725 /// client.add_event_handler(
726 /// |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
727 /// async move {
728 /// // An `Option<EncryptionInfo>` parameter lets you distinguish between
729 /// // unencrypted events and events that were decrypted by the SDK.
730 /// }
731 /// },
732 /// );
733 /// client.add_event_handler(
734 /// |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
735 /// async move {
736 /// // A `Vec<Action>` parameter allows you to know which push actions
737 /// // are applicable for an event. For example, an event with
738 /// // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
739 /// // in the timeline.
740 /// }
741 /// },
742 /// );
743 /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
744 /// // You can omit any or all arguments after the first.
745 /// });
746 ///
747 /// // Registering a temporary event handler:
748 /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
749 /// /* Event handler */
750 /// });
751 /// client.remove_event_handler(handle);
752 ///
753 /// // Registering custom event handler context:
754 /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
755 /// struct MyContext {
756 /// number: usize,
757 /// }
758 /// client.add_event_handler_context(MyContext { number: 5 });
759 /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
760 /// // Use the context
761 /// });
762 ///
763 /// // Custom events work exactly the same way, you just need to declare
764 /// // the content struct and use the EventContent derive macro on it.
765 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
766 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
767 /// struct TokenEventContent {
768 /// token: String,
769 /// #[serde(rename = "exp")]
770 /// expires_at: MilliSecondsSinceUnixEpoch,
771 /// }
772 ///
773 /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
774 /// todo!("Display the token");
775 /// });
776 ///
777 /// // Event handler closures can also capture local variables.
778 /// // Make sure they are cheap to clone though, because they will be cloned
779 /// // every time the closure is called.
780 /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
781 ///
782 /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
783 /// println!("Calling the handler with identifier {data}");
784 /// });
785 /// # }
786 /// ```
    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
    where
        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
        H: EventHandler<Ev, Ctx>,
    {
        // No room ID is given: unlike `add_room_event_handler`, the handler is
        // not restricted to the events of a single room.
        self.add_event_handler_impl(handler, None)
    }
794
795 /// Register a handler for a specific room, and event type.
796 ///
797 /// This method works the same way as
798 /// [`add_event_handler`][Self::add_event_handler], except that the handler
799 /// will only be called for events in the room with the specified ID. See
800 /// that method for more details on event handler functions.
801 ///
802 /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
803 /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
804 /// your use case.
805 pub fn add_room_event_handler<Ev, Ctx, H>(
806 &self,
807 room_id: &RoomId,
808 handler: H,
809 ) -> EventHandlerHandle
810 where
811 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
812 H: EventHandler<Ev, Ctx>,
813 {
814 self.add_event_handler_impl(handler, Some(room_id.to_owned()))
815 }
816
817 /// Observe a specific event type.
818 ///
819 /// `Ev` represents the kind of event that will be observed. `Ctx`
820 /// represents the context that will come with the event. It relies on the
821 /// same mechanism as [`Client::add_event_handler`]. The main difference is
822 /// that it returns an [`ObservableEventHandler`] and doesn't require a
823 /// user-defined closure. It is possible to subscribe to the
824 /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
825 /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
826 /// Ctx)`.
827 ///
828 /// Be careful that only the most recent value can be observed. Subscribers
829 /// are notified when a new value is sent, but there is no guarantee
830 /// that they will see all values.
831 ///
832 /// # Example
833 ///
834 /// Let's see a classical usage:
835 ///
836 /// ```
837 /// use futures_util::StreamExt as _;
838 /// use matrix_sdk::{
839 /// ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
840 /// Client, Room,
841 /// };
842 ///
843 /// # async fn example(client: Client) -> Option<()> {
844 /// let observer =
845 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
846 ///
847 /// let mut subscriber = observer.subscribe();
848 ///
849 /// let (event, (room, push_actions)) = subscriber.next().await?;
850 /// # Some(())
851 /// # }
852 /// ```
853 ///
854 /// Now let's see how to get several contexts that can be useful for you:
855 ///
856 /// ```
857 /// use matrix_sdk::{
858 /// deserialized_responses::EncryptionInfo,
859 /// ruma::{
860 /// events::room::{
861 /// message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
862 /// },
863 /// push::Action,
864 /// },
865 /// Client, Room,
866 /// };
867 ///
868 /// # async fn example(client: Client) {
869 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
870 /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
871 ///
872 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
873 /// // to distinguish between unencrypted events and events that were decrypted
874 /// // by the SDK.
875 /// let _ = client
876 /// .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
877 /// );
878 ///
879 /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
880 /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
881 /// // should be highlighted in the timeline.
882 /// let _ =
883 /// client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
884 ///
885 /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
886 /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
887 /// # }
888 /// ```
889 ///
890 /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
891 pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
892 where
893 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
894 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
895 {
896 self.observe_room_events_impl(None)
897 }
898
899 /// Observe a specific room, and event type.
900 ///
901 /// This method works the same way as [`Client::observe_events`], except
902 /// that the observability will only be applied for events in the room with
903 /// the specified ID. See that method for more details.
904 ///
905 /// Be careful that only the most recent value can be observed. Subscribers
906 /// are notified when a new value is sent, but there is no guarantee
907 /// that they will see all values.
908 pub fn observe_room_events<Ev, Ctx>(
909 &self,
910 room_id: &RoomId,
911 ) -> ObservableEventHandler<(Ev, Ctx)>
912 where
913 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
914 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
915 {
916 self.observe_room_events_impl(Some(room_id.to_owned()))
917 }
918
919 /// Shared implementation for `Client::observe_events` and
920 /// `Client::observe_room_events`.
921 fn observe_room_events_impl<Ev, Ctx>(
922 &self,
923 room_id: Option<OwnedRoomId>,
924 ) -> ObservableEventHandler<(Ev, Ctx)>
925 where
926 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
927 Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
928 {
929 // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
930 // new value.
931 let shared_observable = SharedObservable::new(None);
932
933 ObservableEventHandler::new(
934 shared_observable.clone(),
935 self.event_handler_drop_guard(self.add_event_handler_impl(
936 move |event: Ev, context: Ctx| {
937 shared_observable.set(Some((event, context)));
938
939 ready(())
940 },
941 room_id,
942 )),
943 )
944 }
945
946 /// Remove the event handler associated with the handle.
947 ///
948 /// Note that you **must not** call `remove_event_handler` from the
949 /// non-async part of an event handler, that is:
950 ///
951 /// ```ignore
952 /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
/// // ⚠ this will cause a deadlock ⚠
954 /// client.remove_event_handler(handle);
955 ///
956 /// async move {
957 /// // removing the event handler here is fine
958 /// client.remove_event_handler(handle);
959 /// }
960 /// })
961 /// ```
962 ///
963 /// Note also that handlers that remove themselves will still execute with
964 /// events received in the same sync cycle.
965 ///
966 /// # Arguments
967 ///
968 /// `handle` - The [`EventHandlerHandle`] that is returned when
969 /// registering the event handler with [`Client::add_event_handler`].
970 ///
971 /// # Examples
972 ///
973 /// ```no_run
974 /// # use url::Url;
975 /// # use tokio::sync::mpsc;
976 /// #
977 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
978 /// #
979 /// use matrix_sdk::{
980 /// event_handler::EventHandlerHandle,
981 /// ruma::events::room::member::SyncRoomMemberEvent, Client,
982 /// };
983 /// #
984 /// # futures_executor::block_on(async {
985 /// # let client = matrix_sdk::Client::builder()
986 /// # .homeserver_url(homeserver)
987 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
988 /// # .build()
989 /// # .await
990 /// # .unwrap();
991 ///
992 /// client.add_event_handler(
993 /// |ev: SyncRoomMemberEvent,
994 /// client: Client,
995 /// handle: EventHandlerHandle| async move {
996 /// // Common usage: Check arriving Event is the expected one
997 /// println!("Expected RoomMemberEvent received!");
998 /// client.remove_event_handler(handle);
999 /// },
1000 /// );
1001 /// # });
1002 /// ```
1003 pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1004 self.inner.event_handlers.remove(handle);
1005 }
1006
1007 /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1008 /// the given handle.
1009 ///
1010 /// When the returned value is dropped, the event handler will be removed.
1011 pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1012 EventHandlerDropGuard::new(handle, self.clone())
1013 }
1014
1015 /// Add an arbitrary value for use as event handler context.
1016 ///
1017 /// The value can be obtained in an event handler by adding an argument of
1018 /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1019 ///
1020 /// If a value of the same type has been added before, it will be
1021 /// overwritten.
1022 ///
1023 /// # Examples
1024 ///
1025 /// ```no_run
1026 /// use matrix_sdk::{
1027 /// event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1028 /// Room,
1029 /// };
1030 /// # #[derive(Clone)]
1031 /// # struct SomeType;
1032 /// # fn obtain_gui_handle() -> SomeType { SomeType }
1033 /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1034 /// # futures_executor::block_on(async {
1035 /// # let client = matrix_sdk::Client::builder()
1036 /// # .homeserver_url(homeserver)
1037 /// # .server_versions([ruma::api::MatrixVersion::V1_0])
1038 /// # .build()
1039 /// # .await
1040 /// # .unwrap();
1041 ///
1042 /// // Handle used to send messages to the UI part of the app
1043 /// let my_gui_handle: SomeType = obtain_gui_handle();
1044 ///
1045 /// client.add_event_handler_context(my_gui_handle.clone());
1046 /// client.add_event_handler(
1047 /// |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1048 /// async move {
1049 /// // gui_handle.send(DisplayMessage { message: ev });
1050 /// }
1051 /// },
1052 /// );
1053 /// # });
1054 /// ```
1055 pub fn add_event_handler_context<T>(&self, ctx: T)
1056 where
1057 T: Clone + Send + Sync + 'static,
1058 {
1059 self.inner.event_handlers.add_context(ctx);
1060 }
1061
1062 /// Register a handler for a notification.
1063 ///
1064 /// Similar to [`Client::add_event_handler`], but only allows functions
1065 /// or closures with exactly the three arguments [`Notification`], [`Room`],
1066 /// [`Client`] for now.
1067 pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1068 where
1069 H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1070 Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1071 {
1072 self.inner.notification_handlers.write().await.push(Box::new(
1073 move |notification, room, client| Box::pin((handler)(notification, room, client)),
1074 ));
1075
1076 self
1077 }
1078
1079 /// Subscribe to all updates for the room with the given ID.
1080 ///
1081 /// The returned receiver will receive a new message for each sync response
1082 /// that contains updates for that room.
1083 pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1084 match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1085 btree_map::Entry::Vacant(entry) => {
1086 let (tx, rx) = broadcast::channel(8);
1087 entry.insert(tx);
1088 rx
1089 }
1090 btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1091 }
1092 }
1093
1094 /// Subscribe to all updates to all rooms, whenever any has been received in
1095 /// a sync response.
1096 pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1097 self.inner.room_updates_sender.subscribe()
1098 }
1099
1100 pub(crate) async fn notification_handlers(
1101 &self,
1102 ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1103 self.inner.notification_handlers.read().await
1104 }
1105
1106 /// Get all the rooms the client knows about.
1107 ///
1108 /// This will return the list of joined, invited, and left rooms.
1109 pub fn rooms(&self) -> Vec<Room> {
1110 self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1111 }
1112
1113 /// Get all the rooms the client knows about, filtered by room state.
1114 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1115 self.base_client()
1116 .rooms_filtered(filter)
1117 .into_iter()
1118 .map(|room| Room::new(self.clone(), room))
1119 .collect()
1120 }
1121
1122 /// Get a stream of all the rooms, in addition to the existing rooms.
1123 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1124 let (rooms, stream) = self.base_client().rooms_stream();
1125
1126 let map_room = |room| Room::new(self.clone(), room);
1127
1128 (
1129 rooms.into_iter().map(map_room).collect(),
1130 stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1131 )
1132 }
1133
1134 /// Returns the joined rooms this client knows about.
1135 pub fn joined_rooms(&self) -> Vec<Room> {
1136 self.base_client()
1137 .rooms_filtered(RoomStateFilter::JOINED)
1138 .into_iter()
1139 .map(|room| Room::new(self.clone(), room))
1140 .collect()
1141 }
1142
1143 /// Returns the invited rooms this client knows about.
1144 pub fn invited_rooms(&self) -> Vec<Room> {
1145 self.base_client()
1146 .rooms_filtered(RoomStateFilter::INVITED)
1147 .into_iter()
1148 .map(|room| Room::new(self.clone(), room))
1149 .collect()
1150 }
1151
1152 /// Returns the left rooms this client knows about.
1153 pub fn left_rooms(&self) -> Vec<Room> {
1154 self.base_client()
1155 .rooms_filtered(RoomStateFilter::LEFT)
1156 .into_iter()
1157 .map(|room| Room::new(self.clone(), room))
1158 .collect()
1159 }
1160
1161 /// Get a room with the given room id.
1162 ///
1163 /// # Arguments
1164 ///
1165 /// `room_id` - The unique id of the room that should be fetched.
1166 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1167 self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1168 }
1169
1170 /// Gets the preview of a room, whether the current user has joined it or
1171 /// not.
1172 pub async fn get_room_preview(
1173 &self,
1174 room_or_alias_id: &RoomOrAliasId,
1175 via: Vec<OwnedServerName>,
1176 ) -> Result<RoomPreview> {
1177 let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1178 Ok(room_id) => room_id.to_owned(),
1179 Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1180 };
1181
1182 if let Some(room) = self.get_room(&room_id) {
1183 // The cached data can only be trusted if the room state is joined or
1184 // banned: for invite and knock rooms, no updates will be received
1185 // for the rooms after the invite/knock action took place so we may
1186 // have very out to date data for important fields such as
1187 // `join_rule`. For left rooms, the homeserver should return the latest info.
1188 match room.state() {
1189 RoomState::Joined | RoomState::Banned => {
1190 return Ok(RoomPreview::from_known_room(&room).await);
1191 }
1192 RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1193 }
1194 }
1195
1196 RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1197 }
1198
1199 /// Resolve a room alias to a room id and a list of servers which know
1200 /// about it.
1201 ///
1202 /// # Arguments
1203 ///
1204 /// `room_alias` - The room alias to be resolved.
1205 pub async fn resolve_room_alias(
1206 &self,
1207 room_alias: &RoomAliasId,
1208 ) -> HttpResult<get_alias::v3::Response> {
1209 let request = get_alias::v3::Request::new(room_alias.to_owned());
1210 self.send(request).await
1211 }
1212
1213 /// Checks if a room alias is not in use yet.
1214 ///
1215 /// Returns:
1216 /// - `Ok(true)` if the room alias is available.
1217 /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1218 /// status code).
1219 /// - An `Err` otherwise.
1220 pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1221 match self.resolve_room_alias(alias).await {
1222 // The room alias was resolved, so it's already in use.
1223 Ok(_) => Ok(false),
1224 Err(error) => {
1225 match error.client_api_error_kind() {
1226 // The room alias wasn't found, so it's available.
1227 Some(ErrorKind::NotFound) => Ok(true),
1228 _ => Err(error),
1229 }
1230 }
1231 }
1232 }
1233
1234 /// Adds a new room alias associated with a room to the room directory.
1235 pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1236 let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1237 self.send(request).await?;
1238 Ok(())
1239 }
1240
1241 /// Removes a room alias from the room directory.
1242 pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1243 let request = delete_alias::v3::Request::new(alias.to_owned());
1244 self.send(request).await?;
1245 Ok(())
1246 }
1247
1248 /// Update the homeserver from the login response well-known if needed.
1249 ///
1250 /// # Arguments
1251 ///
1252 /// * `login_well_known` - The `well_known` field from a successful login
1253 /// response.
1254 pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1255 if self.inner.respect_login_well_known {
1256 if let Some(well_known) = login_well_known {
1257 if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1258 self.set_homeserver(homeserver);
1259 }
1260 }
1261 }
1262 }
1263
1264 /// Similar to [`Client::restore_session_with`], with
1265 /// [`RoomLoadSettings::default()`].
1266 ///
1267 /// # Panics
1268 ///
1269 /// Panics if a session was already restored or logged in.
1270 #[instrument(skip_all)]
1271 pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1272 self.restore_session_with(session, RoomLoadSettings::default()).await
1273 }
1274
1275 /// Restore a session previously logged-in using one of the available
1276 /// authentication APIs. The number of rooms to restore is controlled by
1277 /// [`RoomLoadSettings`].
1278 ///
1279 /// See the documentation of the corresponding authentication API's
1280 /// `restore_session` method for more information.
1281 ///
1282 /// # Panics
1283 ///
1284 /// Panics if a session was already restored or logged in.
1285 #[instrument(skip_all)]
1286 pub async fn restore_session_with(
1287 &self,
1288 session: impl Into<AuthSession>,
1289 room_load_settings: RoomLoadSettings,
1290 ) -> Result<()> {
1291 let session = session.into();
1292 match session {
1293 AuthSession::Matrix(session) => {
1294 Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1295 }
1296 AuthSession::OAuth(session) => {
1297 Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1298 }
1299 }
1300 }
1301
1302 /// Refresh the access token using the authentication API used to log into
1303 /// this session.
1304 ///
1305 /// See the documentation of the authentication API's `refresh_access_token`
1306 /// method for more information.
1307 pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1308 let Some(auth_api) = self.auth_api() else {
1309 return Err(RefreshTokenError::RefreshTokenRequired);
1310 };
1311
1312 match auth_api {
1313 AuthApi::Matrix(api) => {
1314 trace!("Token refresh: Using the homeserver.");
1315 Box::pin(api.refresh_access_token()).await?;
1316 }
1317 AuthApi::OAuth(api) => {
1318 trace!("Token refresh: Using OAuth 2.0.");
1319 Box::pin(api.refresh_access_token()).await?;
1320 }
1321 }
1322
1323 Ok(())
1324 }
1325
1326 /// Log out the current session using the proper authentication API.
1327 ///
1328 /// # Errors
1329 ///
1330 /// Returns an error if the session is not authenticated or if an error
1331 /// occurred while making the request to the server.
1332 pub async fn logout(&self) -> Result<(), Error> {
1333 let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1334 match auth_api {
1335 AuthApi::Matrix(matrix_auth) => {
1336 matrix_auth.logout().await?;
1337 Ok(())
1338 }
1339 AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1340 }
1341 }
1342
1343 /// Get or upload a sync filter.
1344 ///
1345 /// This method will either get a filter ID from the store or upload the
1346 /// filter definition to the homeserver and return the new filter ID.
1347 ///
1348 /// # Arguments
1349 ///
1350 /// * `filter_name` - The unique name of the filter, this name will be used
1351 /// locally to store and identify the filter ID returned by the server.
1352 ///
1353 /// * `definition` - The filter definition that should be uploaded to the
1354 /// server if no filter ID can be found in the store.
1355 ///
1356 /// # Examples
1357 ///
1358 /// ```no_run
1359 /// # use matrix_sdk::{
1360 /// # Client, config::SyncSettings,
1361 /// # ruma::api::client::{
1362 /// # filter::{
1363 /// # FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1364 /// # },
1365 /// # sync::sync_events::v3::Filter,
1366 /// # }
1367 /// # };
1368 /// # use url::Url;
1369 /// # async {
1370 /// # let homeserver = Url::parse("http://example.com").unwrap();
1371 /// # let client = Client::new(homeserver).await.unwrap();
1372 /// let mut filter = FilterDefinition::default();
1373 ///
1374 /// // Let's enable member lazy loading.
1375 /// filter.room.state.lazy_load_options =
1376 /// LazyLoadOptions::Enabled { include_redundant_members: false };
1377 ///
1378 /// let filter_id = client
1379 /// .get_or_upload_filter("sync", filter)
1380 /// .await
1381 /// .unwrap();
1382 ///
1383 /// let sync_settings = SyncSettings::new()
1384 /// .filter(Filter::FilterId(filter_id));
1385 ///
1386 /// let response = client.sync_once(sync_settings).await.unwrap();
/// # };
/// ```
1388 #[instrument(skip(self, definition))]
1389 pub async fn get_or_upload_filter(
1390 &self,
1391 filter_name: &str,
1392 definition: FilterDefinition,
1393 ) -> Result<String> {
1394 if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1395 debug!("Found filter locally");
1396 Ok(filter)
1397 } else {
1398 debug!("Didn't find filter locally");
1399 let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1400 let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1401 let response = self.send(request).await?;
1402
1403 self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1404
1405 Ok(response.filter_id)
1406 }
1407 }
1408
1409 /// Join a room by `RoomId`.
1410 ///
1411 /// Returns a `join_room_by_id::Response` consisting of the
1412 /// joined rooms `RoomId`.
1413 ///
1414 /// # Arguments
1415 ///
1416 /// * `room_id` - The `RoomId` of the room to be joined.
1417 pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1418 let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1419 let response = self.send(request).await?;
1420 let base_room = self.base_client().room_joined(&response.room_id).await?;
1421 Ok(Room::new(self.clone(), base_room))
1422 }
1423
1424 /// Join a room by `RoomId`.
1425 ///
1426 /// Returns a `join_room_by_id_or_alias::Response` consisting of the
1427 /// joined rooms `RoomId`.
1428 ///
1429 /// # Arguments
1430 ///
1431 /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1432 /// alias looks like `#name:example.com`.
1433 pub async fn join_room_by_id_or_alias(
1434 &self,
1435 alias: &RoomOrAliasId,
1436 server_names: &[OwnedServerName],
1437 ) -> Result<Room> {
1438 let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1439 via: server_names.to_owned(),
1440 });
1441 let response = self.send(request).await?;
1442 let base_room = self.base_client().room_joined(&response.room_id).await?;
1443 Ok(Room::new(self.clone(), base_room))
1444 }
1445
1446 /// Search the homeserver's directory of public rooms.
1447 ///
1448 /// Sends a request to "_matrix/client/r0/publicRooms", returns
1449 /// a `get_public_rooms::Response`.
1450 ///
1451 /// # Arguments
1452 ///
1453 /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1454 ///
1455 /// * `since` - Pagination token from a previous request.
1456 ///
1457 /// * `server` - The name of the server, if `None` the requested server is
1458 /// used.
1459 ///
1460 /// # Examples
1461 /// ```no_run
1462 /// use matrix_sdk::Client;
1463 /// # use url::Url;
1464 /// # let homeserver = Url::parse("http://example.com").unwrap();
1465 /// # let limit = Some(10);
1466 /// # let since = Some("since token");
1467 /// # let server = Some("servername.com".try_into().unwrap());
1468 /// # async {
1469 /// let mut client = Client::new(homeserver).await.unwrap();
1470 ///
1471 /// client.public_rooms(limit, since, server).await;
1472 /// # };
1473 /// ```
1474 #[cfg_attr(not(target_arch = "wasm32"), deny(clippy::future_not_send))]
1475 pub async fn public_rooms(
1476 &self,
1477 limit: Option<u32>,
1478 since: Option<&str>,
1479 server: Option<&ServerName>,
1480 ) -> HttpResult<get_public_rooms::v3::Response> {
1481 let limit = limit.map(UInt::from);
1482
1483 let request = assign!(get_public_rooms::v3::Request::new(), {
1484 limit,
1485 since: since.map(ToOwned::to_owned),
1486 server: server.map(ToOwned::to_owned),
1487 });
1488 self.send(request).await
1489 }
1490
1491 /// Create a room with the given parameters.
1492 ///
1493 /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1494 /// created room.
1495 ///
1496 /// If you want to create a direct message with one specific user, you can
1497 /// use [`create_dm`][Self::create_dm], which is more convenient than
1498 /// assembling the [`create_room::v3::Request`] yourself.
1499 ///
1500 /// If the `is_direct` field of the request is set to `true` and at least
1501 /// one user is invited, the room will be automatically added to the direct
1502 /// rooms in the account data.
1503 ///
1504 /// # Examples
1505 ///
1506 /// ```no_run
1507 /// use matrix_sdk::{
1508 /// ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1509 /// Client,
1510 /// };
1511 /// # use url::Url;
1512 /// #
1513 /// # async {
1514 /// # let homeserver = Url::parse("http://example.com").unwrap();
1515 /// let request = CreateRoomRequest::new();
1516 /// let client = Client::new(homeserver).await.unwrap();
1517 /// assert!(client.create_room(request).await.is_ok());
1518 /// # };
1519 /// ```
1520 pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1521 let invite = request.invite.clone();
1522 let is_direct_room = request.is_direct;
1523 let response = self.send(request).await?;
1524
1525 let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1526
1527 let joined_room = Room::new(self.clone(), base_room);
1528
1529 if is_direct_room && !invite.is_empty() {
1530 if let Err(error) =
1531 self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1532 {
1533 // FIXME: Retry in the background
1534 error!("Failed to mark room as DM: {error}");
1535 }
1536 }
1537
1538 Ok(joined_room)
1539 }
1540
1541 /// Create a DM room.
1542 ///
1543 /// Convenience shorthand for [`create_room`][Self::create_room] with the
1544 /// given user being invited, the room marked `is_direct` and both the
1545 /// creator and invitee getting the default maximum power level.
1546 ///
1547 /// If the `e2e-encryption` feature is enabled, the room will also be
1548 /// encrypted.
1549 ///
1550 /// # Arguments
1551 ///
1552 /// * `user_id` - The ID of the user to create a DM for.
1553 pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1554 #[cfg(feature = "e2e-encryption")]
1555 let initial_state =
1556 vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1557 .to_raw_any()];
1558
1559 #[cfg(not(feature = "e2e-encryption"))]
1560 let initial_state = vec![];
1561
1562 let request = assign!(create_room::v3::Request::new(), {
1563 invite: vec![user_id.to_owned()],
1564 is_direct: true,
1565 preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1566 initial_state,
1567 });
1568
1569 self.create_room(request).await
1570 }
1571
1572 /// Search the homeserver's directory for public rooms with a filter.
1573 ///
1574 /// # Arguments
1575 ///
1576 /// * `room_search` - The easiest way to create this request is using the
1577 /// `get_public_rooms_filtered::Request` itself.
1578 ///
1579 /// # Examples
1580 ///
1581 /// ```no_run
1582 /// # use url::Url;
1583 /// # use matrix_sdk::Client;
1584 /// # async {
1585 /// # let homeserver = Url::parse("http://example.com")?;
1586 /// use matrix_sdk::ruma::{
1587 /// api::client::directory::get_public_rooms_filtered, directory::Filter,
1588 /// };
1589 /// # let mut client = Client::new(homeserver).await?;
1590 ///
1591 /// let mut filter = Filter::new();
1592 /// filter.generic_search_term = Some("rust".to_owned());
1593 /// let mut request = get_public_rooms_filtered::v3::Request::new();
1594 /// request.filter = filter;
1595 ///
1596 /// let response = client.public_rooms_filtered(request).await?;
1597 ///
1598 /// for room in response.chunk {
1599 /// println!("Found room {room:?}");
1600 /// }
1601 /// # anyhow::Ok(()) };
1602 /// ```
1603 pub async fn public_rooms_filtered(
1604 &self,
1605 request: get_public_rooms_filtered::v3::Request,
1606 ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1607 self.send(request).await
1608 }
1609
1610 /// Send an arbitrary request to the server, without updating client state.
1611 ///
1612 /// **Warning:** Because this method *does not* update the client state, it
1613 /// is important to make sure that you account for this yourself, and
1614 /// use wrapper methods where available. This method should *only* be
1615 /// used if a wrapper method for the endpoint you'd like to use is not
1616 /// available.
1617 ///
1618 /// # Arguments
1619 ///
1620 /// * `request` - A filled out and valid request for the endpoint to be hit
1621 ///
1622 /// * `timeout` - An optional request timeout setting, this overrides the
1623 /// default request setting if one was set.
1624 ///
1625 /// # Examples
1626 ///
1627 /// ```no_run
1628 /// # use matrix_sdk::{Client, config::SyncSettings};
1629 /// # use url::Url;
1630 /// # async {
1631 /// # let homeserver = Url::parse("http://localhost:8080")?;
1632 /// # let mut client = Client::new(homeserver).await?;
1633 /// use matrix_sdk::ruma::{api::client::profile, user_id};
1634 ///
1635 /// // First construct the request you want to make
1636 /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1637 /// // for all available Endpoints
1638 /// let user_id = user_id!("@example:localhost").to_owned();
1639 /// let request = profile::get_profile::v3::Request::new(user_id);
1640 ///
1641 /// // Start the request using Client::send()
1642 /// let response = client.send(request).await?;
1643 ///
1644 /// // Check the corresponding Response struct to find out what types are
1645 /// // returned
1646 /// # anyhow::Ok(()) };
1647 /// ```
1648 pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1649 where
1650 Request: OutgoingRequest + Clone + Debug,
1651 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1652 {
1653 SendRequest {
1654 client: self.clone(),
1655 request,
1656 config: None,
1657 send_progress: Default::default(),
1658 }
1659 }
1660
1661 pub(crate) async fn send_inner<Request>(
1662 &self,
1663 request: Request,
1664 config: Option<RequestConfig>,
1665 send_progress: SharedObservable<TransmissionProgress>,
1666 ) -> HttpResult<Request::IncomingResponse>
1667 where
1668 Request: OutgoingRequest + Debug,
1669 HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1670 {
1671 let homeserver = self.homeserver().to_string();
1672 let access_token = self.access_token();
1673
1674 self.inner
1675 .http_client
1676 .send(
1677 request,
1678 config,
1679 homeserver,
1680 access_token.as_deref(),
1681 &self.server_versions().await?,
1682 send_progress,
1683 )
1684 .await
1685 }
1686
1687 fn broadcast_unknown_token(&self, soft_logout: &bool) {
1688 _ = self
1689 .inner
1690 .auth_ctx
1691 .session_change_sender
1692 .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1693 }
1694
1695 /// Fetches server capabilities from network; no caching.
1696 pub async fn fetch_server_capabilities(
1697 &self,
1698 request_config: Option<RequestConfig>,
1699 ) -> HttpResult<(Box<[MatrixVersion]>, BTreeMap<String, bool>)> {
1700 let resp = self
1701 .inner
1702 .http_client
1703 .send(
1704 get_supported_versions::Request::new(),
1705 request_config,
1706 self.homeserver().to_string(),
1707 None,
1708 &[MatrixVersion::V1_0],
1709 Default::default(),
1710 )
1711 .await?;
1712
1713 // Fill both unstable features and server versions at once.
1714 let mut versions = resp.known_versions().collect::<Vec<_>>();
1715 if versions.is_empty() {
1716 versions.push(MatrixVersion::V1_0);
1717 }
1718
1719 Ok((versions.into(), resp.unstable_features))
1720 }
1721
1722 /// Load server capabilities from storage, or fetch them from network and
1723 /// cache them.
1724 async fn load_or_fetch_server_capabilities(
1725 &self,
1726 ) -> HttpResult<(Box<[MatrixVersion]>, BTreeMap<String, bool>)> {
1727 match self.state_store().get_kv_data(StateStoreDataKey::ServerCapabilities).await {
1728 Ok(Some(stored)) => {
1729 if let Some((versions, unstable_features)) =
1730 stored.into_server_capabilities().and_then(|cap| cap.maybe_decode())
1731 {
1732 return Ok((versions.into(), unstable_features));
1733 }
1734 }
1735 Ok(None) => {
1736 // fallthrough: cache is empty
1737 }
1738 Err(err) => {
1739 warn!("error when loading cached server capabilities: {err}");
1740 // fallthrough to network.
1741 }
1742 }
1743
1744 let (versions, unstable_features) = self.fetch_server_capabilities(None).await?;
1745
1746 // Attempt to cache the result in storage.
1747 {
1748 let encoded = ServerCapabilities::new(&versions, unstable_features.clone());
1749 if let Err(err) = self
1750 .state_store()
1751 .set_kv_data(
1752 StateStoreDataKey::ServerCapabilities,
1753 StateStoreDataValue::ServerCapabilities(encoded),
1754 )
1755 .await
1756 {
1757 warn!("error when caching server capabilities: {err}");
1758 }
1759 }
1760
1761 Ok((versions, unstable_features))
1762 }
1763
1764 async fn get_or_load_and_cache_server_capabilities<
1765 T,
1766 F: Fn(&ClientServerCapabilities) -> Option<T>,
1767 >(
1768 &self,
1769 f: F,
1770 ) -> HttpResult<T> {
1771 let caps = &self.inner.caches.server_capabilities;
1772 if let Some(val) = f(&*caps.read().await) {
1773 return Ok(val);
1774 }
1775
1776 let mut guard = caps.write().await;
1777 if let Some(val) = f(&guard) {
1778 return Ok(val);
1779 }
1780
1781 let (versions, unstable_features) = self.load_or_fetch_server_capabilities().await?;
1782
1783 guard.server_versions = Some(versions);
1784 guard.unstable_features = Some(unstable_features);
1785
1786 // SAFETY: both fields were set above, so the function will always return some.
1787 Ok(f(&guard).unwrap())
1788 }
1789
1790 /// Get the Matrix versions supported by the homeserver by fetching them
1791 /// from the server or the cache.
1792 ///
1793 /// # Examples
1794 ///
1795 /// ```no_run
1796 /// use ruma::api::MatrixVersion;
1797 /// # use matrix_sdk::{Client, config::SyncSettings};
1798 /// # use url::Url;
1799 /// # async {
1800 /// # let homeserver = Url::parse("http://localhost:8080")?;
1801 /// # let mut client = Client::new(homeserver).await?;
1802 ///
1803 /// let server_versions = client.server_versions().await?;
1804 /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
1805 /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
1806 /// # anyhow::Ok(()) };
1807 /// ```
1808 pub async fn server_versions(&self) -> HttpResult<Box<[MatrixVersion]>> {
1809 self.get_or_load_and_cache_server_capabilities(|caps| caps.server_versions.clone()).await
1810 }
1811
1812 /// Get the unstable features supported by the homeserver by fetching them
1813 /// from the server or the cache.
1814 ///
1815 /// # Examples
1816 ///
1817 /// ```no_run
1818 /// # use matrix_sdk::{Client, config::SyncSettings};
1819 /// # use url::Url;
1820 /// # async {
1821 /// # let homeserver = Url::parse("http://localhost:8080")?;
1822 /// # let mut client = Client::new(homeserver).await?;
1823 /// let unstable_features = client.unstable_features().await?;
1824 /// let supports_msc_x =
1825 /// unstable_features.get("msc_x").copied().unwrap_or(false);
1826 /// println!("The homeserver supports msc X: {supports_msc_x:?}");
1827 /// # anyhow::Ok(()) };
1828 /// ```
1829 pub async fn unstable_features(&self) -> HttpResult<BTreeMap<String, bool>> {
1830 self.get_or_load_and_cache_server_capabilities(|caps| caps.unstable_features.clone()).await
1831 }
1832
1833 /// Empty the server version and unstable features cache.
1834 ///
1835 /// Since the SDK caches server capabilities (versions and unstable
1836 /// features), it's possible to have a stale entry in the cache. This
1837 /// functions makes it possible to force reset it.
1838 pub async fn reset_server_capabilities(&self) -> Result<()> {
1839 // Empty the in-memory caches.
1840 let mut guard = self.inner.caches.server_capabilities.write().await;
1841 guard.server_versions = None;
1842 guard.unstable_features = None;
1843
1844 // Empty the store cache.
1845 Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerCapabilities).await?)
1846 }
1847
1848 /// Check whether MSC 4028 is enabled on the homeserver.
1849 ///
1850 /// # Examples
1851 ///
1852 /// ```no_run
1853 /// # use matrix_sdk::{Client, config::SyncSettings};
1854 /// # use url::Url;
1855 /// # async {
1856 /// # let homeserver = Url::parse("http://localhost:8080")?;
1857 /// # let mut client = Client::new(homeserver).await?;
1858 /// let msc4028_enabled =
1859 /// client.can_homeserver_push_encrypted_event_to_device().await?;
1860 /// # anyhow::Ok(()) };
1861 /// ```
1862 pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
1863 Ok(self.unstable_features().await?.get("org.matrix.msc4028").copied().unwrap_or(false))
1864 }
1865
1866 /// Get information of all our own devices.
1867 ///
1868 /// # Examples
1869 ///
1870 /// ```no_run
1871 /// # use matrix_sdk::{Client, config::SyncSettings};
1872 /// # use url::Url;
1873 /// # async {
1874 /// # let homeserver = Url::parse("http://localhost:8080")?;
1875 /// # let mut client = Client::new(homeserver).await?;
1876 /// let response = client.devices().await?;
1877 ///
1878 /// for device in response.devices {
1879 /// println!(
1880 /// "Device: {} {}",
1881 /// device.device_id,
1882 /// device.display_name.as_deref().unwrap_or("")
1883 /// );
1884 /// }
1885 /// # anyhow::Ok(()) };
1886 /// ```
1887 pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
1888 let request = get_devices::v3::Request::new();
1889
1890 self.send(request).await
1891 }
1892
1893 /// Delete the given devices from the server.
1894 ///
1895 /// # Arguments
1896 ///
1897 /// * `devices` - The list of devices that should be deleted from the
1898 /// server.
1899 ///
1900 /// * `auth_data` - This request requires user interactive auth, the first
1901 /// request needs to set this to `None` and will always fail with an
1902 /// `UiaaResponse`. The response will contain information for the
1903 /// interactive auth and the same request needs to be made but this time
1904 /// with some `auth_data` provided.
1905 ///
1906 /// ```no_run
1907 /// # use matrix_sdk::{
1908 /// # ruma::{api::client::uiaa, device_id},
1909 /// # Client, Error, config::SyncSettings,
1910 /// # };
1911 /// # use serde_json::json;
1912 /// # use url::Url;
1913 /// # use std::collections::BTreeMap;
1914 /// # async {
1915 /// # let homeserver = Url::parse("http://localhost:8080")?;
1916 /// # let mut client = Client::new(homeserver).await?;
1917 /// let devices = &[device_id!("DEVICEID").to_owned()];
1918 ///
1919 /// if let Err(e) = client.delete_devices(devices, None).await {
1920 /// if let Some(info) = e.as_uiaa_response() {
1921 /// let mut password = uiaa::Password::new(
1922 /// uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1923 /// "wordpass".to_owned(),
1924 /// );
1925 /// password.session = info.session.clone();
1926 ///
1927 /// client
1928 /// .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
1929 /// .await?;
1930 /// }
1931 /// }
1932 /// # anyhow::Ok(()) };
1933 pub async fn delete_devices(
1934 &self,
1935 devices: &[OwnedDeviceId],
1936 auth_data: Option<uiaa::AuthData>,
1937 ) -> HttpResult<delete_devices::v3::Response> {
1938 let mut request = delete_devices::v3::Request::new(devices.to_owned());
1939 request.auth = auth_data;
1940
1941 self.send(request).await
1942 }
1943
1944 /// Change the display name of a device owned by the current user.
1945 ///
1946 /// Returns a `update_device::Response` which specifies the result
1947 /// of the operation.
1948 ///
1949 /// # Arguments
1950 ///
1951 /// * `device_id` - The ID of the device to change the display name of.
1952 /// * `display_name` - The new display name to set.
1953 pub async fn rename_device(
1954 &self,
1955 device_id: &DeviceId,
1956 display_name: &str,
1957 ) -> HttpResult<update_device::v3::Response> {
1958 let mut request = update_device::v3::Request::new(device_id.to_owned());
1959 request.display_name = Some(display_name.to_owned());
1960
1961 self.send(request).await
1962 }
1963
1964 /// Synchronize the client's state with the latest state on the server.
1965 ///
1966 /// ## Syncing Events
1967 ///
1968 /// Messages or any other type of event need to be periodically fetched from
1969 /// the server, this is achieved by sending a `/sync` request to the server.
1970 ///
1971 /// The first sync is sent out without a [`token`]. The response of the
1972 /// first sync will contain a [`next_batch`] field which should then be
1973 /// used in the subsequent sync calls as the [`token`]. This ensures that we
1974 /// don't receive the same events multiple times.
1975 ///
1976 /// ## Long Polling
1977 ///
1978 /// A sync should in the usual case always be in flight. The
1979 /// [`SyncSettings`] have a [`timeout`] option, which controls how
1980 /// long the server will wait for new events before it will respond.
1981 /// The server will respond immediately if some new events arrive before the
1982 /// timeout has expired. If no changes arrive and the timeout expires an
1983 /// empty sync response will be sent to the client.
1984 ///
1985 /// This method of sending a request that may not receive a response
1986 /// immediately is called long polling.
1987 ///
1988 /// ## Filtering Events
1989 ///
1990 /// The number or type of messages and events that the client should receive
1991 /// from the server can be altered using a [`Filter`].
1992 ///
1993 /// Filters can be non-trivial and, since they will be sent with every sync
1994 /// request, they may take up a bunch of unnecessary bandwidth.
1995 ///
/// Luckily, filters can be uploaded to the server and reused via a unique
/// identifier; this can be achieved using the [`get_or_upload_filter()`]
/// method.
1999 ///
2000 /// # Arguments
2001 ///
2002 /// * `sync_settings` - Settings for the sync call, this allows us to set
2003 /// various options to configure the sync:
2004 /// * [`filter`] - To configure which events we receive and which get
2005 /// [filtered] by the server
2006 /// * [`timeout`] - To configure our [long polling] setup.
2007 /// * [`token`] - To tell the server which events we already received
2008 /// and where we wish to continue syncing.
2009 /// * [`full_state`] - To tell the server that we wish to receive all
2010 /// state events, regardless of our configured [`token`].
2011 /// * [`set_presence`] - To tell the server to set the presence and to
2012 /// which state.
2013 ///
2014 /// # Examples
2015 ///
2016 /// ```no_run
2017 /// # use url::Url;
2018 /// # async {
2019 /// # let homeserver = Url::parse("http://localhost:8080")?;
2020 /// # let username = "";
2021 /// # let password = "";
2022 /// use matrix_sdk::{
2023 /// config::SyncSettings,
2024 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2025 /// };
2026 ///
2027 /// let client = Client::new(homeserver).await?;
2028 /// client.matrix_auth().login_username(username, password).send().await?;
2029 ///
2030 /// // Sync once so we receive the client state and old messages.
2031 /// client.sync_once(SyncSettings::default()).await?;
2032 ///
2033 /// // Register our handler so we start responding once we receive a new
2034 /// // event.
2035 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2036 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2037 /// });
2038 ///
2039 /// // Now keep on syncing forever. `sync()` will use the stored sync token
2040 /// // from our `sync_once()` call automatically.
2041 /// client.sync(SyncSettings::default()).await;
2042 /// # anyhow::Ok(()) };
2043 /// ```
2044 ///
2045 /// [`sync`]: #method.sync
2046 /// [`SyncSettings`]: crate::config::SyncSettings
2047 /// [`token`]: crate::config::SyncSettings#method.token
2048 /// [`timeout`]: crate::config::SyncSettings#method.timeout
2049 /// [`full_state`]: crate::config::SyncSettings#method.full_state
2050 /// [`set_presence`]: ruma::presence::PresenceState
2051 /// [`filter`]: crate::config::SyncSettings#method.filter
2052 /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2053 /// [`next_batch`]: SyncResponse#structfield.next_batch
2054 /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2055 /// [long polling]: #long-polling
2056 /// [filtered]: #filtering-events
2057 #[instrument(skip(self))]
2058 pub async fn sync_once(
2059 &self,
2060 sync_settings: crate::config::SyncSettings,
2061 ) -> Result<SyncResponse> {
2062 // The sync might not return for quite a while due to the timeout.
2063 // We'll see if there's anything crypto related to send out before we
2064 // sync, i.e. if we closed our client after a sync but before the
2065 // crypto requests were sent out.
2066 //
2067 // This will mostly be a no-op.
2068 #[cfg(feature = "e2e-encryption")]
2069 if let Err(e) = self.send_outgoing_requests().await {
2070 error!(error = ?e, "Error while sending outgoing E2EE requests");
2071 }
2072
2073 let request = assign!(sync_events::v3::Request::new(), {
2074 filter: sync_settings.filter.map(|f| *f),
2075 since: sync_settings.token,
2076 full_state: sync_settings.full_state,
2077 set_presence: sync_settings.set_presence,
2078 timeout: sync_settings.timeout,
2079 });
2080 let mut request_config = self.request_config();
2081 if let Some(timeout) = sync_settings.timeout {
2082 request_config.timeout += timeout;
2083 }
2084
2085 let response = self.send(request).with_request_config(request_config).await?;
2086 let next_batch = response.next_batch.clone();
2087 let response = self.process_sync(response).await?;
2088
2089 #[cfg(feature = "e2e-encryption")]
2090 if let Err(e) = self.send_outgoing_requests().await {
2091 error!(error = ?e, "Error while sending outgoing E2EE requests");
2092 }
2093
2094 self.inner.sync_beat.notify(usize::MAX);
2095
2096 Ok(SyncResponse::new(next_batch, response))
2097 }
2098
2099 /// Repeatedly synchronize the client state with the server.
2100 ///
2101 /// This method will only return on error, if cancellation is needed
2102 /// the method should be wrapped in a cancelable task or the
2103 /// [`Client::sync_with_callback`] method can be used or
2104 /// [`Client::sync_with_result_callback`] if you want to handle error
2105 /// cases in the loop, too.
2106 ///
2107 /// This method will internally call [`Client::sync_once`] in a loop.
2108 ///
2109 /// This method can be used with the [`Client::add_event_handler`]
2110 /// method to react to individual events. If you instead wish to handle
2111 /// events in a bulk manner the [`Client::sync_with_callback`],
2112 /// [`Client::sync_with_result_callback`] and
2113 /// [`Client::sync_stream`] methods can be used instead. Those methods
2114 /// repeatedly return the whole sync response.
2115 ///
2116 /// # Arguments
2117 ///
2118 /// * `sync_settings` - Settings for the sync call. *Note* that those
2119 /// settings will be only used for the first sync call. See the argument
2120 /// docs for [`Client::sync_once`] for more info.
2121 ///
2122 /// # Return
2123 /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2124 /// up to the user of the API to check the error and decide whether the sync
2125 /// should continue or not.
2126 ///
2127 /// # Examples
2128 ///
2129 /// ```no_run
2130 /// # use url::Url;
2131 /// # async {
2132 /// # let homeserver = Url::parse("http://localhost:8080")?;
2133 /// # let username = "";
2134 /// # let password = "";
2135 /// use matrix_sdk::{
2136 /// config::SyncSettings,
2137 /// ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2138 /// };
2139 ///
2140 /// let client = Client::new(homeserver).await?;
2141 /// client.matrix_auth().login_username(&username, &password).send().await?;
2142 ///
2143 /// // Register our handler so we start responding once we receive a new
2144 /// // event.
2145 /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2146 /// println!("Received event {}: {:?}", ev.sender, ev.content);
2147 /// });
2148 ///
2149 /// // Now keep on syncing forever. `sync()` will use the latest sync token
2150 /// // automatically.
2151 /// client.sync(SyncSettings::default()).await?;
2152 /// # anyhow::Ok(()) };
2153 /// ```
2154 ///
2155 /// [argument docs]: #method.sync_once
2156 /// [`sync_with_callback`]: #method.sync_with_callback
2157 pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2158 self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2159 }
2160
2161 /// Repeatedly call sync to synchronize the client state with the server.
2162 ///
2163 /// # Arguments
2164 ///
2165 /// * `sync_settings` - Settings for the sync call. *Note* that those
2166 /// settings will be only used for the first sync call. See the argument
2167 /// docs for [`Client::sync_once`] for more info.
2168 ///
/// * `callback` - A callback that will be called every time a successful
///   response has been fetched from the server. The callback must return a
///   `LoopCtrl` which signals whether the method should stop syncing: if
///   the callback returns `LoopCtrl::Continue` the sync will continue, if
///   the callback returns `LoopCtrl::Break` the sync will be stopped.
2174 ///
2175 /// # Return
2176 /// The sync runs until an error occurs or the
2177 /// callback indicates that the Loop should stop. If the callback asked for
2178 /// a regular stop, the result will be `Ok(())` otherwise the
2179 /// `Err(Error)` is returned.
2180 ///
2181 /// # Examples
2182 ///
2183 /// The following example demonstrates how to sync forever while sending all
2184 /// the interesting events through a mpsc channel to another thread e.g. a
2185 /// UI thread.
2186 ///
2187 /// ```no_run
2188 /// # use std::time::Duration;
2189 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2190 /// # use url::Url;
2191 /// # async {
2192 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2193 /// # let mut client = Client::new(homeserver).await.unwrap();
2194 ///
2195 /// use tokio::sync::mpsc::channel;
2196 ///
2197 /// let (tx, rx) = channel(100);
2198 ///
2199 /// let sync_channel = &tx;
2200 /// let sync_settings = SyncSettings::new()
2201 /// .timeout(Duration::from_secs(30));
2202 ///
2203 /// client
2204 /// .sync_with_callback(sync_settings, |response| async move {
2205 /// let channel = sync_channel;
2206 /// for (room_id, room) in response.rooms.join {
2207 /// for event in room.timeline.events {
2208 /// channel.send(event).await.unwrap();
2209 /// }
2210 /// }
2211 ///
2212 /// LoopCtrl::Continue
2213 /// })
2214 /// .await;
2215 /// };
2216 /// ```
2217 #[instrument(skip_all)]
2218 pub async fn sync_with_callback<C>(
2219 &self,
2220 sync_settings: crate::config::SyncSettings,
2221 callback: impl Fn(SyncResponse) -> C,
2222 ) -> Result<(), Error>
2223 where
2224 C: Future<Output = LoopCtrl>,
2225 {
2226 self.sync_with_result_callback(sync_settings, |result| async {
2227 Ok(callback(result?).await)
2228 })
2229 .await
2230 }
2231
2232 /// Repeatedly call sync to synchronize the client state with the server.
2233 ///
2234 /// # Arguments
2235 ///
2236 /// * `sync_settings` - Settings for the sync call. *Note* that those
2237 /// settings will be only used for the first sync call. See the argument
2238 /// docs for [`Client::sync_once`] for more info.
2239 ///
2240 /// * `callback` - A callback that will be called every time after a
2241 /// response has been received, failure or not. The callback returns a
2242 /// `Result<LoopCtrl, Error>`, too. When returning
2243 /// `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2244 /// returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2245 /// function returns `Ok(())`. In case the callback can't handle the
2246 /// `Error` or has a different malfunction, it can return an `Err(Error)`,
2247 /// which results in the sync ending and the `Err(Error)` being returned.
2248 ///
2249 /// # Return
2250 /// The sync runs until an error occurs that the callback can't handle or
2251 /// the callback indicates that the Loop should stop. If the callback
2252 /// asked for a regular stop, the result will be `Ok(())` otherwise the
2253 /// `Err(Error)` is returned.
2254 ///
/// _Note_: Lower-level configuration (e.g. for retries) is not changed by
/// this, and is applied first without sending the result to the
/// callback. Only after it has been exhausted is the `Result` handed to
/// the callback.
2259 ///
2260 /// # Examples
2261 ///
2262 /// The following example demonstrates how to sync forever while sending all
2263 /// the interesting events through a mpsc channel to another thread e.g. a
2264 /// UI thread.
2265 ///
2266 /// ```no_run
2267 /// # use std::time::Duration;
2268 /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2269 /// # use url::Url;
2270 /// # async {
2271 /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2272 /// # let mut client = Client::new(homeserver).await.unwrap();
2273 /// #
2274 /// use tokio::sync::mpsc::channel;
2275 ///
2276 /// let (tx, rx) = channel(100);
2277 ///
2278 /// let sync_channel = &tx;
2279 /// let sync_settings = SyncSettings::new()
2280 /// .timeout(Duration::from_secs(30));
2281 ///
2282 /// client
2283 /// .sync_with_result_callback(sync_settings, |response| async move {
2284 /// let channel = sync_channel;
2285 /// let sync_response = response?;
2286 /// for (room_id, room) in sync_response.rooms.join {
2287 /// for event in room.timeline.events {
2288 /// channel.send(event).await.unwrap();
2289 /// }
2290 /// }
2291 ///
2292 /// Ok(LoopCtrl::Continue)
2293 /// })
2294 /// .await;
2295 /// };
2296 /// ```
2297 #[instrument(skip(self, callback))]
2298 pub async fn sync_with_result_callback<C>(
2299 &self,
2300 mut sync_settings: crate::config::SyncSettings,
2301 callback: impl Fn(Result<SyncResponse, Error>) -> C,
2302 ) -> Result<(), Error>
2303 where
2304 C: Future<Output = Result<LoopCtrl, Error>>,
2305 {
2306 let mut last_sync_time: Option<Instant> = None;
2307
2308 if sync_settings.token.is_none() {
2309 sync_settings.token = self.sync_token().await;
2310 }
2311
2312 loop {
2313 trace!("Syncing");
2314 let result = self.sync_loop_helper(&mut sync_settings).await;
2315
2316 trace!("Running callback");
2317 if callback(result).await? == LoopCtrl::Break {
2318 trace!("Callback told us to stop");
2319 break;
2320 }
2321 trace!("Done running callback");
2322
2323 Client::delay_sync(&mut last_sync_time).await
2324 }
2325
2326 Ok(())
2327 }
2328
/// Repeatedly synchronize the client state with the server.
2330 ///
2331 /// This method will internally call [`Client::sync_once`] in a loop and is
2332 /// equivalent to the [`Client::sync`] method but the responses are provided
2333 /// as an async stream.
2334 ///
2335 /// # Arguments
2336 ///
2337 /// * `sync_settings` - Settings for the sync call. *Note* that those
2338 /// settings will be only used for the first sync call. See the argument
2339 /// docs for [`Client::sync_once`] for more info.
2340 ///
2341 /// # Examples
2342 ///
2343 /// ```no_run
2344 /// # use url::Url;
2345 /// # async {
2346 /// # let homeserver = Url::parse("http://localhost:8080")?;
2347 /// # let username = "";
2348 /// # let password = "";
2349 /// use futures_util::StreamExt;
2350 /// use matrix_sdk::{config::SyncSettings, Client};
2351 ///
2352 /// let client = Client::new(homeserver).await?;
2353 /// client.matrix_auth().login_username(&username, &password).send().await?;
2354 ///
2355 /// let mut sync_stream =
2356 /// Box::pin(client.sync_stream(SyncSettings::default()).await);
2357 ///
2358 /// while let Some(Ok(response)) = sync_stream.next().await {
2359 /// for room in response.rooms.join.values() {
2360 /// for e in &room.timeline.events {
2361 /// if let Ok(event) = e.raw().deserialize() {
2362 /// println!("Received event {:?}", event);
2363 /// }
2364 /// }
2365 /// }
2366 /// }
2367 ///
2368 /// # anyhow::Ok(()) };
2369 /// ```
2370 #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2371 #[instrument(skip(self))]
2372 pub async fn sync_stream(
2373 &self,
2374 mut sync_settings: crate::config::SyncSettings,
2375 ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2376 let mut last_sync_time: Option<Instant> = None;
2377
2378 if sync_settings.token.is_none() {
2379 sync_settings.token = self.sync_token().await;
2380 }
2381
2382 let parent_span = Span::current();
2383
2384 async_stream::stream! {
2385 loop {
2386 yield self.sync_loop_helper(&mut sync_settings).instrument(parent_span.clone()).await;
2387
2388 Client::delay_sync(&mut last_sync_time).await
2389 }
2390 }
2391 }
2392
2393 /// Get the current, if any, sync token of the client.
2394 /// This will be None if the client didn't sync at least once.
2395 pub(crate) async fn sync_token(&self) -> Option<String> {
2396 self.inner.base_client.sync_token().await
2397 }
2398
2399 /// Gets information about the owner of a given access token.
2400 pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2401 let request = whoami::v3::Request::new();
2402 self.send(request).await
2403 }
2404
2405 /// Subscribes a new receiver to client SessionChange broadcasts.
2406 pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2407 let broadcast = &self.auth_ctx().session_change_sender;
2408 broadcast.subscribe()
2409 }
2410
2411 /// Sets the save/restore session callbacks.
2412 ///
2413 /// This is another mechanism to get synchronous updates to session tokens,
2414 /// while [`Self::subscribe_to_session_changes`] provides an async update.
2415 pub fn set_session_callbacks(
2416 &self,
2417 reload_session_callback: Box<ReloadSessionCallback>,
2418 save_session_callback: Box<SaveSessionCallback>,
2419 ) -> Result<()> {
2420 self.inner
2421 .auth_ctx
2422 .reload_session_callback
2423 .set(reload_session_callback)
2424 .map_err(|_| Error::MultipleSessionCallbacks)?;
2425
2426 self.inner
2427 .auth_ctx
2428 .save_session_callback
2429 .set(save_session_callback)
2430 .map_err(|_| Error::MultipleSessionCallbacks)?;
2431
2432 Ok(())
2433 }
2434
2435 /// Get the notification settings of the current owner of the client.
2436 pub async fn notification_settings(&self) -> NotificationSettings {
2437 let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2438 NotificationSettings::new(self.clone(), ruleset)
2439 }
2440
2441 /// Create a new specialized `Client` that can process notifications.
2442 ///
2443 /// See [`CrossProcessStoreLock::new`] to learn more about
2444 /// `cross_process_store_locks_holder_name`.
2445 ///
2446 /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2447 pub async fn notification_client(
2448 &self,
2449 cross_process_store_locks_holder_name: String,
2450 ) -> Result<Client> {
2451 let client = Client {
2452 inner: ClientInner::new(
2453 self.inner.auth_ctx.clone(),
2454 self.server().cloned(),
2455 self.homeserver(),
2456 self.sliding_sync_version(),
2457 self.inner.http_client.clone(),
2458 self.inner
2459 .base_client
2460 .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2461 .await?,
2462 self.inner.caches.server_capabilities.read().await.clone(),
2463 self.inner.respect_login_well_known,
2464 self.inner.event_cache.clone(),
2465 self.inner.send_queue_data.clone(),
2466 #[cfg(feature = "e2e-encryption")]
2467 self.inner.e2ee.encryption_settings,
2468 cross_process_store_locks_holder_name,
2469 )
2470 .await,
2471 };
2472
2473 Ok(client)
2474 }
2475
2476 /// The [`EventCache`] instance for this [`Client`].
2477 pub fn event_cache(&self) -> &EventCache {
2478 // SAFETY: always initialized in the `Client` ctor.
2479 self.inner.event_cache.get().unwrap()
2480 }
2481
2482 /// Waits until an at least partially synced room is received, and returns
2483 /// it.
2484 ///
2485 /// **Note: this function will loop endlessly until either it finds the room
2486 /// or an externally set timeout happens.**
2487 pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2488 loop {
2489 if let Some(room) = self.get_room(room_id) {
2490 if room.is_state_partially_or_fully_synced() {
2491 debug!("Found just created room!");
2492 return room;
2493 }
2494 debug!("Room wasn't partially synced, waiting for sync beat to try again");
2495 } else {
2496 debug!("Room wasn't found, waiting for sync beat to try again");
2497 }
2498 self.inner.sync_beat.listen().await;
2499 }
2500 }
2501
2502 /// Knock on a room given its `room_id_or_alias` to ask for permission to
2503 /// join it.
2504 pub async fn knock(
2505 &self,
2506 room_id_or_alias: OwnedRoomOrAliasId,
2507 reason: Option<String>,
2508 server_names: Vec<OwnedServerName>,
2509 ) -> Result<Room> {
2510 let request =
2511 assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2512 let response = self.send(request).await?;
2513 let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2514 Ok(Room::new(self.clone(), base_room))
2515 }
2516}
2517
2518/// A weak reference to the inner client, useful when trying to get a handle
2519/// on the owning client.
2520#[derive(Clone)]
2521pub(crate) struct WeakClient {
2522 client: Weak<ClientInner>,
2523}
2524
2525impl WeakClient {
2526 /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2527 pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2528 Self { client: Arc::downgrade(client) }
2529 }
2530
2531 /// Construct a [`WeakClient`] from a [`Client`].
2532 pub fn from_client(client: &Client) -> Self {
2533 Self::from_inner(&client.inner)
2534 }
2535
2536 /// Attempts to get a [`Client`] from this [`WeakClient`].
2537 pub fn get(&self) -> Option<Client> {
2538 self.client.upgrade().map(|inner| Client { inner })
2539 }
2540
2541 /// Gets the number of strong (`Arc`) pointers still pointing to this
2542 /// client.
2543 #[allow(dead_code)]
2544 pub fn strong_count(&self) -> usize {
2545 self.client.strong_count()
2546 }
2547}
2548
2549#[derive(Clone)]
2550struct ClientServerCapabilities {
2551 /// The Matrix versions the server supports (well-known ones only).
2552 server_versions: Option<Box<[MatrixVersion]>>,
2553
2554 /// The unstable features and their on/off state on the server.
2555 unstable_features: Option<BTreeMap<String, bool>>,
2556}
2557
2558// The http mocking library is not supported for wasm32
2559#[cfg(all(test, not(target_arch = "wasm32")))]
2560pub(crate) mod tests {
2561 use std::{sync::Arc, time::Duration};
2562
2563 use assert_matches::assert_matches;
2564 use futures_util::FutureExt;
2565 use matrix_sdk_base::{
2566 store::{MemoryStore, StoreConfig},
2567 RoomState,
2568 };
2569 use matrix_sdk_test::{
2570 async_test, test_json, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder,
2571 DEFAULT_TEST_ROOM_ID,
2572 };
2573 #[cfg(target_arch = "wasm32")]
2574 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2575
2576 use ruma::{
2577 api::{client::room::create_room::v3::Request as CreateRoomRequest, MatrixVersion},
2578 assign,
2579 events::ignored_user_list::IgnoredUserListEventContent,
2580 owned_room_id, room_alias_id, room_id, RoomId, ServerName, UserId,
2581 };
2582 use serde_json::json;
2583 use tokio::{
2584 spawn,
2585 time::{sleep, timeout},
2586 };
2587 use url::Url;
2588 use wiremock::{
2589 matchers::{body_json, header, method, path, query_param_is_missing},
2590 Mock, MockServer, ResponseTemplate,
2591 };
2592
2593 use super::Client;
2594 use crate::{
2595 client::WeakClient,
2596 config::{RequestConfig, SyncSettings},
2597 test_utils::{
2598 logged_in_client, mocks::MatrixMockServer, no_retry_test_client, set_client_session,
2599 test_client_builder, test_client_builder_with_server,
2600 },
2601 Error,
2602 };
2603
2604 #[async_test]
2605 async fn test_account_data() {
2606 let server = MockServer::start().await;
2607 let client = logged_in_client(Some(server.uri())).await;
2608
2609 Mock::given(method("GET"))
2610 .and(path("/_matrix/client/r0/sync".to_owned()))
2611 .and(header("authorization", "Bearer 1234"))
2612 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::SYNC))
2613 .mount(&server)
2614 .await;
2615
2616 let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
2617 let _response = client.sync_once(sync_settings).await.unwrap();
2618
2619 let content = client
2620 .account()
2621 .account_data::<IgnoredUserListEventContent>()
2622 .await
2623 .unwrap()
2624 .unwrap()
2625 .deserialize()
2626 .unwrap();
2627
2628 assert_eq!(content.ignored_users.len(), 1);
2629 }
2630
2631 #[async_test]
2632 async fn test_successful_discovery() {
2633 // Imagine this is `matrix.org`.
2634 let server = MockServer::start().await;
2635
2636 // Imagine this is `matrix-client.matrix.org`.
2637 let homeserver = MockServer::start().await;
2638
2639 // Imagine Alice has the user ID `@alice:matrix.org`.
2640 let server_url = server.uri();
2641 let domain = server_url.strip_prefix("http://").unwrap();
2642 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2643
2644 // The `.well-known` is on the server (e.g. `matrix.org`).
2645 Mock::given(method("GET"))
2646 .and(path("/.well-known/matrix/client"))
2647 .respond_with(ResponseTemplate::new(200).set_body_raw(
2648 test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", &homeserver.uri()),
2649 "application/json",
2650 ))
2651 .mount(&server)
2652 .await;
2653
2654 // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
2655 Mock::given(method("GET"))
2656 .and(path("/_matrix/client/versions"))
2657 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2658 .mount(&homeserver)
2659 .await;
2660
2661 let client = Client::builder()
2662 .insecure_server_name_no_tls(alice.server_name())
2663 .build()
2664 .await
2665 .unwrap();
2666
2667 assert_eq!(client.server().unwrap(), &Url::parse(&server.uri()).unwrap());
2668 assert_eq!(client.homeserver(), Url::parse(&homeserver.uri()).unwrap());
2669 }
2670
2671 #[async_test]
2672 async fn test_discovery_broken_server() {
2673 let server = MockServer::start().await;
2674 let server_url = server.uri();
2675 let domain = server_url.strip_prefix("http://").unwrap();
2676 let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
2677
2678 Mock::given(method("GET"))
2679 .and(path("/.well-known/matrix/client"))
2680 .respond_with(ResponseTemplate::new(404))
2681 .mount(&server)
2682 .await;
2683
2684 assert!(
2685 Client::builder()
2686 .insecure_server_name_no_tls(alice.server_name())
2687 .build()
2688 .await
2689 .is_err(),
2690 "Creating a client from a user ID should fail when the .well-known request fails."
2691 );
2692 }
2693
2694 #[async_test]
2695 async fn test_room_creation() {
2696 let server = MockServer::start().await;
2697 let client = logged_in_client(Some(server.uri())).await;
2698
2699 let response = SyncResponseBuilder::default()
2700 .add_joined_room(
2701 JoinedRoomBuilder::default()
2702 .add_state_event(StateTestEvent::Member)
2703 .add_state_event(StateTestEvent::PowerLevels),
2704 )
2705 .build_sync_response();
2706
2707 client.inner.base_client.receive_sync_response(response).await.unwrap();
2708
2709 assert_eq!(client.homeserver(), Url::parse(&server.uri()).unwrap());
2710
2711 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
2712 assert_eq!(room.state(), RoomState::Joined);
2713 }
2714
2715 #[async_test]
2716 async fn test_retry_limit_http_requests() {
2717 let server = MockServer::start().await;
2718 let client = test_client_builder(Some(server.uri()))
2719 .request_config(RequestConfig::new().retry_limit(3))
2720 .build()
2721 .await
2722 .unwrap();
2723
2724 assert!(client.request_config().retry_limit.unwrap() == 3);
2725
2726 Mock::given(method("POST"))
2727 .and(path("/_matrix/client/r0/login"))
2728 .respond_with(ResponseTemplate::new(501))
2729 .expect(3)
2730 .mount(&server)
2731 .await;
2732
2733 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2734 }
2735
2736 #[async_test]
2737 async fn test_retry_timeout_http_requests() {
2738 // Keep this timeout small so that the test doesn't take long
2739 let retry_timeout = Duration::from_secs(5);
2740 let server = MockServer::start().await;
2741 let client = test_client_builder(Some(server.uri()))
2742 .request_config(RequestConfig::new().max_retry_time(retry_timeout))
2743 .build()
2744 .await
2745 .unwrap();
2746
2747 assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
2748
2749 Mock::given(method("POST"))
2750 .and(path("/_matrix/client/r0/login"))
2751 .respond_with(ResponseTemplate::new(501))
2752 .expect(2..)
2753 .mount(&server)
2754 .await;
2755
2756 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2757 }
2758
2759 #[async_test]
2760 async fn test_short_retry_initial_http_requests() {
2761 let server = MockServer::start().await;
2762 let client = test_client_builder(Some(server.uri())).build().await.unwrap();
2763
2764 Mock::given(method("POST"))
2765 .and(path("/_matrix/client/r0/login"))
2766 .respond_with(ResponseTemplate::new(501))
2767 .expect(3..)
2768 .mount(&server)
2769 .await;
2770
2771 client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
2772 }
2773
2774 #[async_test]
2775 async fn test_no_retry_http_requests() {
2776 let server = MockServer::start().await;
2777 let client = logged_in_client(Some(server.uri())).await;
2778
2779 Mock::given(method("GET"))
2780 .and(path("/_matrix/client/r0/devices"))
2781 .respond_with(ResponseTemplate::new(501))
2782 .expect(1)
2783 .mount(&server)
2784 .await;
2785
2786 client.devices().await.unwrap_err();
2787 }
2788
2789 #[async_test]
2790 async fn test_set_homeserver() {
2791 let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2792 assert_eq!(client.homeserver().as_ref(), "http://localhost/");
2793
2794 let homeserver = Url::parse("http://example.com/").unwrap();
2795 client.set_homeserver(homeserver.clone());
2796 assert_eq!(client.homeserver(), homeserver);
2797 }
2798
2799 #[async_test]
2800 async fn test_search_user_request() {
2801 let server = MockServer::start().await;
2802 let client = logged_in_client(Some(server.uri())).await;
2803
2804 Mock::given(method("POST"))
2805 .and(path("_matrix/client/r0/user_directory/search"))
2806 .and(body_json(&*test_json::search_users::SEARCH_USERS_REQUEST))
2807 .respond_with(
2808 ResponseTemplate::new(200)
2809 .set_body_json(&*test_json::search_users::SEARCH_USERS_RESPONSE),
2810 )
2811 .mount(&server)
2812 .await;
2813
2814 let response = client.search_users("test", 50).await.unwrap();
2815 assert_eq!(response.results.len(), 1);
2816 let result = &response.results[0];
2817 assert_eq!(result.user_id.to_string(), "@test:example.me");
2818 assert_eq!(result.display_name.clone().unwrap(), "Test");
2819 assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
2820 assert!(!response.limited);
2821 }
2822
2823 #[async_test]
2824 async fn test_request_unstable_features() {
2825 let server = MockServer::start().await;
2826 let client = logged_in_client(Some(server.uri())).await;
2827
2828 Mock::given(method("GET"))
2829 .and(path("_matrix/client/versions"))
2830 .respond_with(
2831 ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
2832 )
2833 .mount(&server)
2834 .await;
2835
2836 let unstable_features = client.unstable_features().await.unwrap();
2837 assert_eq!(unstable_features.get("org.matrix.e2e_cross_signing"), Some(&true));
2838 assert_eq!(unstable_features.get("you.shall.pass"), None);
2839 }
2840
2841 #[async_test]
2842 async fn test_can_homeserver_push_encrypted_event_to_device() {
2843 let server = MockServer::start().await;
2844 let client = logged_in_client(Some(server.uri())).await;
2845
2846 Mock::given(method("GET"))
2847 .and(path("_matrix/client/versions"))
2848 .respond_with(
2849 ResponseTemplate::new(200).set_body_json(&*test_json::api_responses::VERSIONS),
2850 )
2851 .mount(&server)
2852 .await;
2853
2854 let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
2855 assert!(msc4028_enabled);
2856 }
2857
2858 #[async_test]
2859 async fn test_recently_visited_rooms() {
2860 // Tracking recently visited rooms requires authentication
2861 let client = no_retry_test_client(Some("http://localhost".to_owned())).await;
2862 assert_matches!(
2863 client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
2864 Err(Error::AuthenticationRequired)
2865 );
2866
2867 let client = logged_in_client(None).await;
2868 let account = client.account();
2869
2870 // We should start off with an empty list
2871 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
2872
2873 // Tracking a valid room id should add it to the list
2874 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2875 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2876 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2877
2878 // And the existing list shouldn't be changed
2879 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2880 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2881
2882 // Tracking the same room again shouldn't change the list
2883 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2884 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
2885 assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
2886
2887 // Tracking a second room should add it to the front of the list
2888 account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
2889 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
2890 assert_eq!(
2891 account.get_recently_visited_rooms().await.unwrap(),
2892 [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
2893 );
2894
2895 // Tracking the first room yet again should move it to the front of the list
2896 account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
2897 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
2898 assert_eq!(
2899 account.get_recently_visited_rooms().await.unwrap(),
2900 [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
2901 );
2902
2903 // Tracking should be capped at 20
2904 for n in 0..20 {
2905 account
2906 .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
2907 .await
2908 .unwrap();
2909 }
2910
2911 assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
2912
2913 // And the initial rooms should've been pushed out
2914 let rooms = account.get_recently_visited_rooms().await.unwrap();
2915 assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
2916 assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
2917
2918 // And the last tracked room should be the first
2919 assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
2920 }
2921
2922 #[async_test]
2923 async fn test_client_no_cycle_with_event_cache() {
2924 let client = logged_in_client(None).await;
2925
2926 // Wait for the init tasks to die.
2927 sleep(Duration::from_secs(1)).await;
2928
2929 let weak_client = WeakClient::from_client(&client);
2930 assert_eq!(weak_client.strong_count(), 1);
2931
2932 {
2933 let room_id = room_id!("!room:example.org");
2934
2935 // Have the client know the room.
2936 let response = SyncResponseBuilder::default()
2937 .add_joined_room(JoinedRoomBuilder::new(room_id))
2938 .build_sync_response();
2939 client.inner.base_client.receive_sync_response(response).await.unwrap();
2940
2941 client.event_cache().subscribe().unwrap();
2942
2943 let (_room_event_cache, _drop_handles) =
2944 client.get_room(room_id).unwrap().event_cache().await.unwrap();
2945 }
2946
2947 drop(client);
2948
2949 // Give a bit of time for background tasks to die.
2950 sleep(Duration::from_secs(1)).await;
2951
2952 // The weak client must be the last reference to the client now.
2953 assert_eq!(weak_client.strong_count(), 0);
2954 let client = weak_client.get();
2955 assert!(
2956 client.is_none(),
2957 "too many strong references to the client: {}",
2958 Arc::strong_count(&client.unwrap().inner)
2959 );
2960 }
2961
2962 #[async_test]
2963 async fn test_server_capabilities_caching() {
2964 let server = MockServer::start().await;
2965 let server_url = server.uri();
2966 let domain = server_url.strip_prefix("http://").unwrap();
2967 let server_name = <&ServerName>::try_from(domain).unwrap();
2968
2969 Mock::given(method("GET"))
2970 .and(path("/.well-known/matrix/client"))
2971 .respond_with(ResponseTemplate::new(200).set_body_raw(
2972 test_json::WELL_KNOWN.to_string().replace("HOMESERVER_URL", server_url.as_ref()),
2973 "application/json",
2974 ))
2975 .named("well known mock")
2976 .expect(2)
2977 .mount(&server)
2978 .await;
2979
2980 let versions_mock = Mock::given(method("GET"))
2981 .and(path("/_matrix/client/versions"))
2982 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
2983 .named("first versions mock")
2984 .expect(1)
2985 .mount_as_scoped(&server)
2986 .await;
2987
2988 let memory_store = Arc::new(MemoryStore::new());
2989 let client = Client::builder()
2990 .insecure_server_name_no_tls(server_name)
2991 .store_config(
2992 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
2993 .state_store(memory_store.clone()),
2994 )
2995 .build()
2996 .await
2997 .unwrap();
2998
2999 assert_eq!(client.server_versions().await.unwrap().len(), 1);
3000
3001 // This second call hits the in-memory cache.
3002 assert!(client
3003 .server_versions()
3004 .await
3005 .unwrap()
3006 .iter()
3007 .any(|version| *version == MatrixVersion::V1_0));
3008
3009 drop(client);
3010
3011 let client = Client::builder()
3012 .insecure_server_name_no_tls(server_name)
3013 .store_config(
3014 StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3015 .state_store(memory_store.clone()),
3016 )
3017 .build()
3018 .await
3019 .unwrap();
3020
3021 // This third call hits the on-disk cache.
3022 assert_eq!(
3023 client.unstable_features().await.unwrap().get("org.matrix.e2e_cross_signing"),
3024 Some(&true)
3025 );
3026
3027 drop(versions_mock);
3028 server.verify().await;
3029
3030 // Now, reset the cache, and observe the endpoint being called again once.
3031 client.reset_server_capabilities().await.unwrap();
3032
3033 Mock::given(method("GET"))
3034 .and(path("/_matrix/client/versions"))
3035 .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS))
3036 .expect(1)
3037 .named("second versions mock")
3038 .mount(&server)
3039 .await;
3040
3041 // Hits network again.
3042 assert_eq!(client.server_versions().await.unwrap().len(), 1);
3043 // Hits in-memory cache again.
3044 assert!(client
3045 .server_versions()
3046 .await
3047 .unwrap()
3048 .iter()
3049 .any(|version| *version == MatrixVersion::V1_0));
3050 }
3051
3052 #[async_test]
3053 async fn test_no_network_doesnt_cause_infinite_retries() {
3054 // Note: not `no_retry_test_client` or `logged_in_client` which uses the former,
3055 // since we want infinite retries for transient errors.
3056 let client =
3057 test_client_builder(None).request_config(RequestConfig::new()).build().await.unwrap();
3058 set_client_session(&client).await;
3059
3060 // We don't define a mock server on purpose here, so that the error is really a
3061 // network error.
3062 client.whoami().await.unwrap_err();
3063 }
3064
3065 #[async_test]
3066 async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3067 let (client_builder, server) = test_client_builder_with_server().await;
3068 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3069 set_client_session(&client).await;
3070
3071 let builder = Mock::given(method("GET"))
3072 .and(path("/_matrix/client/r0/sync"))
3073 .and(header("authorization", "Bearer 1234"))
3074 .and(query_param_is_missing("since"));
3075
3076 let room_id = room_id!("!room:example.org");
3077 let joined_room_builder = JoinedRoomBuilder::new(room_id);
3078 let mut sync_response_builder = SyncResponseBuilder::new();
3079 sync_response_builder.add_joined_room(joined_room_builder);
3080 let response_body = sync_response_builder.build_json_sync_response();
3081
3082 builder
3083 .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3084 .mount(&server)
3085 .await;
3086
3087 client.sync_once(SyncSettings::default()).await.unwrap();
3088
3089 let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3090 assert_eq!(room.room_id(), room_id);
3091 }
3092
3093 #[async_test]
3094 async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3095 let (client_builder, server) = test_client_builder_with_server().await;
3096 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3097 set_client_session(&client).await;
3098
3099 let builder = Mock::given(method("GET"))
3100 .and(path("/_matrix/client/r0/sync"))
3101 .and(header("authorization", "Bearer 1234"))
3102 .and(query_param_is_missing("since"));
3103
3104 let room_id = room_id!("!room:example.org");
3105 let joined_room_builder = JoinedRoomBuilder::new(room_id);
3106 let mut sync_response_builder = SyncResponseBuilder::new();
3107 sync_response_builder.add_joined_room(joined_room_builder);
3108 let response_body = sync_response_builder.build_json_sync_response();
3109
3110 builder
3111 .respond_with(ResponseTemplate::new(200).set_body_json(response_body))
3112 .mount(&server)
3113 .await;
3114
3115 let client = Arc::new(client);
3116
3117 // Perform the /sync request with a delay so it starts after the
3118 // `await_room_remote_echo` call has happened
3119 spawn({
3120 let client = client.clone();
3121 async move {
3122 sleep(Duration::from_millis(100)).await;
3123 client.sync_once(SyncSettings::default()).await.unwrap();
3124 }
3125 });
3126
3127 let room =
3128 timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3129 assert_eq!(room.room_id(), room_id);
3130 }
3131
3132 #[async_test]
3133 async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3134 let (client_builder, _) = test_client_builder_with_server().await;
3135 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3136 set_client_session(&client).await;
3137
3138 let room_id = room_id!("!room:example.org");
3139 // Room is not present so the client won't be able to find it. The call will
3140 // timeout.
3141 timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3142 }
3143
3144 #[async_test]
3145 async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3146 let (client_builder, server) = test_client_builder_with_server().await;
3147 let client = client_builder.request_config(RequestConfig::new()).build().await.unwrap();
3148 set_client_session(&client).await;
3149
3150 Mock::given(method("POST"))
3151 .and(path("_matrix/client/r0/createRoom"))
3152 .and(header("authorization", "Bearer 1234"))
3153 .respond_with(
3154 ResponseTemplate::new(200).set_body_json(json!({ "room_id": "!room:example.org"})),
3155 )
3156 .mount(&server)
3157 .await;
3158
3159 // Create a room in the internal store
3160 let room = client
3161 .create_room(assign!(CreateRoomRequest::new(), {
3162 invite: vec![],
3163 is_direct: false,
3164 }))
3165 .await
3166 .unwrap();
3167
3168 // Room is locally present, but not synced, the call will timeout
3169 timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3170 .await
3171 .unwrap_err();
3172 }
3173
3174 #[async_test]
3175 async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3176 let server = MatrixMockServer::new().await;
3177 let client = server.client_builder().build().await;
3178
3179 server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3180
3181 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3182 assert_matches!(ret, Ok(true));
3183 }
3184
3185 #[async_test]
3186 async fn test_is_room_alias_available_if_alias_is_resolved() {
3187 let server = MatrixMockServer::new().await;
3188 let client = server.client_builder().build().await;
3189
3190 server
3191 .mock_room_directory_resolve_alias()
3192 .ok("!some_room_id:matrix.org", Vec::new())
3193 .expect(1)
3194 .mount()
3195 .await;
3196
3197 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3198 assert_matches!(ret, Ok(false));
3199 }
3200
3201 #[async_test]
3202 async fn test_is_room_alias_available_if_error_found() {
3203 let server = MatrixMockServer::new().await;
3204 let client = server.client_builder().build().await;
3205
3206 server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3207
3208 let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3209 assert_matches!(ret, Err(_));
3210 }
3211
3212 #[async_test]
3213 async fn test_create_room_alias() {
3214 let server = MatrixMockServer::new().await;
3215 let client = server.client_builder().build().await;
3216
3217 server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3218
3219 let ret = client
3220 .create_room_alias(
3221 room_alias_id!("#some_alias:matrix.org"),
3222 room_id!("!some_room:matrix.org"),
3223 )
3224 .await;
3225 assert_matches!(ret, Ok(()));
3226 }
3227
3228 #[async_test]
3229 async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3230 let server = MatrixMockServer::new().await;
3231 let client = server.client_builder().build().await;
3232
3233 let room_id = room_id!("!a-room:matrix.org");
3234
3235 // Make sure the summary endpoint is called once
3236 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3237
3238 // We create a locally cached invited room
3239 let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3240
3241 // And we get a preview, the server endpoint was reached
3242 let preview = client
3243 .get_room_preview(room_id.into(), Vec::new())
3244 .await
3245 .expect("Room preview should be retrieved");
3246
3247 assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3248 }
3249
3250 #[async_test]
3251 async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3252 let server = MatrixMockServer::new().await;
3253 let client = server.client_builder().build().await;
3254
3255 let room_id = room_id!("!a-room:matrix.org");
3256
3257 // Make sure the summary endpoint is called once
3258 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3259
3260 // We create a locally cached left room
3261 let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3262
3263 // And we get a preview, the server endpoint was reached
3264 let preview = client
3265 .get_room_preview(room_id.into(), Vec::new())
3266 .await
3267 .expect("Room preview should be retrieved");
3268
3269 assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3270 }
3271
3272 #[async_test]
3273 async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3274 let server = MatrixMockServer::new().await;
3275 let client = server.client_builder().build().await;
3276
3277 let room_id = room_id!("!a-room:matrix.org");
3278
3279 // Make sure the summary endpoint is called once
3280 server.mock_room_summary().ok(room_id).mock_once().mount().await;
3281
3282 // We create a locally cached knocked room
3283 let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3284
3285 // And we get a preview, the server endpoint was reached
3286 let preview = client
3287 .get_room_preview(room_id.into(), Vec::new())
3288 .await
3289 .expect("Room preview should be retrieved");
3290
3291 assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3292 }
3293
3294 #[async_test]
3295 async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3296 let server = MatrixMockServer::new().await;
3297 let client = server.client_builder().build().await;
3298
3299 let room_id = room_id!("!a-room:matrix.org");
3300
3301 // Make sure the summary endpoint is not called
3302 server.mock_room_summary().ok(room_id).never().mount().await;
3303
3304 // We create a locally cached joined room
3305 let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3306
3307 // And we get a preview, no server endpoint was reached
3308 let preview = client
3309 .get_room_preview(room_id.into(), Vec::new())
3310 .await
3311 .expect("Room preview should be retrieved");
3312
3313 assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3314 }
3315
3316 #[async_test]
3317 async fn test_room_preview_for_banned_room_retrieves_local_room_info() {
3318 let server = MatrixMockServer::new().await;
3319 let client = server.client_builder().build().await;
3320
3321 let room_id = room_id!("!a-room:matrix.org");
3322
3323 // Make sure the summary endpoint is not called
3324 server.mock_room_summary().ok(room_id).never().mount().await;
3325
3326 // We create a locally cached banned room
3327 let banned_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Banned);
3328
3329 // And we get a preview, no server endpoint was reached
3330 let preview = client
3331 .get_room_preview(room_id.into(), Vec::new())
3332 .await
3333 .expect("Room preview should be retrieved");
3334
3335 assert_eq!(banned_room.room_id().to_owned(), preview.room_id);
3336 }
3337}