matrix_sdk/room/mod.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39 IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43 ComposerDraft, DmRoomDefinition, EncryptionState, RoomInfoNotableUpdateReasons,
44 RoomMemberships, SendOutsideWasm, StateStoreDataKey, StateStoreDataValue,
45 deserialized_responses::{
46 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47 },
48 media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49 serde_helpers::extract_relation,
50 store::{StateStoreExt, ThreadSubscriptionStatus},
51};
52#[cfg(feature = "e2e-encryption")]
53use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
54#[cfg(feature = "e2e-encryption")]
55use matrix_sdk_common::BoxFuture;
56use matrix_sdk_common::{
57 deserialized_responses::TimelineEvent,
58 executor::{JoinHandle, spawn},
59 timeout::timeout,
60};
61use mime::Mime;
62use reply::Reply;
63#[cfg(feature = "e2e-encryption")]
64use ruma::events::AnySyncMessageLikeEvent;
65#[cfg(feature = "experimental-encrypted-state-events")]
66use ruma::events::AnySyncStateEvent;
67#[cfg(feature = "unstable-msc4274")]
68use ruma::events::room::message::GalleryItemType;
69#[cfg(feature = "e2e-encryption")]
70use ruma::events::{
71 AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
72};
73use ruma::{
74 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
75 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
76 api::{
77 client::{
78 config::{set_global_account_data, set_room_account_data},
79 context,
80 filter::LazyLoadOptions,
81 membership::{
82 Invite3pid, ban_user, forget_room, get_member_events,
83 invite_user::{
84 self,
85 v3::{InvitationRecipient, InviteUserId},
86 },
87 kick_user, leave_room, unban_user,
88 },
89 message::send_message_event,
90 read_marker::set_read_marker,
91 receipt::create_receipt,
92 redact::redact_event,
93 room::{get_room_event, report_content, report_room},
94 state::{get_state_event_for_key, send_state_event},
95 tag::{create_tag, delete_tag},
96 threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
97 typing::create_typing_event::{
98 self,
99 v3::{Typing, TypingInfo},
100 },
101 },
102 error::ErrorKind,
103 },
104 assign,
105 events::{
106 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
107 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
108 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
109 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
110 StaticStateEventContent, SyncStateEvent,
111 beacon::BeaconEventContent,
112 beacon_info::BeaconInfoEventContent,
113 direct::DirectEventContent,
114 marked_unread::MarkedUnreadEventContent,
115 receipt::{Receipt, ReceiptThread, ReceiptType},
116 relation::RelationType,
117 room::{
118 ImageInfo, MediaSource, ThumbnailInfo,
119 avatar::{self, RoomAvatarEventContent},
120 encryption::PossiblyRedactedRoomEncryptionEventContent,
121 history_visibility::HistoryVisibility,
122 member::{MembershipChange, RoomMemberEventContent, SyncRoomMemberEvent},
123 message::{
124 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
125 ImageMessageEventContent, MessageType, RoomMessageEventContent,
126 TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
127 UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
128 },
129 name::RoomNameEventContent,
130 pinned_events::RoomPinnedEventsEventContent,
131 power_levels::{
132 RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
133 },
134 server_acl::RoomServerAclEventContent,
135 topic::RoomTopicEventContent,
136 },
137 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
138 tag::{TagInfo, TagName},
139 typing::SyncTypingEvent,
140 },
141 int,
142 push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
143 serde::Raw,
144 time::Instant,
145 uint,
146};
147#[cfg(feature = "experimental-encrypted-state-events")]
148use ruma::{
149 events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
150 serde::JsonCastable,
151};
152use serde::de::DeserializeOwned;
153use thiserror::Error;
154use tokio::{join, sync::broadcast};
155use tracing::{debug, error, info, instrument, trace, warn};
156
157use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
158pub use self::{
159 member::{RoomMember, RoomMemberRole},
160 messages::{
161 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
162 Relations, RelationsOptions, ThreadRoots,
163 },
164};
165#[cfg(feature = "e2e-encryption")]
166use crate::encryption::backups::BackupState;
167#[cfg(doc)]
168use crate::event_cache::EventCache;
169#[cfg(feature = "experimental-encrypted-state-events")]
170use crate::room::futures::{SendRawStateEvent, SendStateEvent};
171use crate::{
172 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
173 attachment::{AttachmentConfig, AttachmentInfo},
174 client::WeakClient,
175 config::RequestConfig,
176 error::{BeaconError, WrongRoomState},
177 event_cache::{self, EventCacheDropHandles, RoomEventCache},
178 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
179 live_locations_observer::LiveLocationsObserver,
180 media::{MediaFormat, MediaRequestParameters},
181 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
182 room::{
183 knock_requests::{KnockRequest, KnockRequestMemberInfo},
184 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
185 privacy_settings::RoomPrivacySettings,
186 },
187 sync::RoomUpdate,
188 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
189};
190
191pub mod edit;
192pub mod futures;
193pub mod identity_status_changes;
194/// Contains code related to requests to join a room.
195pub mod knock_requests;
196mod member;
197mod messages;
198pub mod power_levels;
199pub mod reply;
200
201pub mod calls;
202
203/// Contains all the functionality for modifying the privacy settings in a room.
204pub mod privacy_settings;
205
206#[cfg(feature = "e2e-encryption")]
207pub(crate) mod shared_room_history;
208
209/// A struct containing methods that are common for Joined, Invited and Left
210/// Rooms
211#[derive(Debug, Clone)]
212pub struct Room {
213 inner: BaseRoom,
214 pub(crate) client: Client,
215}
216
217impl Deref for Room {
218 type Target = BaseRoom;
219
220 fn deref(&self) -> &Self::Target {
221 &self.inner
222 }
223}
224
225const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
226const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
227
228/// A thread subscription, according to the semantics of MSC4306.
229#[derive(Debug, Clone, Copy, PartialEq, Eq)]
230pub struct ThreadSubscription {
231 /// Whether the subscription was made automatically by a client, not by
232 /// manual user choice.
233 pub automatic: bool,
234}
235
236/// Context allowing to compute the push actions for a given event.
237#[derive(Debug)]
238pub struct PushContext {
239 /// The Ruma context used to compute the push actions.
240 push_condition_room_ctx: PushConditionRoomCtx,
241
242 /// Push rules for this room, based on the push rules state event, or the
243 /// global server default as defined by [`Ruleset::server_default`].
244 push_rules: Ruleset,
245}
246
247impl PushContext {
248 /// Create a new [`PushContext`] from its inner components.
249 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
250 Self { push_condition_room_ctx, push_rules }
251 }
252
253 /// Compute the push rules for a given event.
254 pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
255 self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
256 }
257
258 /// Compute the push rules for a given event, with extra logging to help
259 /// debugging.
260 #[doc(hidden)]
261 #[instrument(skip_all)]
262 pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
263 let rules = self
264 .push_rules
265 .iter()
266 .filter_map(|r| {
267 if !r.enabled() {
268 return None;
269 }
270
271 let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
272
273 let conditions = match r {
274 AnyPushRuleRef::Override(r) => {
275 format!("{:?}", r.conditions)
276 }
277 AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
278 AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
279 AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
280 AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
281 _ => "<unknown push rule kind>".to_owned(),
282 };
283
284 Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
285 })
286 .collect::<Vec<_>>()
287 .join("\n");
288 trace!("rules:\n\n{rules}\n\n");
289
290 let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
291
292 if let Some(found) = found {
293 trace!("rule {} matched", found.rule_id());
294 found.actions().to_owned()
295 } else {
296 trace!("no match");
297 Vec::new()
298 }
299 }
300}
301
302macro_rules! make_media_type {
303 ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
304 // If caption is set, use it as body, and filename as the file name; otherwise,
305 // body is the filename, and the filename is not set.
306 // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
307 let (body, formatted, filename) = match $caption {
308 Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
309 None => ($filename, None, None),
310 };
311
312 let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
313
314 match $content_type.type_() {
315 mime::IMAGE => {
316 let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
317 mimetype: Some($content_type.as_ref().to_owned()),
318 thumbnail_source,
319 thumbnail_info
320 });
321 let content = assign!(ImageMessageEventContent::new(body, $source), {
322 info: Some(Box::new(info)),
323 formatted,
324 filename
325 });
326 <$t>::Image(content)
327 }
328
329 mime::AUDIO => {
330 let mut content = assign!(AudioMessageEventContent::new(body, $source), {
331 formatted,
332 filename
333 });
334
335 if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
336 let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
337 let waveform = waveform_vec
338 .iter()
339 .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
340 .map(Into::into)
341 .collect();
342 content.audio =
343 Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
344 }
345
346 if matches!($info, Some(AttachmentInfo::Voice(_))) {
347 content.voice = Some(UnstableVoiceContentBlock::new());
348 }
349
350 let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
351 audio_info.mimetype = Some($content_type.as_ref().to_owned());
352 let content = content.info(Box::new(audio_info));
353
354 <$t>::Audio(content)
355 }
356
357 mime::VIDEO => {
358 let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
359 mimetype: Some($content_type.as_ref().to_owned()),
360 thumbnail_source,
361 thumbnail_info
362 });
363 let content = assign!(VideoMessageEventContent::new(body, $source), {
364 info: Some(Box::new(info)),
365 formatted,
366 filename
367 });
368 <$t>::Video(content)
369 }
370
371 _ => {
372 let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
373 mimetype: Some($content_type.as_ref().to_owned()),
374 thumbnail_source,
375 thumbnail_info
376 });
377 let content = assign!(FileMessageEventContent::new(body, $source), {
378 info: Some(Box::new(info)),
379 formatted,
380 filename,
381 });
382 <$t>::File(content)
383 }
384 }
385 }};
386}
387
388impl Room {
389 /// Create a new `Room`
390 ///
391 /// # Arguments
392 /// * `client` - The client used to make requests.
393 ///
394 /// * `room` - The underlying room.
395 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
396 Self { inner: room, client }
397 }
398
399 /// Leave this room.
400 /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
401 /// automatically.
402 ///
403 /// Only invited and joined rooms can be left.
404 #[doc(alias = "reject_invitation")]
405 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
406 async fn leave_impl(&self) -> (Result<()>, &Room) {
407 let state = self.state();
408 if state == RoomState::Left {
409 return (
410 Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
411 "Joined or Invited",
412 state,
413 )))),
414 self,
415 );
416 }
417
418 // If the room was in Invited state we should also forget it when declining the
419 // invite.
420 let should_forget = matches!(self.state(), RoomState::Invited);
421
422 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
423 let response = self.client.send(request).await;
424
425 // The server can return with an error that is acceptable to ignore. Let's find
426 // which one.
427 if let Err(error) = response {
428 #[allow(clippy::collapsible_match)]
429 let ignore_error = if let Some(error) = error.client_api_error_kind() {
430 match error {
431 // The user is trying to leave a room but doesn't have permissions to do so.
432 // Let's consider the user has left the room.
433 ErrorKind::Forbidden => true,
434 _ => false,
435 }
436 } else {
437 false
438 };
439
440 error!(?error, ignore_error, should_forget, "Failed to leave the room");
441
442 if !ignore_error {
443 return (Err(error.into()), self);
444 }
445 }
446
447 if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
448 return (Err(e.into()), self);
449 }
450
451 if should_forget {
452 trace!("Trying to forget the room");
453
454 if let Err(error) = self.forget().await {
455 error!(?error, "Failed to forget the room");
456 }
457 }
458
459 (Ok(()), self)
460 }
461
462 /// Leave this room and all predecessors.
463 /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
464 /// automatically.
465 ///
466 /// Only invited and joined rooms can be left.
467 /// Will return an error if the current room fails to leave but
468 /// will only warn if a predecessor fails to leave.
469 pub async fn leave(&self) -> Result<()> {
470 let mut rooms: Vec<Room> = vec![self.clone()];
471 let mut current_room = self;
472
473 while let Some(predecessor) = current_room.predecessor_room() {
474 let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
475
476 if let Some(predecessor_room) = maybe_predecessor_room {
477 rooms.push(predecessor_room.clone());
478 current_room = rooms.last().expect("Room just pushed so can't be empty");
479 } else {
480 warn!("Cannot find predecessor room");
481 break;
482 }
483 }
484
485 let batch_size = 5;
486
487 let rooms_futures: Vec<_> = rooms
488 .iter()
489 .filter_map(|room| match room.state() {
490 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
491 Some(room.leave_impl())
492 }
493 RoomState::Banned | RoomState::Left => None,
494 })
495 .collect();
496
497 let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
498
499 let mut maybe_this_room_failed_with: Option<Error> = None;
500
501 while let Some(result) = futures_stream.next().await {
502 if let (Err(e), room) = result {
503 if room.room_id() == self.room_id() {
504 maybe_this_room_failed_with = Some(e);
505 } else {
506 warn!("Failure while attempting to leave predecessor room: {e:?}");
507 }
508 }
509 }
510
511 maybe_this_room_failed_with.map_or(Ok(()), Err)
512 }
513
514 /// Join this room.
515 ///
516 /// Only invited and left rooms can be joined via this method.
517 #[doc(alias = "accept_invitation")]
518 pub async fn join(&self) -> Result<()> {
519 let prev_room_state = self.inner.state();
520
521 if prev_room_state == RoomState::Joined {
522 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
523 "Invited or Left",
524 prev_room_state,
525 ))));
526 }
527
528 self.client.join_room_by_id(self.room_id()).await?;
529
530 Ok(())
531 }
532
533 /// Get the inner client saved in this room instance.
534 ///
535 /// Returns the client this room is part of.
536 pub fn client(&self) -> Client {
537 self.client.clone()
538 }
539
540 /// Get the sync state of this room, i.e. whether it was fully synced with
541 /// the server.
542 pub fn is_synced(&self) -> bool {
543 self.inner.is_state_fully_synced()
544 }
545
546 /// Gets the avatar of this room, if set.
547 ///
548 /// Returns the avatar.
549 /// If a thumbnail is requested no guarantee on the size of the image is
550 /// given.
551 ///
552 /// # Arguments
553 ///
554 /// * `format` - The desired format of the avatar.
555 ///
556 /// # Examples
557 ///
558 /// ```no_run
559 /// # use matrix_sdk::Client;
560 /// # use matrix_sdk::ruma::room_id;
561 /// # use matrix_sdk::media::MediaFormat;
562 /// # use url::Url;
563 /// # let homeserver = Url::parse("http://example.com").unwrap();
564 /// # async {
565 /// # let user = "example";
566 /// let client = Client::new(homeserver).await.unwrap();
567 /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
568 /// let room_id = room_id!("!roomid:example.com");
569 /// let room = client.get_room(&room_id).unwrap();
570 /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
571 /// std::fs::write("avatar.png", avatar);
572 /// }
573 /// # };
574 /// ```
575 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
576 let Some(url) = self.avatar_url() else { return Ok(None) };
577 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
578 Ok(Some(self.client.media().get_media_content(&request, true).await?))
579 }
580
581 /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
582 /// returns a `Messages` struct that contains a chunk of room and state
583 /// events (`RoomEvent` and `AnyStateEvent`).
584 ///
585 /// With the encryption feature, messages are decrypted if possible. If
586 /// decryption fails for an individual message, that message is returned
587 /// undecrypted.
588 ///
589 /// # Examples
590 ///
591 /// ```no_run
592 /// use matrix_sdk::{Client, room::MessagesOptions};
593 /// # use matrix_sdk::ruma::{
594 /// # api::client::filter::RoomEventFilter,
595 /// # room_id,
596 /// # };
597 /// # use url::Url;
598 ///
599 /// # let homeserver = Url::parse("http://example.com").unwrap();
600 /// # async {
601 /// let options =
602 /// MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
603 ///
604 /// let mut client = Client::new(homeserver).await.unwrap();
605 /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
606 /// assert!(room.messages(options).await.is_ok());
607 /// # };
608 /// ```
609 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
610 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
611 let room_id = self.inner.room_id();
612 let request = options.into_request(room_id);
613 let http_response = self.client.send(request).await?;
614
615 let push_ctx = self.push_context().await?;
616 let chunk = join_all(
617 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
618 )
619 .await;
620
621 // Save the loaded events into the event cache, if it's set up.
622 if let Ok((cache, _handles)) = self.event_cache().await {
623 cache.save_events(chunk.clone()).await;
624 }
625
626 Ok(Messages {
627 start: http_response.start,
628 end: http_response.end,
629 chunk,
630 state: http_response.state,
631 })
632 }
633
634 /// Register a handler for events of a specific type, within this room.
635 ///
636 /// This method works the same way as [`Client::add_event_handler`], except
637 /// that the handler will only be called for events within this room. See
638 /// that method for more details on event handler functions.
639 ///
640 /// `room.add_event_handler(hdl)` is equivalent to
641 /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
642 /// convenient in your use case.
643 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
644 where
645 Ev: SyncEvent + DeserializeOwned + Send + 'static,
646 H: EventHandler<Ev, Ctx>,
647 {
648 self.client.add_room_event_handler(self.room_id(), handler)
649 }
650
651 /// Subscribe to all updates for this room.
652 ///
653 /// The returned receiver will receive a new message for each sync response
654 /// that contains updates for this room.
655 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
656 self.client.subscribe_to_room_updates(self.room_id())
657 }
658
659 /// Subscribe to typing notifications for this room.
660 ///
661 /// The returned receiver will receive a new vector of user IDs for each
662 /// sync response that contains 'm.typing' event. The current user ID will
663 /// be filtered out.
664 pub fn subscribe_to_typing_notifications(
665 &self,
666 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
667 let (sender, receiver) = broadcast::channel(16);
668 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
669 let own_user_id = self.own_user_id().to_owned();
670 move |event: SyncTypingEvent| async move {
671 // Ignore typing notifications from own user.
672 let typing_user_ids = event
673 .content
674 .user_ids
675 .into_iter()
676 .filter(|user_id| *user_id != own_user_id)
677 .collect();
678 // Ignore the result. It can only fail if there are no listeners.
679 let _ = sender.send(typing_user_ids);
680 }
681 });
682 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
683 (drop_guard, receiver)
684 }
685
686 /// Subscribe to updates about users who are in "pin violation" i.e. their
687 /// identity has changed and the user has not yet acknowledged this.
688 ///
689 /// The returned receiver will receive a new vector of
690 /// [`IdentityStatusChange`] each time a /keys/query response shows a
691 /// changed identity for a member of this room, or a sync shows a change
692 /// to the membership of an affected user. (Changes to the current user are
693 /// not directly included, but some changes to the current user's identity
694 /// can trigger changes to how we see other users' identities, which
695 /// will be included.)
696 ///
697 /// The first item in the stream provides the current state of the room:
698 /// each member of the room who is not in "pinned" or "verified" state will
699 /// be included (except the current user).
700 ///
701 /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
702 /// `PinViolation` then a warning should be displayed to the user. If it is
703 /// set to `Pinned` then no warning should be displayed.
704 ///
705 /// Note that if a user who is in pin violation leaves the room, a `Pinned`
706 /// update is sent, to indicate that the warning should be removed, even
707 /// though the user's identity is not necessarily pinned.
708 #[cfg(feature = "e2e-encryption")]
709 pub async fn subscribe_to_identity_status_changes(
710 &self,
711 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
712 IdentityStatusChanges::create_stream(self.clone()).await
713 }
714
715 /// Subscribes to active live location shares in this room.
716 ///
717 /// Returns a [`LiveLocationsObserver`] that holds the current state and
718 /// exposes a stream of incremental [`eyeball_im::VectorDiff`] updates via
719 /// [`LiveLocationsObserver::subscribe`].
720 ///
721 /// Event handlers are active for as long as the returned struct is alive.
722 pub async fn live_locations_observer(&self) -> LiveLocationsObserver {
723 LiveLocationsObserver::new(self.clone()).await
724 }
725
726 /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
727 /// decrypted if needs be.
728 ///
729 /// Only logs from the crypto crate will indicate a failure to decrypt.
730 #[cfg(not(feature = "experimental-encrypted-state-events"))]
731 #[allow(clippy::unused_async)] // Used only in e2e-encryption.
732 async fn try_decrypt_event(
733 &self,
734 event: Raw<AnyTimelineEvent>,
735 push_ctx: Option<&PushContext>,
736 ) -> TimelineEvent {
737 #[cfg(feature = "e2e-encryption")]
738 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
739 SyncMessageLikeEvent::Original(_),
740 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
741 && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
742 {
743 return event;
744 }
745
746 let mut event = TimelineEvent::from_plaintext(event.cast());
747 if let Some(push_ctx) = push_ctx {
748 event.set_push_actions(push_ctx.for_event(event.raw()).await);
749 }
750
751 event
752 }
753
754 /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
755 /// decrypted if needs be.
756 ///
757 /// Only logs from the crypto crate will indicate a failure to decrypt.
758 #[cfg(feature = "experimental-encrypted-state-events")]
759 #[allow(clippy::unused_async)] // Used only in e2e-encryption.
760 async fn try_decrypt_event(
761 &self,
762 event: Raw<AnyTimelineEvent>,
763 push_ctx: Option<&PushContext>,
764 ) -> TimelineEvent {
765 // If we have either an encrypted message-like or state event, try to decrypt.
766 match event.deserialize_as::<AnySyncTimelineEvent>() {
767 Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
768 SyncMessageLikeEvent::Original(_),
769 ))) => {
770 if let Ok(event) = self
771 .decrypt_event(
772 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
773 push_ctx,
774 )
775 .await
776 {
777 return event;
778 }
779 }
780 Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
781 SyncStateEvent::Original(_),
782 ))) => {
783 if let Ok(event) = self
784 .decrypt_event(
785 event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
786 push_ctx,
787 )
788 .await
789 {
790 return event;
791 }
792 }
793 _ => {}
794 }
795
796 let mut event = TimelineEvent::from_plaintext(event.cast());
797 if let Some(push_ctx) = push_ctx {
798 event.set_push_actions(push_ctx.for_event(event.raw()).await);
799 }
800
801 event
802 }
803
804 /// Fetch the event with the given `EventId` in this room.
805 ///
806 /// It uses the given [`RequestConfig`] if provided, or the client's default
807 /// one otherwise.
808 pub async fn event(
809 &self,
810 event_id: &EventId,
811 request_config: Option<RequestConfig>,
812 ) -> Result<TimelineEvent> {
813 let request =
814 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
815
816 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
817 let push_ctx = self.push_context().await?;
818 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
819
820 // Save the event into the event cache, if it's set up.
821 if let Ok((cache, _handles)) = self.event_cache().await {
822 cache.save_events([event.clone()]).await;
823 }
824
825 Ok(event)
826 }
827
828 /// Try to load the event from the [`EventCache`][crate::event_cache], if
829 /// it's enabled, or fetch it from the homeserver.
830 ///
831 /// When running the request against the homeserver, it uses the given
832 /// [`RequestConfig`] if provided, or the client's default one
833 /// otherwise.
834 pub async fn load_or_fetch_event(
835 &self,
836 event_id: &EventId,
837 request_config: Option<RequestConfig>,
838 ) -> Result<TimelineEvent> {
839 match self.event_cache().await {
840 Ok((event_cache, _drop_handles)) => {
841 if let Some(event) = event_cache.find_event(event_id).await? {
842 return Ok(event);
843 }
844 // Fallthrough: try with a request.
845 }
846 Err(err) => {
847 debug!("error when getting the event cache: {err}");
848 }
849 }
850 self.event(event_id, request_config).await
851 }
852
853 /// Try to load the event and its relations from the
854 /// [`EventCache`][crate::event_cache], if it's enabled, or fetch it
855 /// from the homeserver.
856 ///
857 /// You can control which types of related events are retrieved using
858 /// `filter`. A `None` value will retrieve any type of related event.
859 ///
860 /// If the event is found in the event cache, but we can't find any
861 /// relations for it there, then we will still attempt to fetch the
862 /// relations from the homeserver.
863 ///
864 /// When running any request against the homeserver, it uses the given
865 /// [`RequestConfig`] if provided, or the client's default one
866 /// otherwise.
867 ///
868 /// Returns a tuple formed of the event and a vector of its relations (that
869 /// can be empty).
870 pub async fn load_or_fetch_event_with_relations(
871 &self,
872 event_id: &EventId,
873 filter: Option<Vec<RelationType>>,
874 request_config: Option<RequestConfig>,
875 ) -> Result<(TimelineEvent, Vec<TimelineEvent>)> {
876 let fetch_relations = async || {
877 // If there's only a single filter, we can use a more efficient request,
878 // specialized on the filter type.
879 //
880 // Otherwise, we need to get all the relations:
881 // - either because no filters implies we fetch all relations,
882 // - or because there are multiple filters and we must filter out manually.
883 let include_relations = if let Some(filter) = &filter
884 && filter.len() == 1
885 {
886 IncludeRelations::RelationsOfType(filter[0].clone())
887 } else {
888 IncludeRelations::AllRelations
889 };
890
891 let mut opts = RelationsOptions {
892 include_relations,
893 recurse: true,
894 limit: Some(uint!(256)),
895 ..Default::default()
896 };
897
898 let mut events = Vec::new();
899 loop {
900 match self.relations(event_id.to_owned(), opts.clone()).await {
901 Ok(relations) => {
902 if let Some(filter) = filter.as_ref() {
903 // Manually filter out the relation types we're interested in.
904 events.extend(relations.chunk.into_iter().filter_map(|ev| {
905 let (rel_type, _) = extract_relation(ev.raw())?;
906 filter
907 .iter()
908 .any(|ruma_filter| ruma_filter == &rel_type)
909 .then_some(ev)
910 }));
911 } else {
912 // No filter: include all events from the response.
913 events.extend(relations.chunk);
914 }
915
916 if let Some(next_from) = relations.next_batch_token {
917 opts.from = Some(next_from);
918 } else {
919 break events;
920 }
921 }
922
923 Err(err) => {
924 warn!(%event_id, "error when loading relations of pinned event from server: {err}");
925 break events;
926 }
927 }
928 }
929 };
930
931 // First, try to load the event *and* its relations from the event cache, all at
932 // once.
933 let event_cache = match self.event_cache().await {
934 Ok((event_cache, drop_handles)) => {
935 if let Some((event, mut relations)) =
936 event_cache.find_event_with_relations(event_id, filter.clone()).await?
937 {
938 if relations.is_empty() {
939 // The event cache doesn't have any relations for this event, try to fetch
940 // them from the server instead.
941 relations = fetch_relations().await;
942 }
943
944 return Ok((event, relations));
945 }
946
947 // Otherwise, get the event from the server.
948 Some((event_cache, drop_handles))
949 }
950
951 Err(err) => {
952 debug!("error when getting the event cache: {err}");
953 // Fallthrough: try with a request.
954 None
955 }
956 };
957
958 // Fetch the event from the server. A failure here is fatal, as we must return
959 // the target event.
960 let event = self.event(event_id, request_config).await?;
961
962 // Try to get the relations from the event cache (if we have one).
963 if let Some((event_cache, _drop_handles)) = event_cache
964 && let Some(relations) =
965 event_cache.find_event_relations(event_id, filter.clone()).await.ok()
966 && !relations.is_empty()
967 {
968 return Ok((event, relations));
969 }
970
971 // We couldn't find the relations in the event cache; fetch them from the
972 // server.
973 Ok((event, fetch_relations().await))
974 }
975
976 /// Fetch the event with the given `EventId` in this room, using the
977 /// `/context` endpoint to get more information.
978 pub async fn event_with_context(
979 &self,
980 event_id: &EventId,
981 lazy_load_members: bool,
982 context_size: UInt,
983 request_config: Option<RequestConfig>,
984 ) -> Result<EventWithContextResponse> {
985 let mut request =
986 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
987
988 request.limit = context_size;
989
990 if lazy_load_members {
991 request.filter.lazy_load_options =
992 LazyLoadOptions::Enabled { include_redundant_members: false };
993 }
994
995 let response = self.client.send(request).with_request_config(request_config).await?;
996
997 let push_ctx = self.push_context().await?;
998 let push_ctx = push_ctx.as_ref();
999 let target_event = if let Some(event) = response.event {
1000 Some(self.try_decrypt_event(event, push_ctx).await)
1001 } else {
1002 None
1003 };
1004
1005 // Note: the joined future will fail if any future failed, but
1006 // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
1007 // decryption error, so we should prevent against most bad cases here.
1008 let (events_before, events_after) = join!(
1009 join_all(
1010 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1011 ),
1012 join_all(
1013 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1014 ),
1015 );
1016
1017 // Save the loaded events into the event cache, if it's set up.
1018 if let Ok((cache, _handles)) = self.event_cache().await {
1019 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
1020 if let Some(event) = &target_event {
1021 events_to_save.push(event.clone());
1022 }
1023
1024 for event in &events_before {
1025 events_to_save.push(event.clone());
1026 }
1027
1028 for event in &events_after {
1029 events_to_save.push(event.clone());
1030 }
1031
1032 cache.save_events(events_to_save).await;
1033 }
1034
1035 Ok(EventWithContextResponse {
1036 event: target_event,
1037 events_before,
1038 events_after,
1039 state: response.state,
1040 prev_batch_token: response.start,
1041 next_batch_token: response.end,
1042 })
1043 }
1044
1045 pub(crate) async fn request_members(&self) -> Result<()> {
1046 self.client
1047 .locks()
1048 .members_request_deduplicated_handler
1049 .run(self.room_id().to_owned(), async move {
1050 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
1051 let response = self
1052 .client
1053 .send(request.clone())
1054 .with_request_config(
1055 // In some cases it can take longer than 30s to load:
1056 // https://github.com/element-hq/synapse/issues/16872
1057 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
1058 )
1059 .await?;
1060
1061 // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
1062 Box::pin(self.client.base_client().receive_all_members(
1063 self.room_id(),
1064 &request,
1065 &response,
1066 ))
1067 .await?;
1068
1069 Ok(())
1070 })
1071 .await
1072 }
1073
1074 /// Request to update the encryption state for this room.
1075 ///
1076 /// It does nothing if the encryption state is already
1077 /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
1078 pub async fn request_encryption_state(&self) -> Result<()> {
1079 if !self.inner.encryption_state().is_unknown() {
1080 return Ok(());
1081 }
1082
1083 self.client
1084 .locks()
1085 .encryption_state_deduplicated_handler
1086 .run(self.room_id().to_owned(), async move {
1087 // Request the event from the server.
1088 let request = get_state_event_for_key::v3::Request::new(
1089 self.room_id().to_owned(),
1090 StateEventType::RoomEncryption,
1091 "".to_owned(),
1092 );
1093 let response = match self.client.send(request).await {
1094 Ok(response) => Some(
1095 response
1096 .into_content()
1097 .deserialize_as_unchecked::<PossiblyRedactedRoomEncryptionEventContent>(
1098 )?,
1099 ),
1100 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
1101 Err(err) => return Err(err.into()),
1102 };
1103
1104 // Persist the event and the fact that we requested it from the server in
1105 // `RoomInfo`.
1106 self.update_and_save_room_info(|mut room_info| {
1107 room_info.mark_encryption_state_synced();
1108 room_info.set_encryption_event(response);
1109 (room_info, RoomInfoNotableUpdateReasons::empty())
1110 })
1111 .await?;
1112
1113 Ok(())
1114 })
1115 .await
1116 }
1117
1118 /// Check the encryption state of this room.
1119 ///
1120 /// If the result is [`EncryptionState::Unknown`], one might want to call
1121 /// [`Room::request_encryption_state`].
1122 pub fn encryption_state(&self) -> EncryptionState {
1123 self.inner.encryption_state()
1124 }
1125
1126 /// Force to update the encryption state by calling
1127 /// [`Room::request_encryption_state`], and then calling
1128 /// [`Room::encryption_state`].
1129 ///
1130 /// This method is useful to ensure the encryption state is up-to-date.
1131 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
1132 self.request_encryption_state().await?;
1133
1134 Ok(self.encryption_state())
1135 }
1136
1137 /// Gets additional context info about the client crypto.
1138 #[cfg(feature = "e2e-encryption")]
1139 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
1140 let encryption = self.client.encryption();
1141
1142 let this_device_is_verified = match encryption.get_own_device().await {
1143 Ok(Some(device)) => device.is_verified_with_cross_signing(),
1144
1145 // Should not happen, there will always be an own device
1146 _ => true,
1147 };
1148
1149 let backup_exists_on_server =
1150 encryption.backups().exists_on_server().await.unwrap_or(false);
1151
1152 CryptoContextInfo {
1153 device_creation_ts: encryption.device_creation_timestamp().await,
1154 this_device_is_verified,
1155 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1156 backup_exists_on_server,
1157 }
1158 }
1159
1160 fn are_events_visible(&self) -> bool {
1161 if let RoomState::Invited = self.inner.state() {
1162 return matches!(
1163 self.inner.history_visibility_or_default(),
1164 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1165 );
1166 }
1167
1168 true
1169 }
1170
1171 /// Sync the member list with the server.
1172 ///
1173 /// This method will de-duplicate requests if it is called multiple times in
1174 /// quick succession, in that case the return value will be `None`. This
1175 /// method does nothing if the members are already synced.
1176 pub async fn sync_members(&self) -> Result<()> {
1177 if !self.are_events_visible() {
1178 return Ok(());
1179 }
1180
1181 if !self.are_members_synced() {
1182 self.request_members().await?;
1183
1184 // While we're at it, calculate the active service members
1185 self.update_active_service_members().await?;
1186
1187 Ok(())
1188 } else {
1189 Ok(())
1190 }
1191 }
1192
1193 /// Get a specific member of this room.
1194 ///
1195 /// *Note*: This method will fetch the members from the homeserver if the
1196 /// member list isn't synchronized due to member lazy loading. Because of
1197 /// that it might panic if it isn't run on a tokio thread.
1198 ///
1199 /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
1200 /// method that doesn't do any requests.
1201 ///
1202 /// # Arguments
1203 ///
1204 /// * `user_id` - The ID of the user that should be fetched out of the
1205 /// store.
1206 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1207 self.sync_members().await?;
1208 self.get_member_no_sync(user_id).await
1209 }
1210
1211 /// Get a specific member of this room.
1212 ///
1213 /// *Note*: This method will not fetch the members from the homeserver if
1214 /// the member list isn't synchronized due to member lazy loading. Thus,
1215 /// members could be missing.
1216 ///
1217 /// Use [get_member()](#method.get_member) if you want to ensure to always
1218 /// have the full member list to chose from.
1219 ///
1220 /// # Arguments
1221 ///
1222 /// * `user_id` - The ID of the user that should be fetched out of the
1223 /// store.
1224 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1225 Ok(self
1226 .inner
1227 .get_member(user_id)
1228 .await?
1229 .map(|member| RoomMember::new(self.client.clone(), member)))
1230 }
1231
1232 /// Get members for this room, with the given memberships.
1233 ///
1234 /// *Note*: This method will fetch the members from the homeserver if the
1235 /// member list isn't synchronized due to member lazy loading. Because of
1236 /// that it might panic if it isn't run on a tokio thread.
1237 ///
1238 /// Use [members_no_sync()](#method.members_no_sync) if you want a
1239 /// method that doesn't do any requests.
1240 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1241 self.sync_members().await?;
1242 self.members_no_sync(memberships).await
1243 }
1244
1245 /// Get members for this room, with the given memberships.
1246 ///
1247 /// *Note*: This method will not fetch the members from the homeserver if
1248 /// the member list isn't synchronized due to member lazy loading. Thus,
1249 /// members could be missing.
1250 ///
1251 /// Use [members()](#method.members) if you want to ensure to always get
1252 /// the full member list.
1253 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1254 Ok(self
1255 .inner
1256 .members(memberships)
1257 .await?
1258 .into_iter()
1259 .map(|member| RoomMember::new(self.client.clone(), member))
1260 .collect())
1261 }
1262
1263 /// Sets the display name of the current user within this room.
1264 ///
1265 /// *Note*: This is different to [`crate::Account::set_display_name`] which
1266 /// updates the user's display name across all of their rooms.
1267 pub async fn set_own_member_display_name(
1268 &self,
1269 display_name: Option<String>,
1270 ) -> Result<send_state_event::v3::Response> {
1271 let user_id = self.own_user_id();
1272 let member_event =
1273 self.get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id).await?;
1274
1275 let Some(RawSyncOrStrippedState::Sync(raw_event)) = member_event else {
1276 return Err(Error::InsufficientData);
1277 };
1278
1279 let event = raw_event.deserialize()?;
1280
1281 let mut content = match event {
1282 SyncStateEvent::Original(original_event) => original_event.content,
1283 SyncStateEvent::Redacted(redacted_event) => {
1284 RoomMemberEventContent::new(redacted_event.content.membership)
1285 }
1286 };
1287
1288 content.displayname = display_name;
1289 self.send_state_event_for_key(user_id, content).await
1290 }
1291
1292 /// Get all state events of a given type in this room.
1293 pub async fn get_state_events(
1294 &self,
1295 event_type: StateEventType,
1296 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1297 self.client
1298 .state_store()
1299 .get_state_events(self.room_id(), event_type)
1300 .await
1301 .map_err(Into::into)
1302 }
1303
1304 /// Get all state events of a given statically-known type in this room.
1305 ///
1306 /// # Examples
1307 ///
1308 /// ```no_run
1309 /// # async {
1310 /// # let room: matrix_sdk::Room = todo!();
1311 /// use matrix_sdk::ruma::{
1312 /// events::room::member::RoomMemberEventContent, serde::Raw,
1313 /// };
1314 ///
1315 /// let room_members =
1316 /// room.get_state_events_static::<RoomMemberEventContent>().await?;
1317 /// # anyhow::Ok(())
1318 /// # };
1319 /// ```
1320 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1321 where
1322 C: StaticEventContent<IsPrefix = ruma::events::False>
1323 + StaticStateEventContent
1324 + RedactContent,
1325 C::Redacted: RedactedStateEventContent,
1326 {
1327 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1328 }
1329
1330 /// Get the state events of a given type with the given state keys in this
1331 /// room.
1332 pub async fn get_state_events_for_keys(
1333 &self,
1334 event_type: StateEventType,
1335 state_keys: &[&str],
1336 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1337 self.client
1338 .state_store()
1339 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1340 .await
1341 .map_err(Into::into)
1342 }
1343
1344 /// Get the state events of a given statically-known type with the given
1345 /// state keys in this room.
1346 ///
1347 /// # Examples
1348 ///
1349 /// ```no_run
1350 /// # async {
1351 /// # let room: matrix_sdk::Room = todo!();
1352 /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1353 /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1354 ///
1355 /// let room_members = room
1356 /// .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1357 /// user_ids,
1358 /// )
1359 /// .await?;
1360 /// # anyhow::Ok(())
1361 /// # };
1362 /// ```
1363 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1364 &self,
1365 state_keys: I,
1366 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1367 where
1368 C: StaticEventContent<IsPrefix = ruma::events::False>
1369 + StaticStateEventContent
1370 + RedactContent,
1371 C::StateKey: Borrow<K>,
1372 C::Redacted: RedactedStateEventContent,
1373 K: AsRef<str> + Sized + Sync + 'a,
1374 I: IntoIterator<Item = &'a K> + Send,
1375 I::IntoIter: Send,
1376 {
1377 Ok(self
1378 .client
1379 .state_store()
1380 .get_state_events_for_keys_static(self.room_id(), state_keys)
1381 .await?)
1382 }
1383
1384 /// Get a specific state event in this room.
1385 pub async fn get_state_event(
1386 &self,
1387 event_type: StateEventType,
1388 state_key: &str,
1389 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1390 self.client
1391 .state_store()
1392 .get_state_event(self.room_id(), event_type, state_key)
1393 .await
1394 .map_err(Into::into)
1395 }
1396
1397 /// Get a specific state event of statically-known type with an empty state
1398 /// key in this room.
1399 ///
1400 /// # Examples
1401 ///
1402 /// ```no_run
1403 /// # async {
1404 /// # let room: matrix_sdk::Room = todo!();
1405 /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
1406 ///
1407 /// let power_levels = room
1408 /// .get_state_event_static::<RoomPowerLevelsEventContent>()
1409 /// .await?
1410 /// .expect("every room has a power_levels event")
1411 /// .deserialize()?;
1412 /// # anyhow::Ok(())
1413 /// # };
1414 /// ```
1415 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1416 where
1417 C: StaticEventContent<IsPrefix = ruma::events::False>
1418 + StaticStateEventContent<StateKey = EmptyStateKey>
1419 + RedactContent,
1420 C::Redacted: RedactedStateEventContent,
1421 {
1422 self.get_state_event_static_for_key(&EmptyStateKey).await
1423 }
1424
1425 /// Get a specific state event of statically-known type in this room.
1426 ///
1427 /// # Examples
1428 ///
1429 /// ```no_run
1430 /// # async {
1431 /// # let room: matrix_sdk::Room = todo!();
1432 /// use matrix_sdk::ruma::{
1433 /// events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1434 /// };
1435 ///
1436 /// let member_event = room
1437 /// .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1438 /// "@alice:example.org"
1439 /// ))
1440 /// .await?;
1441 /// # anyhow::Ok(())
1442 /// # };
1443 /// ```
1444 pub async fn get_state_event_static_for_key<C, K>(
1445 &self,
1446 state_key: &K,
1447 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1448 where
1449 C: StaticEventContent<IsPrefix = ruma::events::False>
1450 + StaticStateEventContent
1451 + RedactContent,
1452 C::StateKey: Borrow<K>,
1453 C::Redacted: RedactedStateEventContent,
1454 K: AsRef<str> + ?Sized + Sync,
1455 {
1456 Ok(self
1457 .client
1458 .state_store()
1459 .get_state_event_static_for_key(self.room_id(), state_key)
1460 .await?)
1461 }
1462
1463 /// Returns the parents this room advertises as its parents.
1464 ///
1465 /// Results are in no particular order.
1466 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1467 // Implements this algorithm:
1468 // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships
1469
1470 // Get all m.space.parent events for this room
1471 Ok(self
1472 .get_state_events_static::<SpaceParentEventContent>()
1473 .await?
1474 .into_iter()
1475 // Extract state key (ie. the parent's id) and sender
1476 .filter_map(|parent_event| match parent_event.deserialize() {
1477 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1478 Some((e.state_key.to_owned(), e.sender))
1479 }
1480 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1481 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1482 Err(e) => {
1483 info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1484 None
1485 }
1486 })
1487 // Check whether the parent recognizes this room as its child
1488 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1489 let Some(parent_room) = self.client.get_room(&state_key) else {
1490 // We are not in the room, cannot check if the relationship is reciprocal
1491 // TODO: try peeking into the room
1492 return Ok(ParentSpace::Unverifiable(state_key));
1493 };
1494 // Get the m.space.child state of the parent with this room's id
1495 // as state key.
1496 if let Some(child_event) = parent_room
1497 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1498 .await?
1499 {
1500 match child_event.deserialize() {
1501 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1502 // There is a valid m.space.child in the parent pointing to
1503 // this room
1504 return Ok(ParentSpace::Reciprocal(parent_room));
1505 }
1506 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1507 Ok(SyncOrStrippedState::Stripped(_)) => {}
1508 Err(e) => {
1509 info!(
1510 room_id = ?self.room_id(), parent_room_id = ?state_key,
1511 "Could not deserialize m.space.child: {e}"
1512 );
1513 }
1514 }
1515 // Otherwise the event is either invalid or redacted. If
1516 // redacted it would be missing the
1517 // `via` key, thereby invalidating that end of the
1518 // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
1519 }
1520
1521 // No reciprocal m.space.child found, let's check if the sender has the
1522 // power to set it
1523 let Some(member) = parent_room.get_member(&sender).await? else {
1524 // Sender is not even in the parent room
1525 return Ok(ParentSpace::Illegitimate(parent_room));
1526 };
1527
1528 if member.can_send_state(StateEventType::SpaceChild) {
1529 // Sender does have the power to set m.room.child
1530 Ok(ParentSpace::WithPowerlevel(parent_room))
1531 } else {
1532 Ok(ParentSpace::Illegitimate(parent_room))
1533 }
1534 })
1535 .collect::<FuturesUnordered<_>>())
1536 }
1537
1538 /// Read account data in this room, from storage.
1539 pub async fn account_data(
1540 &self,
1541 data_type: RoomAccountDataEventType,
1542 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1543 self.client
1544 .state_store()
1545 .get_room_account_data_event(self.room_id(), data_type)
1546 .await
1547 .map_err(Into::into)
1548 }
1549
1550 /// Get account data of a statically-known type in this room, from storage.
1551 ///
1552 /// # Examples
1553 ///
1554 /// ```no_run
1555 /// # async {
1556 /// # let room: matrix_sdk::Room = todo!();
1557 /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1558 ///
1559 /// match room.account_data_static::<FullyReadEventContent>().await? {
1560 /// Some(fully_read) => {
1561 /// println!("Found read marker: {:?}", fully_read.deserialize()?)
1562 /// }
1563 /// None => println!("No read marker for this room"),
1564 /// }
1565 /// # anyhow::Ok(())
1566 /// # };
1567 /// ```
1568 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1569 where
1570 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1571 {
1572 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1573 }
1574
1575 /// Check if all members of this room are verified and all their devices are
1576 /// verified.
1577 ///
1578 /// Returns true if all devices in the room are verified, otherwise false.
1579 #[cfg(feature = "e2e-encryption")]
1580 pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1581 let user_ids = self
1582 .client
1583 .state_store()
1584 .get_user_ids(self.room_id(), RoomMemberships::empty())
1585 .await?;
1586
1587 for user_id in user_ids {
1588 let devices = self.client.encryption().get_user_devices(&user_id).await?;
1589 let any_unverified = devices.devices().any(|d| !d.is_verified());
1590
1591 if any_unverified {
1592 return Ok(false);
1593 }
1594 }
1595
1596 Ok(true)
1597 }
1598
1599 /// Set the given account data event for this room.
1600 ///
1601 /// # Example
1602 /// ```
1603 /// # async {
1604 /// # let room: matrix_sdk::Room = todo!();
1605 /// # let event_id: ruma::OwnedEventId = todo!();
1606 /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1607 /// let content = FullyReadEventContent::new(event_id);
1608 ///
1609 /// room.set_account_data(content).await?;
1610 /// # anyhow::Ok(())
1611 /// # };
1612 /// ```
1613 pub async fn set_account_data<T>(
1614 &self,
1615 content: T,
1616 ) -> Result<set_room_account_data::v3::Response>
1617 where
1618 T: RoomAccountDataEventContent,
1619 {
1620 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1621
1622 let request = set_room_account_data::v3::Request::new(
1623 own_user.to_owned(),
1624 self.room_id().to_owned(),
1625 &content,
1626 )?;
1627
1628 Ok(self.client.send(request).await?)
1629 }
1630
1631 /// Set the given raw account data event in this room.
1632 ///
1633 /// # Example
1634 /// ```
1635 /// # async {
1636 /// # let room: matrix_sdk::Room = todo!();
1637 /// use matrix_sdk::ruma::{
1638 /// events::{
1639 /// AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1640 /// marked_unread::MarkedUnreadEventContent,
1641 /// },
1642 /// serde::Raw,
1643 /// };
1644 /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1645 /// let full_event: AnyRoomAccountDataEventContent =
1646 /// marked_unread_content.clone().into();
1647 /// room.set_account_data_raw(
1648 /// marked_unread_content.event_type(),
1649 /// Raw::new(&full_event).unwrap(),
1650 /// )
1651 /// .await?;
1652 /// # anyhow::Ok(())
1653 /// # };
1654 /// ```
1655 pub async fn set_account_data_raw(
1656 &self,
1657 event_type: RoomAccountDataEventType,
1658 content: Raw<AnyRoomAccountDataEventContent>,
1659 ) -> Result<set_room_account_data::v3::Response> {
1660 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1661
1662 let request = set_room_account_data::v3::Request::new_raw(
1663 own_user.to_owned(),
1664 self.room_id().to_owned(),
1665 event_type,
1666 content,
1667 );
1668
1669 Ok(self.client.send(request).await?)
1670 }
1671
1672 /// Adds a tag to the room, or updates it if it already exists.
1673 ///
1674 /// Returns the [`create_tag::v3::Response`] from the server.
1675 ///
1676 /// # Arguments
1677 /// * `tag` - The tag to add or update.
1678 ///
1679 /// * `tag_info` - Information about the tag, generally containing the
1680 /// `order` parameter.
1681 ///
1682 /// # Examples
1683 ///
1684 /// ```no_run
1685 /// # use std::str::FromStr;
1686 /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1687 /// # async {
1688 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1689 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1690 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1691 /// use matrix_sdk::ruma::events::tag::TagInfo;
1692 ///
1693 /// if let Some(room) = client.get_room(&room_id) {
1694 /// let mut tag_info = TagInfo::new();
1695 /// tag_info.order = Some(0.9);
1696 /// let user_tag = UserTagName::from_str("u.work")?;
1697 ///
1698 /// room.set_tag(TagName::User(user_tag), tag_info).await?;
1699 /// }
1700 /// # anyhow::Ok(()) };
1701 /// ```
1702 pub async fn set_tag(
1703 &self,
1704 tag: TagName,
1705 tag_info: TagInfo,
1706 ) -> Result<create_tag::v3::Response> {
1707 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1708 let request = create_tag::v3::Request::new(
1709 user_id.to_owned(),
1710 self.inner.room_id().to_owned(),
1711 tag.to_string(),
1712 tag_info,
1713 );
1714 Ok(self.client.send(request).await?)
1715 }
1716
1717 /// Removes a tag from the room.
1718 ///
1719 /// Returns the [`delete_tag::v3::Response`] from the server.
1720 ///
1721 /// # Arguments
1722 /// * `tag` - The tag to remove.
1723 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1724 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1725 let request = delete_tag::v3::Request::new(
1726 user_id.to_owned(),
1727 self.inner.room_id().to_owned(),
1728 tag.to_string(),
1729 );
1730 Ok(self.client.send(request).await?)
1731 }
1732
1733 /// Add or remove the `m.favourite` flag for this room.
1734 ///
1735 /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1736 /// room, the tag will be removed too.
1737 ///
1738 /// # Arguments
1739 ///
1740 /// * `is_favourite` - Whether to mark this room as favourite.
1741 /// * `tag_order` - The order of the tag if any.
1742 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1743 if is_favourite {
1744 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1745
1746 self.set_tag(TagName::Favorite, tag_info).await?;
1747
1748 if self.is_low_priority() {
1749 self.remove_tag(TagName::LowPriority).await?;
1750 }
1751 } else {
1752 self.remove_tag(TagName::Favorite).await?;
1753 }
1754 Ok(())
1755 }
1756
1757 /// Add or remove the `m.lowpriority` flag for this room.
1758 ///
1759 /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1760 /// room, the tag will be removed too.
1761 ///
1762 /// # Arguments
1763 ///
1764 /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1765 /// * `tag_order` - The order of the tag if any.
1766 pub async fn set_is_low_priority(
1767 &self,
1768 is_low_priority: bool,
1769 tag_order: Option<f64>,
1770 ) -> Result<()> {
1771 if is_low_priority {
1772 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1773
1774 self.set_tag(TagName::LowPriority, tag_info).await?;
1775
1776 if self.is_favourite() {
1777 self.remove_tag(TagName::Favorite).await?;
1778 }
1779 } else {
1780 self.remove_tag(TagName::LowPriority).await?;
1781 }
1782 Ok(())
1783 }
1784
1785 /// Sets whether this room is a DM.
1786 ///
1787 /// When setting this room as DM, it will be marked as DM for all active
1788 /// members of the room. When unsetting this room as DM, it will be
1789 /// unmarked as DM for all users, not just the members.
1790 ///
1791 /// # Arguments
1792 /// * `is_direct` - Whether to mark this room as direct.
1793 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1794 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1795
1796 let mut content = self
1797 .client
1798 .account()
1799 .account_data::<DirectEventContent>()
1800 .await?
1801 .map(|c| c.deserialize())
1802 .transpose()?
1803 .unwrap_or_default();
1804
1805 let this_room_id = self.inner.room_id();
1806
1807 if is_direct {
1808 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1809 room_members.retain(|member| member.user_id() != self.own_user_id());
1810
1811 for member in room_members {
1812 let entry = content.entry(member.user_id().into()).or_default();
1813 if !entry.iter().any(|room_id| room_id == this_room_id) {
1814 entry.push(this_room_id.to_owned());
1815 }
1816 }
1817 } else {
1818 for (_, list) in content.iter_mut() {
1819 list.retain(|room_id| *room_id != this_room_id);
1820 }
1821
1822 // Remove user ids that don't have any room marked as DM
1823 content.retain(|_, list| !list.is_empty());
1824 }
1825
1826 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1827
1828 self.client.send(request).await?;
1829 Ok(())
1830 }
1831
1832 /// Tries to decrypt a room event.
1833 ///
1834 /// # Arguments
1835 /// * `event` - The room event to be decrypted.
1836 ///
1837 /// Returns the decrypted event. In the case of a decryption error, returns
1838 /// a `TimelineEvent` representing the decryption error.
1839 #[cfg(feature = "e2e-encryption")]
1840 #[cfg(not(feature = "experimental-encrypted-state-events"))]
1841 pub async fn decrypt_event(
1842 &self,
1843 event: &Raw<OriginalSyncRoomEncryptedEvent>,
1844 push_ctx: Option<&PushContext>,
1845 ) -> Result<TimelineEvent> {
1846 let machine = self.client.olm_machine().await;
1847 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1848
1849 match machine
1850 .try_decrypt_room_event(
1851 event.cast_ref(),
1852 self.inner.room_id(),
1853 self.client.decryption_settings(),
1854 )
1855 .await?
1856 {
1857 RoomEventDecryptionResult::Decrypted(decrypted) => {
1858 let push_actions = if let Some(push_ctx) = push_ctx {
1859 Some(push_ctx.for_event(&decrypted.event).await)
1860 } else {
1861 None
1862 };
1863 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1864 }
1865 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1866 self.client
1867 .encryption()
1868 .backups()
1869 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1870 Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1871 }
1872 }
1873 }
1874
1875 /// Tries to decrypt a room event.
1876 ///
1877 /// # Arguments
1878 /// * `event` - The room event to be decrypted.
1879 ///
1880 /// Returns the decrypted event. In the case of a decryption error, returns
1881 /// a `TimelineEvent` representing the decryption error.
1882 #[cfg(feature = "experimental-encrypted-state-events")]
1883 pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1884 &self,
1885 event: &Raw<T>,
1886 push_ctx: Option<&PushContext>,
1887 ) -> Result<TimelineEvent> {
1888 let machine = self.client.olm_machine().await;
1889 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1890
1891 match machine
1892 .try_decrypt_room_event(
1893 event.cast_ref(),
1894 self.inner.room_id(),
1895 self.client.decryption_settings(),
1896 )
1897 .await?
1898 {
1899 RoomEventDecryptionResult::Decrypted(decrypted) => {
1900 let push_actions = if let Some(push_ctx) = push_ctx {
1901 Some(push_ctx.for_event(&decrypted.event).await)
1902 } else {
1903 None
1904 };
1905 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1906 }
1907 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1908 self.client
1909 .encryption()
1910 .backups()
1911 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1912 // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
1913 // event.
1914 Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1915 }
1916 }
1917 }
1918
1919 /// Fetches the [`EncryptionInfo`] for an event decrypted with the supplied
1920 /// session_id.
1921 ///
1922 /// This may be used when we receive an update for a session, and we want to
1923 /// reflect the changes in messages we have received that were encrypted
1924 /// with that session, e.g. to remove a warning shield because a device is
1925 /// now verified.
1926 ///
1927 /// # Arguments
1928 /// * `session_id` - The ID of the Megolm session to get information for.
1929 /// * `sender` - The (claimed) sender of the event where the session was
1930 /// used.
1931 #[cfg(feature = "e2e-encryption")]
1932 pub async fn get_encryption_info(
1933 &self,
1934 session_id: &str,
1935 sender: &UserId,
1936 ) -> Option<Arc<EncryptionInfo>> {
1937 let machine = self.client.olm_machine().await;
1938 let machine = machine.as_ref()?;
1939 machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1940 }
1941
1942 /// Forces the currently active room key, which is used to encrypt messages,
1943 /// to be rotated.
1944 ///
1945 /// A new room key will be crated and shared with all the room members the
1946 /// next time a message will be sent. You don't have to call this method,
1947 /// room keys will be rotated automatically when necessary. This method is
1948 /// still useful for debugging purposes.
1949 ///
1950 /// For more info please take a look a the [`encryption`] module
1951 /// documentation.
1952 ///
1953 /// [`encryption`]: crate::encryption
1954 #[cfg(feature = "e2e-encryption")]
1955 pub async fn discard_room_key(&self) -> Result<()> {
1956 let machine = self.client.olm_machine().await;
1957 if let Some(machine) = machine.as_ref() {
1958 machine.discard_room_key(self.inner.room_id()).await?;
1959 Ok(())
1960 } else {
1961 Err(Error::NoOlmMachine)
1962 }
1963 }
1964
1965 /// Ban the user with `UserId` from this room.
1966 ///
1967 /// # Arguments
1968 ///
1969 /// * `user_id` - The user to ban with `UserId`.
1970 ///
1971 /// * `reason` - The reason for banning this user.
1972 #[instrument(skip_all)]
1973 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1974 let request = assign!(
1975 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1976 { reason: reason.map(ToOwned::to_owned) }
1977 );
1978 self.client.send(request).await?;
1979 Ok(())
1980 }
1981
1982 /// Unban the user with `UserId` from this room.
1983 ///
1984 /// # Arguments
1985 ///
1986 /// * `user_id` - The user to unban with `UserId`.
1987 ///
1988 /// * `reason` - The reason for unbanning this user.
1989 #[instrument(skip_all)]
1990 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1991 let request = assign!(
1992 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1993 { reason: reason.map(ToOwned::to_owned) }
1994 );
1995 self.client.send(request).await?;
1996 Ok(())
1997 }
1998
1999 /// Kick a user out of this room.
2000 ///
2001 /// # Arguments
2002 ///
2003 /// * `user_id` - The `UserId` of the user that should be kicked out of the
2004 /// room.
2005 ///
2006 /// * `reason` - Optional reason why the room member is being kicked out.
2007 #[instrument(skip_all)]
2008 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
2009 let request = assign!(
2010 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
2011 { reason: reason.map(ToOwned::to_owned) }
2012 );
2013 self.client.send(request).await?;
2014 Ok(())
2015 }
2016
2017 /// Invite the specified user by `UserId` to this room.
2018 ///
2019 /// # Arguments
2020 ///
2021 /// * `user_id` - The `UserId` of the user to invite to the room.
2022 #[instrument(skip_all)]
2023 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
2024 #[cfg(feature = "e2e-encryption")]
2025 if self.client.inner.enable_share_history_on_invite {
2026 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
2027 }
2028
2029 let recipient = InvitationRecipient::UserId(InviteUserId::new(user_id.to_owned()));
2030 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2031 self.client.send(request).await?;
2032
2033 // Force a future room members reload before sending any event to prevent UTDs
2034 // that can happen when some event is sent after a room member has been invited
2035 // but before the /sync request could fetch the membership change event.
2036 self.mark_members_missing();
2037
2038 Ok(())
2039 }
2040
2041 /// Invite the specified user by third party id to this room.
2042 ///
2043 /// # Arguments
2044 ///
2045 /// * `invite_id` - A third party id of a user to invite to the room.
2046 #[instrument(skip_all)]
2047 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
2048 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
2049 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2050 self.client.send(request).await?;
2051
2052 // Force a future room members reload before sending any event to prevent UTDs
2053 // that can happen when some event is sent after a room member has been invited
2054 // but before the /sync request could fetch the membership change event.
2055 self.mark_members_missing();
2056
2057 Ok(())
2058 }
2059
2060 /// Activate typing notice for this room.
2061 ///
2062 /// The typing notice remains active for 4s. It can be deactivate at any
2063 /// point by setting typing to `false`. If this method is called while
2064 /// the typing notice is active nothing will happen. This method can be
2065 /// called on every key stroke, since it will do nothing while typing is
2066 /// active.
2067 ///
2068 /// # Arguments
2069 ///
2070 /// * `typing` - Whether the user is typing or has stopped typing.
2071 ///
2072 /// # Examples
2073 ///
2074 /// ```no_run
2075 /// use std::time::Duration;
2076 ///
2077 /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
2078 /// # use matrix_sdk::{
2079 /// # Client, config::SyncSettings,
2080 /// # ruma::room_id,
2081 /// # };
2082 /// # use url::Url;
2083 ///
2084 /// # async {
2085 /// # let homeserver = Url::parse("http://localhost:8080")?;
2086 /// # let client = Client::new(homeserver).await?;
2087 /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2088 ///
2089 /// if let Some(room) = client.get_room(&room_id) {
2090 /// room.typing_notice(true).await?
2091 /// }
2092 /// # anyhow::Ok(()) };
2093 /// ```
2094 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
2095 self.ensure_room_joined()?;
2096
2097 // Only send a request to the homeserver if the old timeout has elapsed
2098 // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
2099 let send = if let Some(typing_time) =
2100 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
2101 {
2102 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
2103 // We always reactivate the typing notice if typing is true or
2104 // we may need to deactivate it if it's
2105 // currently active if typing is false
2106 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
2107 } else {
2108 // Only send a request when we need to deactivate typing
2109 !typing
2110 }
2111 } else {
2112 // Typing notice is currently deactivated, therefore, send a request
2113 // only when it's about to be activated
2114 typing
2115 };
2116
2117 if send {
2118 self.send_typing_notice(typing).await?;
2119 }
2120
2121 Ok(())
2122 }
2123
2124 #[instrument(name = "typing_notice", skip(self))]
2125 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
2126 let typing = if typing {
2127 self.client
2128 .inner
2129 .typing_notice_times
2130 .write()
2131 .unwrap()
2132 .insert(self.room_id().to_owned(), Instant::now());
2133 Typing::Yes(TypingInfo::new(TYPING_NOTICE_TIMEOUT))
2134 } else {
2135 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
2136 Typing::No
2137 };
2138
2139 let request = create_typing_event::v3::Request::new(
2140 self.own_user_id().to_owned(),
2141 self.room_id().to_owned(),
2142 typing,
2143 );
2144
2145 self.client.send(request).await?;
2146
2147 Ok(())
2148 }
2149
2150 /// Send a request to set a single receipt.
2151 ///
2152 /// If an unthreaded receipt is sent, this will also unset the unread flag
2153 /// of the room if necessary.
2154 ///
2155 /// # Arguments
2156 ///
2157 /// * `receipt_type` - The type of the receipt to set. Note that it is
2158 /// possible to set the fully-read marker although it is technically not a
2159 /// receipt.
2160 ///
2161 /// * `thread` - The thread where this receipt should apply, if any. Note
2162 /// that this must be [`ReceiptThread::Unthreaded`] when sending a
2163 /// [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
2164 ///
2165 /// * `event_id` - The `EventId` of the event to set the receipt on.
2166 #[instrument(skip_all)]
2167 pub async fn send_single_receipt(
2168 &self,
2169 receipt_type: create_receipt::v3::ReceiptType,
2170 thread: ReceiptThread,
2171 event_id: OwnedEventId,
2172 ) -> Result<()> {
2173 // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
2174 // string key.
2175 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
2176
2177 self.client
2178 .inner
2179 .locks
2180 .read_receipt_deduplicated_handler
2181 .run((request_key, event_id.clone()), async {
2182 // We will unset the unread flag if we send an unthreaded receipt.
2183 let is_unthreaded = thread == ReceiptThread::Unthreaded;
2184
2185 let mut request = create_receipt::v3::Request::new(
2186 self.room_id().to_owned(),
2187 receipt_type,
2188 event_id,
2189 );
2190 request.thread = thread;
2191
2192 self.client.send(request).await?;
2193
2194 if is_unthreaded {
2195 self.set_unread_flag(false).await?;
2196 }
2197
2198 Ok(())
2199 })
2200 .await
2201 }
2202
2203 /// Send a request to set multiple receipts at once.
2204 ///
2205 /// This will also unset the unread flag of the room if necessary.
2206 ///
2207 /// # Arguments
2208 ///
2209 /// * `receipts` - The `Receipts` to send.
2210 ///
2211 /// If `receipts` is empty, this is a no-op.
2212 #[instrument(skip_all)]
2213 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2214 if receipts.is_empty() {
2215 return Ok(());
2216 }
2217
2218 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2219 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2220 fully_read,
2221 read_receipt: public_read_receipt,
2222 private_read_receipt,
2223 });
2224
2225 self.client.send(request).await?;
2226
2227 self.set_unread_flag(false).await?;
2228
2229 Ok(())
2230 }
2231
2232 /// Helper function to enable End-to-end encryption in this room.
2233 /// `encrypted_state_events` is not used unless the
2234 /// `experimental-encrypted-state-events` feature is enabled.
2235 #[allow(unused_variables, unused_mut)]
2236 async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2237 use ruma::{
2238 EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2239 };
2240 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2241
2242 if !self.latest_encryption_state().await?.is_encrypted() {
2243 let mut content =
2244 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2245 #[cfg(feature = "experimental-encrypted-state-events")]
2246 if encrypted_state_events {
2247 content = content.with_encrypted_state();
2248 }
2249 self.send_state_event(content).await?;
2250
2251 // Spin on the sync beat event, since the first sync we receive might not
2252 // include the encryption event.
2253 //
2254 // TODO do we want to return an error here if we time out? This
2255 // could be quite useful if someone wants to enable encryption and
2256 // send a message right after it's enabled.
2257 let res = timeout(
2258 async {
2259 loop {
2260 // Listen for sync events, then check if the encryption state is known.
2261 self.client.inner.sync_beat.listen().await;
2262 let _state_store_lock =
2263 self.client.base_client().state_store_lock().lock().await;
2264
2265 if !self.inner.encryption_state().is_unknown() {
2266 break;
2267 }
2268 }
2269 },
2270 SYNC_WAIT_TIME,
2271 )
2272 .await;
2273
2274 let store_guard = self.client.base_client().state_store_lock().lock().await;
2275
2276 // If encryption was enabled, return.
2277 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2278 if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2279 debug!("room successfully marked as encrypted");
2280 return Ok(());
2281 }
2282
2283 // If encryption with state event encryption was enabled, return.
2284 #[cfg(feature = "experimental-encrypted-state-events")]
2285 if res.is_ok() && {
2286 if encrypted_state_events {
2287 self.inner.encryption_state().is_state_encrypted()
2288 } else {
2289 self.inner.encryption_state().is_encrypted()
2290 }
2291 } {
2292 debug!("room successfully marked as encrypted");
2293 return Ok(());
2294 }
2295
2296 // If after waiting for multiple syncs, we don't have the encryption state we
2297 // expect, assume the local encryption state is incorrect; this will
2298 // cause the SDK to re-request it later for confirmation, instead of
2299 // assuming it's sync'd and correct (and not encrypted).
2300 debug!("still not marked as encrypted, marking encryption state as missing");
2301
2302 self.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
2303 info.mark_encryption_state_missing();
2304 (info, RoomInfoNotableUpdateReasons::empty())
2305 })
2306 .await?;
2307 }
2308
2309 Ok(())
2310 }
2311
2312 /// Enable End-to-end encryption in this room.
2313 ///
2314 /// This method will be a noop if encryption is already enabled, otherwise
2315 /// sends a `m.room.encryption` state event to the room. This might fail if
2316 /// you don't have the appropriate power level to enable end-to-end
2317 /// encryption.
2318 ///
2319 /// A sync needs to be received to update the local room state. This method
2320 /// will wait for a sync to be received, this might time out if no
2321 /// sync loop is running or if the server is slow.
2322 ///
2323 /// # Examples
2324 ///
2325 /// ```no_run
2326 /// # use matrix_sdk::{
2327 /// # Client, config::SyncSettings,
2328 /// # ruma::room_id,
2329 /// # };
2330 /// # use url::Url;
2331 /// #
2332 /// # async {
2333 /// # let homeserver = Url::parse("http://localhost:8080")?;
2334 /// # let client = Client::new(homeserver).await?;
2335 /// # let room_id = room_id!("!test:localhost");
2336 /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2337 ///
2338 /// if let Some(room) = client.get_room(&room_id) {
2339 /// room.enable_encryption().await?
2340 /// }
2341 /// # anyhow::Ok(()) };
2342 /// ```
2343 #[instrument(skip_all)]
2344 pub async fn enable_encryption(&self) -> Result<()> {
2345 self.enable_encryption_inner(false).await
2346 }
2347
2348 /// Enable End-to-end encryption in this room, opting into experimental
2349 /// state event encryption.
2350 ///
2351 /// This method will be a noop if encryption is already enabled, otherwise
2352 /// sends a `m.room.encryption` state event to the room. This might fail if
2353 /// you don't have the appropriate power level to enable end-to-end
2354 /// encryption.
2355 ///
2356 /// A sync needs to be received to update the local room state. This method
2357 /// will wait for a sync to be received, this might time out if no
2358 /// sync loop is running or if the server is slow.
2359 ///
2360 /// # Examples
2361 ///
2362 /// ```no_run
2363 /// # use matrix_sdk::{
2364 /// # Client, config::SyncSettings,
2365 /// # ruma::room_id,
2366 /// # };
2367 /// # use url::Url;
2368 /// #
2369 /// # async {
2370 /// # let homeserver = Url::parse("http://localhost:8080")?;
2371 /// # let client = Client::new(homeserver).await?;
2372 /// # let room_id = room_id!("!test:localhost");
2373 /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2374 ///
2375 /// if let Some(room) = client.get_room(&room_id) {
2376 /// room.enable_encryption_with_state_event_encryption().await?
2377 /// }
2378 /// # anyhow::Ok(()) };
2379 /// ```
2380 #[instrument(skip_all)]
2381 #[cfg(feature = "experimental-encrypted-state-events")]
2382 pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2383 self.enable_encryption_inner(true).await
2384 }
2385
2386 /// Share a room key with users in the given room.
2387 ///
2388 /// This will create Olm sessions with all the users/device pairs in the
2389 /// room if necessary and share a room key that can be shared with them.
2390 ///
2391 /// Does nothing if no room key needs to be shared.
2392 // TODO: expose this publicly so people can pre-share a group session if
2393 // e.g. a user starts to type a message for a room.
2394 #[cfg(feature = "e2e-encryption")]
2395 #[instrument(skip_all, fields(room_id = ?self.room_id()))]
2396 async fn preshare_room_key(&self) -> Result<()> {
2397 self.ensure_room_joined()?;
2398
2399 // Take and release the lock on the store, if needs be.
2400 let _guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2401
2402 self.client
2403 .locks()
2404 .group_session_deduplicated_handler
2405 .run(self.room_id().to_owned(), async move {
2406 {
2407 let members = self
2408 .client
2409 .state_store()
2410 .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2411 .await?;
2412 self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2413 };
2414
2415 let response = self.share_room_key().await;
2416
2417 // If one of the responses failed invalidate the group
2418 // session as using it would end up in undecryptable
2419 // messages.
2420 if let Err(r) = response {
2421 let machine = self.client.olm_machine().await;
2422 if let Some(machine) = machine.as_ref() {
2423 machine.discard_room_key(self.room_id()).await?;
2424 }
2425 return Err(r);
2426 }
2427
2428 Ok(())
2429 })
2430 .await
2431 }
2432
2433 /// Share a group session for a room.
2434 ///
2435 /// # Panics
2436 ///
2437 /// Panics if the client isn't logged in.
2438 #[cfg(feature = "e2e-encryption")]
2439 #[instrument(skip_all)]
2440 async fn share_room_key(&self) -> Result<()> {
2441 self.ensure_room_joined()?;
2442
2443 let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2444
2445 for request in requests {
2446 let response = self.client.send_to_device(&request).await?;
2447 self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2448 }
2449
2450 Ok(())
2451 }
2452
2453 /// Wait for the room to be fully synced.
2454 ///
2455 /// This method makes sure the room that was returned when joining a room
2456 /// has been echoed back in the sync.
2457 ///
2458 /// Warning: This waits until a sync happens and does not return if no sync
2459 /// is happening. It can also return early when the room is not a joined
2460 /// room anymore.
2461 #[instrument(skip_all)]
2462 pub async fn sync_up(&self) {
2463 while !self.is_synced() && self.state() == RoomState::Joined {
2464 let wait_for_beat = self.client.inner.sync_beat.listen();
2465 // We don't care whether it's a timeout or a sync beat.
2466 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2467 }
2468 }
2469
2470 /// Send a message-like event to this room.
2471 ///
2472 /// Returns the parsed response from the server.
2473 ///
2474 /// If the encryption feature is enabled this method will transparently
2475 /// encrypt the event if this room is encrypted (except for `m.reaction`
2476 /// events, which are never encrypted).
2477 ///
2478 /// **Note**: If you just want to send an event with custom JSON content to
2479 /// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
2480 ///
2481 /// If you want to set a transaction ID for the event, use
2482 /// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
2483 /// on the returned value before `.await`ing it.
2484 ///
2485 /// # Arguments
2486 ///
2487 /// * `content` - The content of the message event.
2488 ///
2489 /// # Examples
2490 ///
2491 /// ```no_run
2492 /// # use std::sync::{Arc, RwLock};
2493 /// # use matrix_sdk::{Client, config::SyncSettings};
2494 /// # use url::Url;
2495 /// # use matrix_sdk::ruma::room_id;
2496 /// # use serde::{Deserialize, Serialize};
2497 /// use matrix_sdk::ruma::{
2498 /// MilliSecondsSinceUnixEpoch, TransactionId,
2499 /// events::{
2500 /// macros::EventContent,
2501 /// room::message::{RoomMessageEventContent, TextMessageEventContent},
2502 /// },
2503 /// uint,
2504 /// };
2505 ///
2506 /// # async {
2507 /// # let homeserver = Url::parse("http://localhost:8080")?;
2508 /// # let mut client = Client::new(homeserver).await?;
2509 /// # let room_id = room_id!("!test:localhost");
2510 /// let content = RoomMessageEventContent::text_plain("Hello world");
2511 /// let txn_id = TransactionId::new();
2512 ///
2513 /// if let Some(room) = client.get_room(&room_id) {
2514 /// room.send(content).with_transaction_id(txn_id).await?;
2515 /// }
2516 ///
2517 /// // Custom events work too:
2518 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2519 /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
2520 /// struct TokenEventContent {
2521 /// token: String,
2522 /// #[serde(rename = "exp")]
2523 /// expires_at: MilliSecondsSinceUnixEpoch,
2524 /// }
2525 ///
2526 /// # fn generate_token() -> String { todo!() }
2527 /// let content = TokenEventContent {
2528 /// token: generate_token(),
2529 /// expires_at: {
2530 /// let now = MilliSecondsSinceUnixEpoch::now();
2531 /// MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
2532 /// },
2533 /// };
2534 ///
2535 /// if let Some(room) = client.get_room(&room_id) {
2536 /// room.send(content).await?;
2537 /// }
2538 /// # anyhow::Ok(()) };
2539 /// ```
2540 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2541 SendMessageLikeEvent::new(self, content)
2542 }
2543
2544 /// Run /keys/query requests for all the non-tracked users, and for users
2545 /// with an out-of-date device list.
2546 #[cfg(feature = "e2e-encryption")]
2547 async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2548 let olm = self.client.olm_machine().await;
2549 let olm = olm.as_ref().expect("Olm machine wasn't started");
2550
2551 let members =
2552 self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2553
2554 let tracked: HashMap<_, _> = olm
2555 .store()
2556 .load_tracked_users()
2557 .await?
2558 .into_iter()
2559 .map(|tracked| (tracked.user_id, tracked.dirty))
2560 .collect();
2561
2562 // A member has no unknown devices iff it was tracked *and* the tracking is
2563 // not considered dirty.
2564 let members_with_unknown_devices =
2565 members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2566
2567 let (req_id, request) =
2568 olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2569
2570 if !request.device_keys.is_empty() {
2571 self.client.keys_query(&req_id, request.device_keys).await?;
2572 }
2573
2574 Ok(())
2575 }
2576
2577 /// Send a message-like event with custom JSON content to this room.
2578 ///
2579 /// Returns the parsed response from the server.
2580 ///
2581 /// If the encryption feature is enabled this method will transparently
2582 /// encrypt the event if this room is encrypted (except for `m.reaction`
2583 /// events, which are never encrypted).
2584 ///
2585 /// This method is equivalent to the [`send()`][Self::send] method but
2586 /// allows sending custom JSON payloads, e.g. constructed using the
2587 /// [`serde_json::json!()`] macro.
2588 ///
2589 /// If you want to set a transaction ID for the event, use
2590 /// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
2591 /// on the returned value before `.await`ing it.
2592 ///
2593 /// # Arguments
2594 ///
2595 /// * `event_type` - The type of the event.
2596 ///
2597 /// * `content` - The content of the event as a raw JSON value. The argument
2598 /// type can be `serde_json::Value`, but also other raw JSON types; for
2599 /// the full list check the documentation of
2600 /// [`IntoRawMessageLikeEventContent`].
2601 ///
2602 /// # Examples
2603 ///
2604 /// ```no_run
2605 /// # use std::sync::{Arc, RwLock};
2606 /// # use matrix_sdk::{Client, config::SyncSettings};
2607 /// # use url::Url;
2608 /// # use matrix_sdk::ruma::room_id;
2609 /// # async {
2610 /// # let homeserver = Url::parse("http://localhost:8080")?;
2611 /// # let mut client = Client::new(homeserver).await?;
2612 /// # let room_id = room_id!("!test:localhost");
2613 /// use serde_json::json;
2614 ///
2615 /// if let Some(room) = client.get_room(&room_id) {
2616 /// room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
2617 /// }
2618 /// # anyhow::Ok(()) };
2619 /// ```
2620 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2621 pub fn send_raw<'a>(
2622 &'a self,
2623 event_type: &'a str,
2624 content: impl IntoRawMessageLikeEventContent,
2625 ) -> SendRawMessageLikeEvent<'a> {
2626 // Note: the recorded instrument fields are saved in
2627 // `SendRawMessageLikeEvent::into_future`.
2628 SendRawMessageLikeEvent::new(self, event_type, content)
2629 }
2630
2631 /// Send an attachment to this room.
2632 ///
2633 /// This will upload the given data that the reader produces using the
2634 /// [`upload()`] method and post an event to the given room.
2635 /// If the room is encrypted and the encryption feature is enabled the
2636 /// upload will be encrypted.
2637 ///
2638 /// This is a convenience method that calls the
2639 /// [`upload()`] and afterwards the [`send()`].
2640 ///
2641 /// # Arguments
2642 /// * `filename` - The file name.
2643 ///
2644 /// * `content_type` - The type of the media, this will be used as the
2645 /// content-type header.
2646 ///
2647 /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2648 /// media.
2649 ///
2650 /// * `config` - Metadata and configuration for the attachment.
2651 ///
2652 /// # Examples
2653 ///
2654 /// ```no_run
2655 /// # use std::fs;
2656 /// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
2657 /// # use url::Url;
2658 /// # use mime;
2659 /// # async {
2660 /// # let homeserver = Url::parse("http://localhost:8080")?;
2661 /// # let mut client = Client::new(homeserver).await?;
2662 /// # let room_id = room_id!("!test:localhost");
2663 /// let mut image = fs::read("/home/example/my-cat.jpg")?;
2664 ///
2665 /// if let Some(room) = client.get_room(&room_id) {
2666 /// room.send_attachment(
2667 /// "my_favorite_cat.jpg",
2668 /// &mime::IMAGE_JPEG,
2669 /// image,
2670 /// AttachmentConfig::new(),
2671 /// ).await?;
2672 /// }
2673 /// # anyhow::Ok(()) };
2674 /// ```
2675 ///
2676 /// [`upload()`]: crate::Media::upload
2677 /// [`send()`]: Self::send
2678 #[instrument(skip_all)]
2679 pub fn send_attachment<'a>(
2680 &'a self,
2681 filename: impl Into<String>,
2682 content_type: &'a Mime,
2683 data: Vec<u8>,
2684 config: AttachmentConfig,
2685 ) -> SendAttachment<'a> {
2686 SendAttachment::new(self, filename.into(), content_type, data, config)
2687 }
2688
2689 /// Prepare and send an attachment to this room.
2690 ///
2691 /// This will upload the given data that the reader produces using the
2692 /// [`upload()`](#method.upload) method and post an event to the given room.
2693 /// If the room is encrypted and the encryption feature is enabled the
2694 /// upload will be encrypted.
2695 ///
2696 /// This is a convenience method that calls the
2697 /// [`Client::upload()`](#Client::method.upload) and afterwards the
2698 /// [`send()`](#method.send).
2699 ///
2700 /// # Arguments
2701 /// * `filename` - The file name.
2702 ///
2703 /// * `content_type` - The type of the media, this will be used as the
2704 /// content-type header.
2705 ///
2706 /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2707 /// media.
2708 ///
2709 /// * `config` - Metadata and configuration for the attachment.
2710 ///
2711 /// * `send_progress` - An observable to transmit forward progress about the
2712 /// upload.
2713 ///
2714 /// * `store_in_cache` - A boolean defining whether the uploaded media will
2715 /// be stored in the cache immediately after a successful upload.
2716 #[instrument(skip_all)]
2717 pub(super) async fn prepare_and_send_attachment<'a>(
2718 &'a self,
2719 filename: String,
2720 content_type: &'a Mime,
2721 data: Vec<u8>,
2722 mut config: AttachmentConfig,
2723 send_progress: SharedObservable<TransmissionProgress>,
2724 store_in_cache: bool,
2725 ) -> Result<send_message_event::v3::Response> {
2726 self.ensure_room_joined()?;
2727
2728 let txn_id = config.txn_id.take();
2729 let mentions = config.mentions.take();
2730
2731 let thumbnail = config.thumbnail.take();
2732
2733 // If necessary, store caching data for the thumbnail ahead of time.
2734 let thumbnail_cache_info = if store_in_cache {
2735 thumbnail
2736 .as_ref()
2737 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2738 } else {
2739 None
2740 };
2741
2742 #[cfg(feature = "e2e-encryption")]
2743 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2744 self.client
2745 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2746 .await?
2747 } else {
2748 self.client
2749 .media()
2750 .upload_plain_media_and_thumbnail(
2751 content_type,
2752 // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2753 // similar.
2754 data.clone(),
2755 thumbnail,
2756 send_progress,
2757 )
2758 .await?
2759 };
2760
2761 #[cfg(not(feature = "e2e-encryption"))]
2762 let (media_source, thumbnail) = self
2763 .client
2764 .media()
2765 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2766 .await?;
2767
2768 if store_in_cache {
2769 let media_store_lock_guard = self.client.media_store().lock().await?;
2770
2771 // A failure to cache shouldn't prevent the whole upload from finishing
2772 // properly, so only log errors during caching.
2773
2774 debug!("caching the media");
2775 let request =
2776 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2777
2778 if let Err(err) = media_store_lock_guard
2779 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2780 .await
2781 {
2782 warn!("unable to cache the media after uploading it: {err}");
2783 }
2784
2785 if let Some(((data, height, width), source)) =
2786 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2787 {
2788 debug!("caching the thumbnail");
2789
2790 let request = MediaRequestParameters {
2791 source: source.clone(),
2792 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2793 };
2794
2795 if let Err(err) = media_store_lock_guard
2796 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2797 .await
2798 {
2799 warn!("unable to cache the media after uploading it: {err}");
2800 }
2801 }
2802 }
2803
2804 let content = self
2805 .make_media_event(
2806 Room::make_attachment_type(
2807 content_type,
2808 filename,
2809 media_source,
2810 config.caption,
2811 config.info,
2812 thumbnail,
2813 ),
2814 mentions,
2815 config.reply,
2816 )
2817 .await?;
2818
2819 let mut fut = self.send(content);
2820 if let Some(txn_id) = txn_id {
2821 fut = fut.with_transaction_id(txn_id);
2822 }
2823
2824 fut.await.map(|result| result.response)
2825 }
2826
2827 /// Creates the inner [`MessageType`] for an already-uploaded media file
2828 /// provided by its source.
2829 #[allow(clippy::too_many_arguments)]
2830 pub(crate) fn make_attachment_type(
2831 content_type: &Mime,
2832 filename: String,
2833 source: MediaSource,
2834 caption: Option<TextMessageEventContent>,
2835 info: Option<AttachmentInfo>,
2836 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2837 ) -> MessageType {
2838 make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2839 }
2840
2841 /// Creates the [`RoomMessageEventContent`] based on the message type,
2842 /// mentions and reply information.
2843 pub(crate) async fn make_media_event(
2844 &self,
2845 msg_type: MessageType,
2846 mentions: Option<Mentions>,
2847 reply: Option<Reply>,
2848 ) -> Result<RoomMessageEventContent> {
2849 let mut content = RoomMessageEventContent::new(msg_type);
2850 if let Some(mentions) = mentions {
2851 content = content.add_mentions(mentions);
2852 }
2853 if let Some(reply) = reply {
2854 // Since we just created the event, there is no relation attached to it. Thus,
2855 // it is safe to add the reply relation without overriding anything.
2856 content = self.make_reply_event(content.into(), reply).await?;
2857 }
2858 Ok(content)
2859 }
2860
2861 /// Creates the inner [`GalleryItemType`] for an already-uploaded media file
2862 /// provided by its source.
2863 #[cfg(feature = "unstable-msc4274")]
2864 #[allow(clippy::too_many_arguments)]
2865 pub(crate) fn make_gallery_item_type(
2866 content_type: &Mime,
2867 filename: String,
2868 source: MediaSource,
2869 caption: Option<TextMessageEventContent>,
2870 info: Option<AttachmentInfo>,
2871 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2872 ) -> GalleryItemType {
2873 make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
2874 }
2875
2876 /// Update the power levels of a select set of users of this room.
2877 ///
2878 /// Issue a `power_levels` state event request to the server, changing the
2879 /// given UserId -> Int levels. May fail if the `power_levels` aren't
2880 /// locally known yet or the server rejects the state event update, e.g.
2881 /// because of insufficient permissions. Neither permissions to update
2882 /// nor whether the data might be stale is checked prior to issuing the
2883 /// request.
2884 pub async fn update_power_levels(
2885 &self,
2886 updates: Vec<(&UserId, Int)>,
2887 ) -> Result<send_state_event::v3::Response> {
2888 let mut power_levels = self.power_levels().await?;
2889
2890 for (user_id, new_level) in updates {
2891 if new_level == power_levels.users_default {
2892 power_levels.users.remove(user_id);
2893 } else {
2894 power_levels.users.insert(user_id.to_owned(), new_level);
2895 }
2896 }
2897
2898 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2899 }
2900
2901 /// Applies a set of power level changes to this room.
2902 ///
2903 /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2904 /// remain unchanged.
2905 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2906 let mut power_levels = self.power_levels().await?;
2907 power_levels.apply(changes)?;
2908 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2909 Ok(())
2910 }
2911
2912 /// Resets the room's power levels to the default values
2913 ///
2914 /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2915 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2916 let creators = self.creators().unwrap_or_default();
2917 let rules = self.clone_info().room_version_rules_or_default();
2918
2919 let default_power_levels =
2920 RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2921 let changes = RoomPowerLevelChanges::from(default_power_levels);
2922 self.apply_power_level_changes(changes).await?;
2923 Ok(self.power_levels().await?)
2924 }
2925
2926 /// Gets the suggested role for the user with the provided `user_id`.
2927 ///
2928 /// This method checks the `RoomPowerLevels` events instead of loading the
2929 /// member list and looking for the member.
2930 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2931 let power_level = self.get_user_power_level(user_id).await?;
2932 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2933 }
2934
2935 /// Gets the power level the user with the provided `user_id`.
2936 ///
2937 /// This method checks the `RoomPowerLevels` events instead of loading the
2938 /// member list and looking for the member.
2939 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2940 let event = self.power_levels().await?;
2941 Ok(event.for_user(user_id))
2942 }
2943
2944 /// Gets a map with the `UserId` of users with power levels other than `0`
2945 /// and this power level.
2946 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2947 let power_levels = self.power_levels().await.ok();
2948 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2949 if let Some(power_levels) = power_levels {
2950 for (id, level) in power_levels.users.into_iter() {
2951 user_power_levels.insert(id, level.into());
2952 }
2953 }
2954 user_power_levels
2955 }
2956
2957 /// Sets the name of this room.
2958 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2959 self.send_state_event(RoomNameEventContent::new(name)).await
2960 }
2961
2962 /// Sets a new topic for this room.
2963 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2964 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2965 }
2966
2967 /// Sets the new avatar url for this room.
2968 ///
2969 /// # Arguments
2970 /// * `avatar_url` - The owned Matrix uri that represents the avatar
2971 /// * `info` - The optional image info that can be provided for the avatar
2972 pub async fn set_avatar_url(
2973 &self,
2974 url: &MxcUri,
2975 info: Option<avatar::ImageInfo>,
2976 ) -> Result<send_state_event::v3::Response> {
2977 self.ensure_room_joined()?;
2978
2979 let mut room_avatar_event = RoomAvatarEventContent::new();
2980 room_avatar_event.url = Some(url.to_owned());
2981 room_avatar_event.info = info.map(Box::new);
2982
2983 self.send_state_event(room_avatar_event).await
2984 }
2985
2986 /// Removes the avatar from the room
2987 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2988 self.send_state_event(RoomAvatarEventContent::new()).await
2989 }
2990
2991 /// Uploads a new avatar for this room.
2992 ///
2993 /// # Arguments
2994 /// * `mime` - The mime type describing the data
2995 /// * `data` - The data representation of the avatar
2996 /// * `info` - The optional image info provided for the avatar, the blurhash
2997 /// and the mimetype will always be updated
2998 pub async fn upload_avatar(
2999 &self,
3000 mime: &Mime,
3001 data: Vec<u8>,
3002 info: Option<avatar::ImageInfo>,
3003 ) -> Result<send_state_event::v3::Response> {
3004 self.ensure_room_joined()?;
3005
3006 let upload_response = self.client.media().upload(mime, data, None).await?;
3007 let mut info = info.unwrap_or_default();
3008 info.blurhash = upload_response.blurhash;
3009 info.mimetype = Some(mime.to_string());
3010
3011 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
3012 }
3013
3014 /// Send a state event with an empty state key to the homeserver.
3015 ///
3016 /// For state events with a non-empty state key, see
3017 /// [`send_state_event_for_key`][Self::send_state_event_for_key].
3018 ///
3019 /// Returns the parsed response from the server.
3020 ///
3021 /// # Arguments
3022 ///
3023 /// * `content` - The content of the state event.
3024 ///
3025 /// # Examples
3026 ///
3027 /// ```no_run
3028 /// # use serde::{Deserialize, Serialize};
3029 /// # async {
3030 /// # let joined_room: matrix_sdk::Room = todo!();
3031 /// use matrix_sdk::ruma::{
3032 /// EventEncryptionAlgorithm,
3033 /// events::{
3034 /// EmptyStateKey, macros::EventContent,
3035 /// room::encryption::RoomEncryptionEventContent,
3036 /// },
3037 /// };
3038 ///
3039 /// let encryption_event_content = RoomEncryptionEventContent::new(
3040 /// EventEncryptionAlgorithm::MegolmV1AesSha2,
3041 /// );
3042 /// joined_room.send_state_event(encryption_event_content).await?;
3043 ///
3044 /// // Custom event:
3045 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3046 /// #[ruma_event(
3047 /// type = "org.matrix.msc_9000.xxx",
3048 /// kind = State,
3049 /// state_key_type = EmptyStateKey,
3050 /// )]
3051 /// struct XxxStateEventContent {/* fields... */}
3052 ///
3053 /// let content: XxxStateEventContent = todo!();
3054 /// joined_room.send_state_event(content).await?;
3055 /// # anyhow::Ok(()) };
3056 /// ```
3057 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3058 #[instrument(skip_all)]
3059 pub async fn send_state_event(
3060 &self,
3061 content: impl StateEventContent<StateKey = EmptyStateKey>,
3062 ) -> Result<send_state_event::v3::Response> {
3063 self.send_state_event_for_key(&EmptyStateKey, content).await
3064 }
3065
3066 /// Send a state event with an empty state key to the homeserver.
3067 ///
3068 /// For state events with a non-empty state key, see
3069 /// [`send_state_event_for_key`][Self::send_state_event_for_key].
3070 ///
3071 /// If the experimental state event encryption feature is enabled, this
3072 /// method will transparently encrypt the event if this room is
3073 /// encrypted (except if the event type is considered critical for the room
3074 /// to function, as outlined in [MSC4362][msc4362]).
3075 ///
3076 /// Returns the parsed response from the server.
3077 ///
3078 /// # Arguments
3079 ///
3080 /// * `content` - The content of the state event.
3081 ///
3082 /// # Examples
3083 ///
3084 /// ```no_run
3085 /// # use serde::{Deserialize, Serialize};
3086 /// # async {
3087 /// # let joined_room: matrix_sdk::Room = todo!();
3088 /// use matrix_sdk::ruma::{
3089 /// EventEncryptionAlgorithm,
3090 /// events::{
3091 /// EmptyStateKey, macros::EventContent,
3092 /// room::encryption::RoomEncryptionEventContent,
3093 /// },
3094 /// };
3095 ///
3096 /// let encryption_event_content = RoomEncryptionEventContent::new(
3097 /// EventEncryptionAlgorithm::MegolmV1AesSha2,
3098 /// );
3099 /// joined_room.send_state_event(encryption_event_content).await?;
3100 ///
3101 /// // Custom event:
3102 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3103 /// #[ruma_event(
3104 /// type = "org.matrix.msc_9000.xxx",
3105 /// kind = State,
3106 /// state_key_type = EmptyStateKey,
3107 /// )]
3108 /// struct XxxStateEventContent {/* fields... */}
3109 ///
3110 /// let content: XxxStateEventContent = todo!();
3111 /// joined_room.send_state_event(content).await?;
3112 /// # anyhow::Ok(()) };
3113 /// ```
3114 ///
3115 /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/4362-encrypted-state.md
3116 #[cfg(feature = "experimental-encrypted-state-events")]
3117 #[instrument(skip_all)]
3118 pub fn send_state_event<'a>(
3119 &'a self,
3120 content: impl StateEventContent<StateKey = EmptyStateKey>,
3121 ) -> SendStateEvent<'a> {
3122 self.send_state_event_for_key(&EmptyStateKey, content)
3123 }
3124
3125 /// Send a state event to the homeserver.
3126 ///
3127 /// Returns the parsed response from the server.
3128 ///
3129 /// # Arguments
3130 ///
3131 /// * `content` - The content of the state event.
3132 ///
3133 /// * `state_key` - A unique key which defines the overwriting semantics for
3134 /// this piece of room state.
3135 ///
3136 /// # Examples
3137 ///
3138 /// ```no_run
3139 /// # use serde::{Deserialize, Serialize};
3140 /// # async {
3141 /// # let joined_room: matrix_sdk::Room = todo!();
3142 /// use matrix_sdk::ruma::{
3143 /// events::{
3144 /// macros::EventContent,
3145 /// room::member::{RoomMemberEventContent, MembershipState},
3146 /// },
3147 /// mxc_uri,
3148 /// };
3149 ///
3150 /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3151 /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3152 /// content.avatar_url = Some(avatar_url);
3153 ///
3154 /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3155 ///
3156 /// // Custom event:
3157 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3158 /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3159 /// struct XxxStateEventContent { /* fields... */ }
3160 ///
3161 /// let content: XxxStateEventContent = todo!();
3162 /// joined_room.send_state_event_for_key("foo", content).await?;
3163 /// # anyhow::Ok(()) };
3164 /// ```
3165 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3166 pub async fn send_state_event_for_key<C, K>(
3167 &self,
3168 state_key: &K,
3169 content: C,
3170 ) -> Result<send_state_event::v3::Response>
3171 where
3172 C: StateEventContent,
3173 C::StateKey: Borrow<K>,
3174 K: AsRef<str> + ?Sized,
3175 {
3176 self.ensure_room_joined()?;
3177 let request =
3178 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3179 let response = self.client.send(request).await?;
3180 Ok(response)
3181 }
3182
3183 /// Send a state event to the homeserver. If state encryption is enabled in
3184 /// this room, the event will be encrypted.
3185 ///
3186 /// If the experimental state event encryption feature is enabled, this
3187 /// method will transparently encrypt the event if this room is
3188 /// encrypted (except if the event type is considered critical for the room
3189 /// to function, as outlined in [MSC4362][msc4362]).
3190 ///
3191 /// Returns the parsed response from the server.
3192 ///
3193 /// # Arguments
3194 ///
3195 /// * `content` - The content of the state event.
3196 ///
3197 /// * `state_key` - A unique key which defines the overwriting semantics for
3198 /// this piece of room state.
3199 ///
3200 /// # Examples
3201 ///
3202 /// ```no_run
3203 /// # use serde::{Deserialize, Serialize};
3204 /// # async {
3205 /// # let joined_room: matrix_sdk::Room = todo!();
3206 /// use matrix_sdk::ruma::{
3207 /// events::{
3208 /// macros::EventContent,
3209 /// room::member::{RoomMemberEventContent, MembershipState},
3210 /// },
3211 /// mxc_uri,
3212 /// };
3213 ///
3214 /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3215 /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3216 /// content.avatar_url = Some(avatar_url);
3217 ///
3218 /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3219 ///
3220 /// // Custom event:
3221 /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3222 /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3223 /// struct XxxStateEventContent { /* fields... */ }
3224 ///
3225 /// let content: XxxStateEventContent = todo!();
3226 /// joined_room.send_state_event_for_key("foo", content).await?;
3227 /// # anyhow::Ok(()) };
3228 /// ```
3229 ///
3230 /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
3231 #[cfg(feature = "experimental-encrypted-state-events")]
3232 pub fn send_state_event_for_key<'a, C, K>(
3233 &'a self,
3234 state_key: &K,
3235 content: C,
3236 ) -> SendStateEvent<'a>
3237 where
3238 C: StateEventContent,
3239 C::StateKey: Borrow<K>,
3240 K: AsRef<str> + ?Sized,
3241 {
3242 SendStateEvent::new(self, state_key, content)
3243 }
3244
3245 /// Send a raw room state event to the homeserver.
3246 ///
3247 /// Returns the parsed response from the server.
3248 ///
3249 /// # Arguments
3250 ///
3251 /// * `event_type` - The type of the event that we're sending out.
3252 ///
3253 /// * `state_key` - A unique key which defines the overwriting semantics for
3254 /// this piece of room state. This value is often a zero-length string.
3255 ///
3256 /// * `content` - The content of the event as a raw JSON value. The argument
3257 /// type can be `serde_json::Value`, but also other raw JSON types; for
3258 /// the full list check the documentation of [`IntoRawStateEventContent`].
3259 ///
3260 /// # Examples
3261 ///
3262 /// ```no_run
3263 /// use serde_json::json;
3264 ///
3265 /// # async {
3266 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3267 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3268 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3269 ///
3270 /// if let Some(room) = client.get_room(&room_id) {
3271 /// room.send_state_event_raw("m.room.member", "", json!({
3272 /// "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3273 /// "displayname": "Alice Margatroid",
3274 /// "membership": "join",
3275 /// })).await?;
3276 /// }
3277 /// # anyhow::Ok(()) };
3278 /// ```
3279 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3280 #[instrument(skip_all)]
3281 pub async fn send_state_event_raw(
3282 &self,
3283 event_type: &str,
3284 state_key: &str,
3285 content: impl IntoRawStateEventContent,
3286 ) -> Result<send_state_event::v3::Response> {
3287 self.ensure_room_joined()?;
3288
3289 let request = send_state_event::v3::Request::new_raw(
3290 self.room_id().to_owned(),
3291 event_type.into(),
3292 state_key.to_owned(),
3293 content.into_raw_state_event_content(),
3294 );
3295
3296 Ok(self.client.send(request).await?)
3297 }
3298
3299 /// Send a raw room state event to the homeserver.
3300 ///
3301 /// If the experimental state event encryption feature is enabled, this
3302 /// method will transparently encrypt the event if this room is
3303 /// encrypted (except if the event type is considered critical for the room
3304 /// to function, as outlined in [MSC4362][msc4362]).
3305 ///
3306 /// Returns the parsed response from the server.
3307 ///
3308 /// # Arguments
3309 ///
3310 /// * `event_type` - The type of the event that we're sending out.
3311 ///
3312 /// * `state_key` - A unique key which defines the overwriting semantics for
3313 /// this piece of room state. This value is often a zero-length string.
3314 ///
3315 /// * `content` - The content of the event as a raw JSON value. The argument
3316 /// type can be `serde_json::Value`, but also other raw JSON types; for
3317 /// the full list check the documentation of [`IntoRawStateEventContent`].
3318 ///
3319 /// # Examples
3320 ///
3321 /// ```no_run
3322 /// use serde_json::json;
3323 ///
3324 /// # async {
3325 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3326 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3327 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3328 ///
3329 /// if let Some(room) = client.get_room(&room_id) {
3330 /// room.send_state_event_raw("m.room.member", "", json!({
3331 /// "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3332 /// "displayname": "Alice Margatroid",
3333 /// "membership": "join",
3334 /// })).await?;
3335 /// }
3336 /// # anyhow::Ok(()) };
3337 /// ```
3338 ///
3339 /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
3340 #[cfg(feature = "experimental-encrypted-state-events")]
3341 #[instrument(skip_all)]
3342 pub fn send_state_event_raw<'a>(
3343 &'a self,
3344 event_type: &'a str,
3345 state_key: &'a str,
3346 content: impl IntoRawStateEventContent,
3347 ) -> SendRawStateEvent<'a> {
3348 SendRawStateEvent::new(self, event_type, state_key, content)
3349 }
3350
3351 /// Strips all information out of an event of the room.
3352 ///
3353 /// Returns the [`redact_event::v3::Response`] from the server.
3354 ///
3355 /// This cannot be undone. Users may redact their own events, and any user
3356 /// with a power level greater than or equal to the redact power level of
3357 /// the room may redact events there.
3358 ///
3359 /// # Arguments
3360 ///
3361 /// * `event_id` - The ID of the event to redact
3362 ///
3363 /// * `reason` - The reason for the event being redacted.
3364 ///
3365 /// * `txn_id` - A unique ID that can be attached to this event as
3366 /// its transaction ID. If not given one is created for the message.
3367 ///
3368 /// # Examples
3369 ///
3370 /// ```no_run
3371 /// use matrix_sdk::ruma::event_id;
3372 ///
3373 /// # async {
3374 /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3375 /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3376 /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3377 /// #
3378 /// if let Some(room) = client.get_room(&room_id) {
3379 /// let event_id = event_id!("$xxxxxx:example.org");
3380 /// let reason = Some("Indecent material");
3381 /// room.redact(&event_id, reason, None).await?;
3382 /// }
3383 /// # anyhow::Ok(()) };
3384 /// ```
3385 #[instrument(skip_all)]
3386 pub async fn redact(
3387 &self,
3388 event_id: &EventId,
3389 reason: Option<&str>,
3390 txn_id: Option<OwnedTransactionId>,
3391 ) -> HttpResult<redact_event::v3::Response> {
3392 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3393 let request = assign!(
3394 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3395 { reason: reason.map(ToOwned::to_owned) }
3396 );
3397
3398 self.client.send(request).await
3399 }
3400
3401 /// Get a list of servers that should know this room.
3402 ///
3403 /// Uses the synced members of the room and the suggested [routing
3404 /// algorithm] from the Matrix spec.
3405 ///
3406 /// Returns at most three servers.
3407 ///
3408 /// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
3409 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3410 let acl_ev = self
3411 .get_state_event_static::<RoomServerAclEventContent>()
3412 .await?
3413 .and_then(|ev| ev.deserialize().ok());
3414 let acl = acl_ev.as_ref().and_then(|ev| match ev {
3415 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3416 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3417 });
3418
3419 // Filter out server names that:
3420 // - Are blocked due to server ACLs
3421 // - Are IP addresses
3422 let members: Vec<_> = self
3423 .members_no_sync(RoomMemberships::JOIN)
3424 .await?
3425 .into_iter()
3426 .filter(|member| {
3427 let server = member.user_id().server_name();
3428 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3429 })
3430 .collect();
3431
3432 // Get the server of the highest power level user in the room, provided
3433 // they are at least power level 50.
3434 let max = members
3435 .iter()
3436 .max_by_key(|member| member.power_level())
3437 .filter(|max| max.power_level() >= int!(50))
3438 .map(|member| member.user_id().server_name());
3439
3440 // Sort the servers by population.
3441 let servers = members
3442 .iter()
3443 .map(|member| member.user_id().server_name())
3444 .filter(|server| max.filter(|max| max == server).is_none())
3445 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3446 *servers.entry(server).or_default() += 1;
3447 servers
3448 });
3449 let mut servers: Vec<_> = servers.into_iter().collect();
3450 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3451
3452 Ok(max
3453 .into_iter()
3454 .chain(servers.into_iter().map(|(name, _)| name))
3455 .take(3)
3456 .map(ToOwned::to_owned)
3457 .collect())
3458 }
3459
3460 /// Get a `matrix.to` permalink to this room.
3461 ///
3462 /// If this room has an alias, we use it. Otherwise, we try to use the
3463 /// synced members in the room for [routing] the room ID.
3464 ///
3465 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3466 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3467 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3468 return Ok(alias.matrix_to_uri());
3469 }
3470
3471 let via = self.route().await?;
3472 Ok(self.room_id().matrix_to_uri_via(via))
3473 }
3474
3475 /// Get a `matrix:` permalink to this room.
3476 ///
3477 /// If this room has an alias, we use it. Otherwise, we try to use the
3478 /// synced members in the room for [routing] the room ID.
3479 ///
3480 /// # Arguments
3481 ///
3482 /// * `join` - Whether the user should join the room.
3483 ///
3484 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3485 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3486 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3487 return Ok(alias.matrix_uri(join));
3488 }
3489
3490 let via = self.route().await?;
3491 Ok(self.room_id().matrix_uri_via(via, join))
3492 }
3493
3494 /// Get a `matrix.to` permalink to an event in this room.
3495 ///
3496 /// We try to use the synced members in the room for [routing] the room ID.
3497 ///
3498 /// *Note*: This method does not check if the given event ID is actually
3499 /// part of this room. It needs to be checked before calling this method
3500 /// otherwise the permalink won't work.
3501 ///
3502 /// # Arguments
3503 ///
3504 /// * `event_id` - The ID of the event.
3505 ///
3506 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3507 pub async fn matrix_to_event_permalink(
3508 &self,
3509 event_id: impl Into<OwnedEventId>,
3510 ) -> Result<MatrixToUri> {
3511 // Don't use the alias because an event is tied to a room ID, but an
3512 // alias might point to another room, e.g. after a room upgrade.
3513 let via = self.route().await?;
3514 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3515 }
3516
3517 /// Get a `matrix:` permalink to an event in this room.
3518 ///
3519 /// We try to use the synced members in the room for [routing] the room ID.
3520 ///
3521 /// *Note*: This method does not check if the given event ID is actually
3522 /// part of this room. It needs to be checked before calling this method
3523 /// otherwise the permalink won't work.
3524 ///
3525 /// # Arguments
3526 ///
3527 /// * `event_id` - The ID of the event.
3528 ///
3529 /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3530 pub async fn matrix_event_permalink(
3531 &self,
3532 event_id: impl Into<OwnedEventId>,
3533 ) -> Result<MatrixUri> {
3534 // Don't use the alias because an event is tied to a room ID, but an
3535 // alias might point to another room, e.g. after a room upgrade.
3536 let via = self.route().await?;
3537 Ok(self.room_id().matrix_event_uri_via(event_id, via))
3538 }
3539
3540 /// Get the latest receipt of a user in this room.
3541 ///
3542 /// # Arguments
3543 ///
3544 /// * `receipt_type` - The type of receipt to get.
3545 ///
3546 /// * `thread` - The thread containing the event of the receipt, if any.
3547 ///
3548 /// * `user_id` - The ID of the user.
3549 ///
3550 /// Returns the ID of the event on which the receipt applies and the
3551 /// receipt.
3552 pub async fn load_user_receipt(
3553 &self,
3554 receipt_type: ReceiptType,
3555 thread: ReceiptThread,
3556 user_id: &UserId,
3557 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3558 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3559 }
3560
3561 /// Load the receipts for an event in this room from storage.
3562 ///
3563 /// # Arguments
3564 ///
3565 /// * `receipt_type` - The type of receipt to get.
3566 ///
3567 /// * `thread` - The thread containing the event of the receipt, if any.
3568 ///
3569 /// * `event_id` - The ID of the event.
3570 ///
3571 /// Returns a list of IDs of users who have sent a receipt for the event and
3572 /// the corresponding receipts.
3573 pub async fn load_event_receipts(
3574 &self,
3575 receipt_type: ReceiptType,
3576 thread: ReceiptThread,
3577 event_id: &EventId,
3578 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3579 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3580 }
3581
3582 /// Get the push-condition context for this room.
3583 ///
3584 /// Returns `None` if some data couldn't be found. This should only happen
3585 /// in brand new rooms, while we process its state.
3586 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3587 self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions().await?)
3588 .await
3589 }
3590
3591 /// Get the push-condition context for this room, with a choice to include
3592 /// thread subscriptions or not, based on the extra
3593 /// `with_threads_subscriptions` parameter.
3594 ///
3595 /// Returns `None` if some data couldn't be found. This should only happen
3596 /// in brand new rooms, while we process its state.
3597 pub(crate) async fn push_condition_room_ctx_internal(
3598 &self,
3599 with_threads_subscriptions: bool,
3600 ) -> Result<Option<PushConditionRoomCtx>> {
3601 let room_id = self.room_id();
3602 let user_id = self.own_user_id();
3603 let room_info = self.clone_info();
3604 let member_count = room_info.active_members_count();
3605
3606 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3607 member.name().to_owned()
3608 } else {
3609 return Ok(None);
3610 };
3611
3612 let power_levels = match self.power_levels().await {
3613 Ok(power_levels) => Some(power_levels.into()),
3614 Err(error) => {
3615 if matches!(room_info.state(), RoomState::Joined) {
3616 // It's normal to not have the power levels in a non-joined room, so don't log
3617 // the error if the room is not joined
3618 error!("Could not compute power levels for push conditions: {error}");
3619 }
3620 None
3621 }
3622 };
3623
3624 let mut ctx = assign!(PushConditionRoomCtx::new(
3625 room_id.to_owned(),
3626 UInt::new(member_count).unwrap_or(UInt::MAX),
3627 user_id.to_owned(),
3628 user_display_name,
3629 ),
3630 {
3631 power_levels,
3632 });
3633
3634 if with_threads_subscriptions {
3635 let this = self.clone();
3636 ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3637 let room = this.clone();
3638 Box::pin(async move {
3639 if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3640 maybe_sub.is_some()
3641 } else {
3642 false
3643 }
3644 })
3645 });
3646 }
3647
3648 Ok(Some(ctx))
3649 }
3650
3651 /// Retrieves a [`PushContext`] that can be used to compute the push
3652 /// actions for events.
3653 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3654 self.push_context_internal(self.client.enabled_thread_subscriptions().await?).await
3655 }
3656
3657 /// Retrieves a [`PushContext`] that can be used to compute the push actions
3658 /// for events, with a choice to include thread subscriptions or not,
3659 /// based on the extra `with_threads_subscriptions` parameter.
3660 #[instrument(skip(self))]
3661 pub(crate) async fn push_context_internal(
3662 &self,
3663 with_threads_subscriptions: bool,
3664 ) -> Result<Option<PushContext>> {
3665 let Some(push_condition_room_ctx) =
3666 self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3667 else {
3668 debug!("Could not aggregate push context");
3669 return Ok(None);
3670 };
3671 let push_rules = self.client().account().push_rules().await?;
3672 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3673 }
3674
3675 /// Get the push actions for the given event with the current room state.
3676 ///
3677 /// Note that it is possible that no push action is returned because the
3678 /// current room state does not have all the required state events.
3679 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3680 if let Some(ctx) = self.push_context().await? {
3681 Ok(Some(ctx.for_event(event).await))
3682 } else {
3683 Ok(None)
3684 }
3685 }
3686
3687 /// The membership details of the (latest) invite for the logged-in user in
3688 /// this room.
3689 pub async fn invite_details(&self) -> Result<Invite> {
3690 let state = self.state();
3691
3692 if state != RoomState::Invited {
3693 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3694 }
3695
3696 let invitee = self
3697 .get_member_no_sync(self.own_user_id())
3698 .await?
3699 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3700 let event = invitee.event();
3701
3702 let inviter_id = event.sender().to_owned();
3703 let inviter = self.get_member_no_sync(&inviter_id).await?;
3704
3705 Ok(Invite { invitee, inviter_id, inviter })
3706 }
3707
3708 /// Get the membership details for the current user.
3709 ///
3710 /// Returns:
3711 /// - If the user was present in the room, a
3712 /// [`RoomMemberWithSenderInfo`] containing both the user info and the
3713 /// member info of the sender of the `m.room.member` event.
3714 /// - If the current user is not present, an error.
3715 pub async fn member_with_sender_info(
3716 &self,
3717 user_id: &UserId,
3718 ) -> Result<RoomMemberWithSenderInfo> {
3719 let Some(member) = self.get_member_no_sync(user_id).await? else {
3720 return Err(Error::InsufficientData);
3721 };
3722
3723 let sender_member =
3724 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3725 // If the sender room member info is already available, return it
3726 Some(member)
3727 } else if self.are_members_synced() {
3728 // The room members are synced and we couldn't find the sender info
3729 None
3730 } else if self.sync_members().await.is_ok() {
3731 // Try getting the sender room member info again after syncing
3732 self.get_member_no_sync(member.event().sender()).await?
3733 } else {
3734 None
3735 };
3736
3737 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3738 }
3739
3740 /// Forget this room.
3741 ///
3742 /// This communicates to the homeserver that it should forget the room.
3743 ///
3744 /// Only left or banned-from rooms can be forgotten.
3745 pub async fn forget(&self) -> Result<()> {
3746 let state = self.state();
3747 match state {
3748 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3749 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3750 "Left / Banned",
3751 state,
3752 ))));
3753 }
3754 RoomState::Left | RoomState::Banned => {}
3755 }
3756
3757 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3758 let _response = self.client.send(request).await?;
3759
3760 // If it was a DM, remove the room from the `m.direct` global account data.
3761 if self.inner.direct_targets_length() != 0
3762 && let Err(e) = self.set_is_direct(false).await
3763 {
3764 // It is not important whether we managed to remove the room, it will not have
3765 // any consequences, so just log the error.
3766 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3767 }
3768
3769 self.client.base_client().forget_room(self.inner.room_id()).await?;
3770
3771 Ok(())
3772 }
3773
3774 fn ensure_room_joined(&self) -> Result<()> {
3775 let state = self.state();
3776 if state == RoomState::Joined {
3777 Ok(())
3778 } else {
3779 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3780 }
3781 }
3782
3783 /// Get the notification mode.
3784 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3785 if !matches!(self.state(), RoomState::Joined) {
3786 return None;
3787 }
3788
3789 let notification_settings = self.client().notification_settings().await;
3790
3791 // Get the user-defined mode if available
3792 let notification_mode =
3793 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3794
3795 if notification_mode.is_some() {
3796 notification_mode
3797 } else if let Ok(is_encrypted) =
3798 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3799 {
3800 // Otherwise, if encrypted status is available, get the default mode for this
3801 // type of room.
3802 // From the point of view of notification settings, a `one-to-one` room is one
3803 // that involves exactly two people.
3804 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3805 let default_mode = notification_settings
3806 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3807 .await;
3808 Some(default_mode)
3809 } else {
3810 None
3811 }
3812 }
3813
3814 /// Get the user-defined notification mode.
3815 ///
3816 /// The result is cached for fast and non-async call. To read the cached
3817 /// result, use
3818 /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3819 //
3820 // Note for maintainers:
3821 //
3822 // The fact the result is cached is an important property. If you change that in
3823 // the future, please review all calls to this method.
3824 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3825 if !matches!(self.state(), RoomState::Joined) {
3826 return None;
3827 }
3828
3829 let notification_settings = self.client().notification_settings().await;
3830
3831 // Get the user-defined mode if available.
3832 let mode =
3833 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3834
3835 if let Some(mode) = mode {
3836 self.update_cached_user_defined_notification_mode(mode);
3837 }
3838
3839 mode
3840 }
3841
3842 /// Report an event as inappropriate to the homeserver's administrator.
3843 ///
3844 /// # Arguments
3845 ///
3846 /// * `event_id` - The ID of the event to report.
3847 /// * `score` - The score to rate this content.
3848 /// * `reason` - The reason the content is being reported.
3849 ///
3850 /// # Errors
3851 ///
3852 /// Returns an error if the room is not joined or if an error occurs with
3853 /// the request.
3854 pub async fn report_content(
3855 &self,
3856 event_id: OwnedEventId,
3857 reason: Option<String>,
3858 ) -> Result<report_content::v3::Response> {
3859 let state = self.state();
3860 if state != RoomState::Joined {
3861 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3862 }
3863
3864 let request = assign!(
3865 report_content::v3::Request::new(
3866 self.inner.room_id().to_owned(),
3867 event_id,
3868 ), {
3869 reason: reason
3870 }
3871 );
3872 Ok(self.client.send(request).await?)
3873 }
3874
3875 /// Reports a room as inappropriate to the server.
3876 /// The caller is not required to be joined to the room to report it.
3877 ///
3878 /// # Arguments
3879 ///
3880 /// * `reason` - The reason the room is being reported.
3881 ///
3882 /// # Errors
3883 ///
3884 /// Returns an error if the room is not found or on rate limit
3885 pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3886 let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3887
3888 Ok(self.client.send(request).await?)
3889 }
3890
3891 /// Set a flag on the room to indicate that the user has explicitly marked
3892 /// it as (un)read.
3893 ///
3894 /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3895 /// value as `unread`.
3896 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3897 if self.is_marked_unread() == unread {
3898 // The request is not necessary.
3899 return Ok(());
3900 }
3901
3902 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3903
3904 let content = MarkedUnreadEventContent::new(unread);
3905
3906 let request = set_room_account_data::v3::Request::new(
3907 user_id.to_owned(),
3908 self.inner.room_id().to_owned(),
3909 &content,
3910 )?;
3911
3912 self.client.send(request).await?;
3913 Ok(())
3914 }
3915
3916 /// Returns the [`RoomEventCache`] associated to this room, assuming the
3917 /// global [`EventCache`] has been enabled for subscription.
3918 pub async fn event_cache(
3919 &self,
3920 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3921 self.client.event_cache().for_room(self.room_id()).await
3922 }
3923
3924 /// Get the beacon information event in the room for the `user_id`.
3925 ///
3926 /// # Errors
3927 ///
3928 /// Returns an error if the event is redacted, stripped, not found or could
3929 /// not be deserialized.
3930 pub(crate) async fn get_user_beacon_info(
3931 &self,
3932 user_id: &UserId,
3933 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3934 let raw_event = self
3935 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3936 .await?
3937 .ok_or(BeaconError::NotFound)?;
3938
3939 match raw_event.deserialize()? {
3940 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3941 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3942 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3943 }
3944 }
3945
3946 /// Start sharing live location in the room.
3947 ///
3948 /// # Arguments
3949 ///
3950 /// * `duration_millis` - The duration for which the live location is
3951 /// shared, in milliseconds.
3952 /// * `description` - An optional description for the live location share.
3953 ///
3954 /// # Errors
3955 ///
3956 /// Returns an error if the room is not joined or if the state event could
3957 /// not be sent.
3958 pub async fn start_live_location_share(
3959 &self,
3960 duration_millis: u64,
3961 description: Option<String>,
3962 ) -> Result<send_state_event::v3::Response> {
3963 self.ensure_room_joined()?;
3964
3965 self.send_state_event_for_key(
3966 self.own_user_id(),
3967 BeaconInfoEventContent::new(
3968 description,
3969 Duration::from_millis(duration_millis),
3970 true,
3971 None,
3972 ),
3973 )
3974 .await
3975 }
3976
3977 /// Stop sharing live location in the room.
3978 ///
3979 /// # Errors
3980 ///
3981 /// Returns an error if the room is not joined, if the beacon information
3982 /// is redacted or stripped, if the state event is not found, or if the
3983 /// existing beacon is no longer live.
3984 pub async fn stop_live_location_share(
3985 &self,
3986 ) -> Result<send_state_event::v3::Response, BeaconError> {
3987 self.ensure_room_joined()?;
3988
3989 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3990
3991 if beacon_info_event.content.live {
3992 beacon_info_event.content.stop();
3993 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3994 } else {
3995 Err(BeaconError::NotLive)
3996 }
3997 }
3998
3999 /// Send a location beacon event in the current room.
4000 ///
4001 /// # Arguments
4002 ///
4003 /// * `geo_uri` - The geo URI of the location beacon.
4004 ///
4005 /// # Errors
4006 ///
4007 /// Returns an error if the room is not joined, if the beacon information
4008 /// is redacted or stripped, if the location share is no longer live,
4009 /// or if the state event is not found.
4010 pub async fn send_location_beacon(
4011 &self,
4012 geo_uri: String,
4013 ) -> Result<send_message_event::v3::Response, BeaconError> {
4014 self.ensure_room_joined()?;
4015
4016 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
4017
4018 if beacon_info_event.content.is_live() {
4019 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
4020 Ok(self
4021 .send(content)
4022 .with_request_config(RequestConfig::new().retry_limit(6))
4023 .await?
4024 .response)
4025 } else {
4026 Err(BeaconError::NotLive)
4027 }
4028 }
4029
4030 /// Store the given `ComposerDraft` in the state store using the current
4031 /// room id and optional thread root id as identifier.
4032 pub async fn save_composer_draft(
4033 &self,
4034 draft: ComposerDraft,
4035 thread_root: Option<&EventId>,
4036 ) -> Result<()> {
4037 self.client
4038 .state_store()
4039 .set_kv_data(
4040 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
4041 StateStoreDataValue::ComposerDraft(draft),
4042 )
4043 .await?;
4044 Ok(())
4045 }
4046
4047 /// Retrieve the `ComposerDraft` stored in the state store for this room
4048 /// and given thread, if any.
4049 pub async fn load_composer_draft(
4050 &self,
4051 thread_root: Option<&EventId>,
4052 ) -> Result<Option<ComposerDraft>> {
4053 let data = self
4054 .client
4055 .state_store()
4056 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4057 .await?;
4058 Ok(data.and_then(|d| d.into_composer_draft()))
4059 }
4060
4061 /// Remove the `ComposerDraft` stored in the state store for this room
4062 /// and given thread, if any.
4063 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
4064 self.client
4065 .state_store()
4066 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4067 .await?;
4068 Ok(())
4069 }
4070
4071 /// Load pinned state events for a room from the `/state` endpoint in the
4072 /// home server.
4073 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
4074 let response = self
4075 .client
4076 .send(get_state_event_for_key::v3::Request::new(
4077 self.room_id().to_owned(),
4078 StateEventType::RoomPinnedEvents,
4079 "".to_owned(),
4080 ))
4081 .await;
4082
4083 match response {
4084 Ok(response) => Ok(Some(
4085 response
4086 .into_content()
4087 .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
4088 .pinned,
4089 )),
4090 Err(http_error) => match http_error.as_client_api_error() {
4091 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
4092 _ => Err(http_error.into()),
4093 },
4094 }
4095 }
4096
4097 /// Subscribe to knock requests in this `Room`.
4098 ///
4099 /// The current requests to join the room will be emitted immediately
4100 /// when subscribing.
4101 ///
4102 /// A new set of knock requests will be emitted whenever:
4103 /// - A new member event is received.
4104 /// - A knock request is marked as seen.
4105 /// - A sync is gappy (limited), so room membership information may be
4106 /// outdated.
4107 ///
4108 /// Returns both a stream of knock requests and a handle for a task that
4109 /// will clean up the seen knock request ids when possible.
4110 pub async fn subscribe_to_knock_requests(
4111 &self,
4112 ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
4113 let this = Arc::new(self.clone());
4114
4115 let room_member_events_observer =
4116 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
4117
4118 let current_seen_ids = self.get_seen_knock_request_ids().await?;
4119 let mut seen_request_ids_stream = self
4120 .seen_knock_request_ids_map
4121 .subscribe()
4122 .await
4123 .map(|values| values.unwrap_or_default());
4124
4125 let mut room_info_stream = self.subscribe_info();
4126
4127 // Spawn a task that will clean up the seen knock request ids when updated room
4128 // members are received
4129 let clear_seen_ids_handle = spawn({
4130 let this = self.clone();
4131 async move {
4132 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
4133 while member_updates_stream.recv().await.is_ok() {
4134 // If room members were updated, try to remove outdated seen knock request ids
4135 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
4136 warn!("Failed to remove seen knock requests: {err}")
4137 }
4138 }
4139 }
4140 });
4141
4142 let combined_stream = stream! {
4143 // Emit current requests to join
4144 match this.get_current_join_requests(¤t_seen_ids).await {
4145 Ok(initial_requests) => yield initial_requests,
4146 Err(err) => warn!("Failed to get initial requests to join: {err}")
4147 }
4148
4149 let mut requests_stream = room_member_events_observer.subscribe();
4150 let mut seen_ids = current_seen_ids.clone();
4151
4152 loop {
4153 // This is equivalent to a combine stream operation, triggering a new emission
4154 // when any of the branches changes
4155 tokio::select! {
4156 Some((event, _)) = requests_stream.next() => {
4157 if let Some(event) = event.as_original() {
4158 // If we can calculate the membership change, try to emit only when needed
4159 let emit = if event.prev_content().is_some() {
4160 matches!(event.membership_change(),
4161 MembershipChange::Banned |
4162 MembershipChange::Knocked |
4163 MembershipChange::KnockAccepted |
4164 MembershipChange::KnockDenied |
4165 MembershipChange::KnockRetracted
4166 )
4167 } else {
4168 // If we can't calculate the membership change, assume we need to
4169 // emit updated values
4170 true
4171 };
4172
4173 if emit {
4174 match this.get_current_join_requests(&seen_ids).await {
4175 Ok(requests) => yield requests,
4176 Err(err) => {
4177 warn!("Failed to get updated knock requests on new member event: {err}")
4178 }
4179 }
4180 }
4181 }
4182 }
4183
4184 Some(new_seen_ids) = seen_request_ids_stream.next() => {
4185 // Update the current seen ids
4186 seen_ids = new_seen_ids;
4187
4188 // If seen requests have changed we need to recalculate
4189 // all the knock requests
4190 match this.get_current_join_requests(&seen_ids).await {
4191 Ok(requests) => yield requests,
4192 Err(err) => {
4193 warn!("Failed to get updated knock requests on seen ids changed: {err}")
4194 }
4195 }
4196 }
4197
4198 Some(room_info) = room_info_stream.next() => {
4199 // We need to emit new items when we may have missing room members:
4200 // this usually happens after a gappy (limited) sync
4201 if !room_info.are_members_synced() {
4202 match this.get_current_join_requests(&seen_ids).await {
4203 Ok(requests) => yield requests,
4204 Err(err) => {
4205 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4206 }
4207 }
4208 }
4209 }
4210 // If the streams in all branches are closed, stop the loop
4211 else => break,
4212 }
4213 }
4214 };
4215
4216 Ok((combined_stream, clear_seen_ids_handle))
4217 }
4218
4219 async fn get_current_join_requests(
4220 &self,
4221 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4222 ) -> Result<Vec<KnockRequest>> {
4223 Ok(self
4224 .members(RoomMemberships::KNOCK)
4225 .await?
4226 .into_iter()
4227 .filter_map(|member| {
4228 let event_id = member.event().event_id()?;
4229 Some(KnockRequest::new(
4230 self,
4231 event_id,
4232 member.event().timestamp(),
4233 KnockRequestMemberInfo::from_member(&member),
4234 seen_request_ids.contains_key(event_id),
4235 ))
4236 })
4237 .collect())
4238 }
4239
4240 /// Access the room settings related to privacy and visibility.
4241 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4242 RoomPrivacySettings::new(&self.inner, &self.client)
4243 }
4244
4245 /// Retrieve a list of all the threads for the current room.
4246 ///
4247 /// Since this client-server API is paginated, the return type may include a
4248 /// token used to resuming back-pagination into the list of results, in
4249 /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4250 /// [`ListThreadsOptions::from`] to continue the pagination
4251 /// from the previous position.
4252 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4253 let request = opts.into_request(self.room_id());
4254
4255 let response = self.client.send(request).await?;
4256
4257 let push_ctx = self.push_context().await?;
4258 let chunk = join_all(
4259 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4260 )
4261 .await;
4262
4263 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4264 }
4265
4266 /// Retrieve a list of relations for the given event, according to the given
4267 /// options, using the network.
4268 ///
4269 /// Since this client-server API is paginated, the return type may include a
4270 /// token used to resuming back-pagination into the list of results, in
4271 /// [`Relations::prev_batch_token`]. This token can be fed back into
4272 /// [`RelationsOptions::from`] to continue the pagination from the previous
4273 /// position.
4274 ///
4275 /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4276 /// then it must be used with the same
4277 /// [`RelationsOptions::include_relations`] value as the request that
4278 /// returns the `from` token, otherwise the server behavior is undefined.
4279 pub async fn relations(
4280 &self,
4281 event_id: OwnedEventId,
4282 opts: RelationsOptions,
4283 ) -> Result<Relations> {
4284 let relations = opts.send(self, event_id).await;
4285
4286 // Save any new related events to the cache.
4287 if let Ok(Relations { chunk, .. }) = &relations
4288 && let Ok((cache, _handles)) = self.event_cache().await
4289 {
4290 cache.save_events(chunk.clone()).await;
4291 }
4292
4293 relations
4294 }
4295
4296 /// Subscribe to a given thread in this room.
4297 ///
4298 /// This will subscribe the user to the thread, so that they will receive
4299 /// notifications for that thread specifically.
4300 ///
4301 /// # Arguments
4302 ///
4303 /// - `thread_root`: The ID of the thread root event to subscribe to.
4304 /// - `automatic`: Whether the subscription was made automatically by a
4305 /// client, not by manual user choice. If set, must include the latest
4306 /// event ID that's known in the thread and that is causing the automatic
4307 /// subscription. If unset (i.e. we're now subscribing manually) and there
4308 /// was a previous automatic subscription, the subscription will be
4309 /// overridden to a manual one instead.
4310 ///
4311 /// # Returns
4312 ///
4313 /// - A 404 error if the event isn't known, or isn't a thread root.
4314 /// - An `Ok` result if the subscription was successful, or if the server
4315 /// skipped an automatic subscription (as the user unsubscribed from the
4316 /// thread after the event causing the automatic subscription).
4317 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4318 pub async fn subscribe_thread(
4319 &self,
4320 thread_root: OwnedEventId,
4321 automatic: Option<OwnedEventId>,
4322 ) -> Result<()> {
4323 let is_automatic = automatic.is_some();
4324
4325 match self
4326 .client
4327 .send(subscribe_thread::unstable::Request::new(
4328 self.room_id().to_owned(),
4329 thread_root.clone(),
4330 automatic,
4331 ))
4332 .await
4333 {
4334 Ok(_response) => {
4335 trace!("Server acknowledged the thread subscription; saving in db");
4336
4337 // Immediately save the result into the database.
4338 self.client
4339 .state_store()
4340 .upsert_thread_subscriptions(vec![(
4341 self.room_id(),
4342 &thread_root,
4343 StoredThreadSubscription {
4344 status: ThreadSubscriptionStatus::Subscribed {
4345 automatic: is_automatic,
4346 },
4347 bump_stamp: None,
4348 },
4349 )])
4350 .await?;
4351
4352 Ok(())
4353 }
4354
4355 Err(err) => {
4356 if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4357 // In this case: the server indicates that the user unsubscribed *after* the
4358 // event ID we've used in an automatic subscription; don't
4359 // save the subscription state in the database, as the
4360 // previous one should be more correct.
4361 trace!("Thread subscription skipped: {err}");
4362 Ok(())
4363 } else {
4364 // Forward the error to the caller.
4365 Err(err.into())
4366 }
4367 }
4368 }
4369 }
4370
4371 /// Subscribe to a thread if needed, based on a current subscription to it.
4372 ///
4373 /// This is like [`Self::subscribe_thread`], but it first checks if the user
4374 /// has already subscribed to a thread, so as to minimize sending
4375 /// unnecessary subscriptions which would be ignored by the server.
4376 pub async fn subscribe_thread_if_needed(
4377 &self,
4378 thread_root: &EventId,
4379 automatic: Option<OwnedEventId>,
4380 ) -> Result<()> {
4381 if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4382 // If we have a previous subscription, we should only send the new one if it's
4383 // manual and the previous one was automatic.
4384 if !prev_sub.automatic || automatic.is_some() {
4385 // Either we had already a manual subscription, or we had an automatic one and
4386 // the new one is automatic too: nothing to do!
4387 return Ok(());
4388 }
4389 }
4390 self.subscribe_thread(thread_root.to_owned(), automatic).await
4391 }
4392
4393 /// Unsubscribe from a given thread in this room.
4394 ///
4395 /// # Arguments
4396 ///
4397 /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4398 ///
4399 /// # Returns
4400 ///
4401 /// - An `Ok` result if the unsubscription was successful, or the thread was
4402 /// already unsubscribed.
4403 /// - A 404 error if the event isn't known, or isn't a thread root.
4404 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4405 pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4406 self.client
4407 .send(unsubscribe_thread::unstable::Request::new(
4408 self.room_id().to_owned(),
4409 thread_root.clone(),
4410 ))
4411 .await?;
4412
4413 trace!("Server acknowledged the thread subscription removal; removed it from db too");
4414
4415 // Immediately save the result into the database.
4416 self.client
4417 .state_store()
4418 .upsert_thread_subscriptions(vec![(
4419 self.room_id(),
4420 &thread_root,
4421 StoredThreadSubscription {
4422 status: ThreadSubscriptionStatus::Unsubscribed,
4423 bump_stamp: None,
4424 },
4425 )])
4426 .await?;
4427
4428 Ok(())
4429 }
4430
4431 /// Return the current thread subscription for the given thread root in this
4432 /// room.
4433 ///
4434 /// # Arguments
4435 ///
4436 /// - `thread_root`: The ID of the thread root event to get the subscription
4437 /// for.
4438 ///
4439 /// # Returns
4440 ///
4441 /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4442 /// subscription information.
4443 /// - An `Ok` result with `None` if the subscription does not exist, or the
4444 /// event couldn't be found, or the event isn't a thread.
4445 /// - An error if the request fails for any other reason, such as a network
4446 /// error.
4447 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4448 pub async fn fetch_thread_subscription(
4449 &self,
4450 thread_root: OwnedEventId,
4451 ) -> Result<Option<ThreadSubscription>> {
4452 let result = self
4453 .client
4454 .send(get_thread_subscription::unstable::Request::new(
4455 self.room_id().to_owned(),
4456 thread_root.clone(),
4457 ))
4458 .await;
4459
4460 let subscription = match result {
4461 Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4462 Err(http_error) => match http_error.as_client_api_error() {
4463 Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4464 _ => return Err(http_error.into()),
4465 },
4466 };
4467
4468 // Keep the database in sync.
4469 if let Some(sub) = &subscription {
4470 self.client
4471 .state_store()
4472 .upsert_thread_subscriptions(vec![(
4473 self.room_id(),
4474 &thread_root,
4475 StoredThreadSubscription {
4476 status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4477 bump_stamp: None,
4478 },
4479 )])
4480 .await?;
4481 } else {
4482 // If the subscription was not found, remove it from the database.
4483 self.client
4484 .state_store()
4485 .remove_thread_subscription(self.room_id(), &thread_root)
4486 .await?;
4487 }
4488
4489 Ok(subscription)
4490 }
4491
4492 /// Return the current thread subscription for the given thread root in this
4493 /// room, by getting it from storage if possible, or fetching it from
4494 /// network otherwise.
4495 ///
4496 /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4497 /// this method.
4498 pub async fn load_or_fetch_thread_subscription(
4499 &self,
4500 thread_root: &EventId,
4501 ) -> Result<Option<ThreadSubscription>> {
4502 // If the thread subscriptions list is outdated, fetch from the server.
4503 if self.client.thread_subscription_catchup().is_outdated() {
4504 return self.fetch_thread_subscription(thread_root.to_owned()).await;
4505 }
4506
4507 // Otherwise, we can rely on the store information.
4508 Ok(self
4509 .client
4510 .state_store()
4511 .load_thread_subscription(self.room_id(), thread_root)
4512 .await
4513 .map(|maybe_sub| {
4514 maybe_sub.and_then(|stored| match stored.status {
4515 ThreadSubscriptionStatus::Unsubscribed => None,
4516 ThreadSubscriptionStatus::Subscribed { automatic } => {
4517 Some(ThreadSubscription { automatic })
4518 }
4519 })
4520 })?)
4521 }
4522
4523 /// Adds a new pinned event by sending an updated `m.room.pinned_events`
4524 /// event containing the new event id.
4525 ///
4526 /// This method will first try to get the pinned events from the current
4527 /// room's state and if it fails to do so it'll try to load them from the
4528 /// homeserver.
4529 ///
4530 /// Returns `true` if we pinned the event, `false` if the event was already
4531 /// pinned.
4532 pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
4533 let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4534 event_ids
4535 } else {
4536 self.load_pinned_events().await?.unwrap_or_default()
4537 };
4538 let event_id = event_id.to_owned();
4539 if pinned_event_ids.contains(&event_id) {
4540 Ok(false)
4541 } else {
4542 pinned_event_ids.push(event_id);
4543 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4544 self.send_state_event(content).await?;
4545 Ok(true)
4546 }
4547 }
4548
4549 /// Removes a pinned event by sending an updated `m.room.pinned_events`
4550 /// event without the event id we want to remove.
4551 ///
4552 /// This method will first try to get the pinned events from the current
4553 /// room's state and if it fails to do so it'll try to load them from the
4554 /// homeserver.
4555 ///
4556 /// Returns `true` if we unpinned the event, `false` if the event wasn't
4557 /// pinned before.
4558 pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
4559 let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4560 event_ids
4561 } else {
4562 self.load_pinned_events().await?.unwrap_or_default()
4563 };
4564 let event_id = event_id.to_owned();
4565 if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
4566 pinned_event_ids.remove(idx);
4567 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4568 self.send_state_event(content).await?;
4569 Ok(true)
4570 } else {
4571 Ok(false)
4572 }
4573 }
4574
4575 /// Computes if the current room is a DM, stores the loaded values, and then
4576 /// returns the result.
4577 pub async fn compute_is_dm(&self) -> Result<bool> {
4578 Ok(self.inner.compute_is_dm(self.client.dm_room_definition()).await?)
4579 }
4580
4581 /// Checks if the current room is a DM in a synchronous way, without
4582 /// actually checking any local stores. Note this can be either a cached or
4583 /// an approximate value, since some important data may be unavailable
4584 /// and we may need to make some assumptions.
4585 pub fn is_dm(&self) -> bool {
4586 // Note: this value may be wrong for invited rooms.
4587 let is_direct = self.direct_targets_length() == 1;
4588 match self.client.dm_room_definition() {
4589 DmRoomDefinition::MatrixSpec => {
4590 // If there is a single target, it's a DM.
4591 is_direct
4592 }
4593 DmRoomDefinition::TwoMembers => {
4594 // If there is a single target and at most 2 active members, it's a DM.
4595 // Try getting the calculated active service members count from the room info.
4596 let active_service_member_count =
4597 self.active_service_members_count().unwrap_or_else(|| {
4598 // Otherwise just use an approximated value based on the service members
4599 // count.
4600 self.service_members().map(|members| members.len()).unwrap_or_default()
4601 as u64
4602 });
4603 let has_at_most_two_active_members =
4604 self.active_members_count().saturating_sub(active_service_member_count) <= 2;
4605 is_direct && has_at_most_two_active_members
4606 }
4607 }
4608 }
4609}
4610
4611#[cfg(feature = "e2e-encryption")]
4612impl RoomIdentityProvider for Room {
4613 fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
4614 Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
4615 }
4616
4617 fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
4618 Box::pin(async {
4619 let members = self
4620 .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
4621 .await
4622 .unwrap_or_else(|_| Default::default());
4623
4624 let mut ret: Vec<UserIdentity> = Vec::new();
4625 for member in members {
4626 if let Some(i) = self.user_identity(member.user_id()).await {
4627 ret.push(i);
4628 }
4629 }
4630 ret
4631 })
4632 }
4633
4634 fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
4635 Box::pin(async {
4636 self.client
4637 .encryption()
4638 .get_user_identity(user_id)
4639 .await
4640 .unwrap_or(None)
4641 .map(|u| u.underlying_identity())
4642 })
4643 }
4644}
4645
4646/// A wrapper for a weak client and a room id that allows to lazily retrieve a
4647/// room, only when needed.
4648#[derive(Clone, Debug)]
4649pub(crate) struct WeakRoom {
4650 client: WeakClient,
4651 room_id: OwnedRoomId,
4652}
4653
4654impl WeakRoom {
4655 /// Create a new `WeakRoom` given its weak components.
4656 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4657 Self { client, room_id }
4658 }
4659
4660 /// Attempts to reconstruct the room.
4661 pub fn get(&self) -> Option<Room> {
4662 self.client.get().and_then(|client| client.get_room(&self.room_id))
4663 }
4664
4665 /// The room id for that room.
4666 pub fn room_id(&self) -> &RoomId {
4667 &self.room_id
4668 }
4669}
4670
4671/// Details of the (latest) invite.
4672#[derive(Debug, Clone)]
4673pub struct Invite {
4674 /// Who has been invited.
4675 pub invitee: RoomMember,
4676
4677 /// The user ID of who sent the invite.
4678 ///
4679 /// This is useful if `Self::inviter` is `None`.
4680 pub inviter_id: OwnedUserId,
4681
4682 /// Who sent the invite.
4683 ///
4684 /// If `None`, check `Self::inviter_id`, it might be useful as a fallback.
4685 pub inviter: Option<RoomMember>,
4686}
4687
4688#[derive(Error, Debug)]
4689enum InvitationError {
4690 #[error("No membership event found")]
4691 EventMissing,
4692}
4693
4694/// Receipts to send all at once.
4695#[derive(Debug, Clone, Default)]
4696#[non_exhaustive]
4697pub struct Receipts {
4698 /// Fully-read marker (room account data).
4699 pub fully_read: Option<OwnedEventId>,
4700 /// Read receipt (public ephemeral room event).
4701 pub public_read_receipt: Option<OwnedEventId>,
4702 /// Read receipt (private ephemeral room event).
4703 pub private_read_receipt: Option<OwnedEventId>,
4704}
4705
4706impl Receipts {
4707 /// Create an empty `Receipts`.
4708 pub fn new() -> Self {
4709 Self::default()
4710 }
4711
4712 /// Set the last event the user has read.
4713 ///
4714 /// It means that the user has read all the events before this event.
4715 ///
4716 /// This is a private marker only visible by the user.
4717 ///
4718 /// Note that this is technically not a receipt as it is persisted in the
4719 /// room account data.
4720 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4721 self.fully_read = event_id.into();
4722 self
4723 }
4724
4725 /// Set the last event presented to the user and forward it to the other
4726 /// users in the room.
4727 ///
4728 /// This is used to reset the unread messages/notification count and
4729 /// advertise to other users the last event that the user has likely seen.
4730 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4731 self.public_read_receipt = event_id.into();
4732 self
4733 }
4734
4735 /// Set the last event presented to the user and don't forward it.
4736 ///
4737 /// This is used to reset the unread messages/notification count.
4738 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4739 self.private_read_receipt = event_id.into();
4740 self
4741 }
4742
4743 /// Whether this `Receipts` is empty.
4744 pub fn is_empty(&self) -> bool {
4745 self.fully_read.is_none()
4746 && self.public_read_receipt.is_none()
4747 && self.private_read_receipt.is_none()
4748 }
4749}
4750
4751/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
4752/// listed by a room, possibly validated by checking the space's state.
4753#[derive(Debug)]
4754pub enum ParentSpace {
4755 /// The room recognizes the given room as its parent, and the parent
4756 /// recognizes it as its child.
4757 Reciprocal(Room),
4758 /// The room recognizes the given room as its parent, but the parent does
4759 /// not recognizes it as its child. However, the author of the
4760 /// `m.space.parent` event in the room has a sufficient power level in the
4761 /// parent to create the child event.
4762 WithPowerlevel(Room),
4763 /// The room recognizes the given room as its parent, but the parent does
4764 /// not recognizes it as its child.
4765 Illegitimate(Room),
4766 /// The room recognizes the given id as its parent room, but we cannot check
4767 /// whether the parent recognizes it as its child.
4768 Unverifiable(OwnedRoomId),
4769}
4770
4771trait EventSource {
4772 fn get_event(
4773 &self,
4774 event_id: &EventId,
4775 ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4776}
4777
4778impl EventSource for &Room {
4779 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4780 self.load_or_fetch_event(event_id, None).await
4781 }
4782}
4783
4784/// Contains the current user's room member info and the optional room member
4785/// info of the sender of the `m.room.member` event that this info represents.
4786#[derive(Debug)]
4787pub struct RoomMemberWithSenderInfo {
4788 /// The actual room member.
4789 pub room_member: RoomMember,
4790 /// The info of the sender of the event `room_member` is based on, if
4791 /// available.
4792 pub sender_info: Option<RoomMember>,
4793}
4794
4795#[cfg(all(test, not(target_family = "wasm")))]
4796mod tests {
4797 use std::collections::BTreeMap;
4798
4799 use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4800 use matrix_sdk_test::{
4801 JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
4802 };
4803 use ruma::{
4804 RoomVersionId, event_id,
4805 events::{relation::RelationType, room::member::MembershipState},
4806 owned_event_id, room_id, user_id,
4807 };
4808 use wiremock::{
4809 Mock, MockServer, ResponseTemplate,
4810 matchers::{header, method, path_regex},
4811 };
4812
4813 use crate::{
4814 Client,
4815 config::RequestConfig,
4816 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4817 test_utils::{
4818 client::mock_matrix_session,
4819 logged_in_client,
4820 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4821 },
4822 };
4823
4824 #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4825 #[async_test]
4826 async fn test_cache_invalidation_while_encrypt() {
4827 use matrix_sdk_base::store::RoomLoadSettings;
4828 use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4829
4830 let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4831 let session = mock_matrix_session();
4832
4833 let client = Client::builder()
4834 .homeserver_url("http://localhost:1234")
4835 .request_config(RequestConfig::new().disable_retry())
4836 .sqlite_store(&sqlite_path, None)
4837 .build()
4838 .await
4839 .unwrap();
4840 client
4841 .matrix_auth()
4842 .restore_session(session.clone(), RoomLoadSettings::default())
4843 .await
4844 .unwrap();
4845
4846 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4847
4848 // Mock receiving an event to create an internal room.
4849 let server = MockServer::start().await;
4850 {
4851 Mock::given(method("GET"))
4852 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4853 .and(header("authorization", "Bearer 1234"))
4854 .respond_with(
4855 ResponseTemplate::new(200)
4856 .set_body_json(EventFactory::new().room_encryption().into_content()),
4857 )
4858 .mount(&server)
4859 .await;
4860 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4861 let response = SyncResponseBuilder::default()
4862 .add_joined_room(
4863 JoinedRoomBuilder::default()
4864 .add_state_event(
4865 f.member(user_id!("@example:localhost")).display_name("example"),
4866 )
4867 .add_state_event(f.default_power_levels())
4868 .add_state_event(f.room_encryption()),
4869 )
4870 .build_sync_response();
4871 client.base_client().receive_sync_response(response).await.unwrap();
4872 }
4873
4874 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4875
4876 // Step 1, preshare the room keys.
4877 room.preshare_room_key().await.unwrap();
4878
4879 // Step 2, force lock invalidation by pretending another client obtained the
4880 // lock.
4881 {
4882 let client = Client::builder()
4883 .homeserver_url("http://localhost:1234")
4884 .request_config(RequestConfig::new().disable_retry())
4885 .sqlite_store(&sqlite_path, None)
4886 .build()
4887 .await
4888 .unwrap();
4889 client
4890 .matrix_auth()
4891 .restore_session(session.clone(), RoomLoadSettings::default())
4892 .await
4893 .unwrap();
4894 client
4895 .encryption()
4896 .enable_cross_process_store_lock("client2".to_owned())
4897 .await
4898 .unwrap();
4899
4900 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4901 assert!(guard.is_some());
4902 }
4903
4904 // Step 3, take the crypto-store lock.
4905 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4906 assert!(guard.is_some());
4907
4908 // Step 4, try to encrypt a message.
4909 let olm = client.olm_machine().await;
4910 let olm = olm.as_ref().expect("Olm machine wasn't started");
4911
4912 // Now pretend we're encrypting an event; the olm machine shouldn't rely on
4913 // caching the outgoing session before.
4914 let _encrypted_content = olm
4915 .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4916 .await
4917 .unwrap();
4918 }
4919
4920 #[async_test]
4921 async fn test_composer_draft() {
4922 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4923
4924 let client = logged_in_client(None).await;
4925
4926 let response = SyncResponseBuilder::default()
4927 .add_joined_room(JoinedRoomBuilder::default())
4928 .build_sync_response();
4929 client.base_client().receive_sync_response(response).await.unwrap();
4930 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4931
4932 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4933
4934 // Save 2 drafts, one for the room and one for a thread.
4935
4936 let draft = ComposerDraft {
4937 plain_text: "Hello, world!".to_owned(),
4938 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4939 draft_type: ComposerDraftType::NewMessage,
4940 attachments: vec![DraftAttachment {
4941 filename: "cat.txt".to_owned(),
4942 content: matrix_sdk_base::DraftAttachmentContent::File {
4943 data: b"meow".to_vec(),
4944 mimetype: Some("text/plain".to_owned()),
4945 size: Some(5),
4946 },
4947 }],
4948 };
4949
4950 room.save_composer_draft(draft.clone(), None).await.unwrap();
4951
4952 let thread_root = owned_event_id!("$thread_root:b.c");
4953 let thread_draft = ComposerDraft {
4954 plain_text: "Hello, thread!".to_owned(),
4955 html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4956 draft_type: ComposerDraftType::NewMessage,
4957 attachments: vec![DraftAttachment {
4958 filename: "dog.txt".to_owned(),
4959 content: matrix_sdk_base::DraftAttachmentContent::File {
4960 data: b"wuv".to_vec(),
4961 mimetype: Some("text/plain".to_owned()),
4962 size: Some(4),
4963 },
4964 }],
4965 };
4966
4967 room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4968
4969 // Check that the room draft was saved correctly
4970 assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4971
4972 // Check that the thread draft was saved correctly
4973 assert_eq!(
4974 room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4975 Some(thread_draft.clone())
4976 );
4977
4978 // Clear the room draft
4979 room.clear_composer_draft(None).await.unwrap();
4980 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4981
4982 // Check that the thread one is still there
4983 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4984
4985 // Clear the thread draft as well
4986 room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4987 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4988 }
4989
4990 #[async_test]
4991 async fn test_mark_join_requests_as_seen() {
4992 let server = MatrixMockServer::new().await;
4993 let client = server.client_builder().build().await;
4994 let event_id = event_id!("$a:b.c");
4995 let room_id = room_id!("!a:b.c");
4996 let user_id = user_id!("@alice:b.c");
4997
4998 let f = EventFactory::new().room(room_id);
4999 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5000 f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
5001 ]);
5002 let room = server.sync_room(&client, joined_room_builder).await;
5003
5004 // When loading the initial seen ids, there are none
5005 let seen_ids =
5006 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
5007 assert!(seen_ids.is_empty());
5008
5009 // We mark a random event id as seen
5010 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
5011 .await
5012 .expect("Couldn't mark join request as seen");
5013
5014 // Then we can check it was successfully marked as seen
5015 let seen_ids =
5016 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
5017 assert_eq!(seen_ids.len(), 1);
5018 assert_eq!(
5019 seen_ids.into_iter().next().expect("No next value"),
5020 (event_id.to_owned(), user_id.to_owned())
5021 )
5022 }
5023
5024 #[async_test]
5025 async fn test_own_room_membership_with_no_own_member_event() {
5026 let server = MatrixMockServer::new().await;
5027 let client = server.client_builder().build().await;
5028 let room_id = room_id!("!a:b.c");
5029
5030 let room = server.sync_joined_room(&client, room_id).await;
5031
5032 // Since there is no member event for the own user, the method fails.
5033 // This should never happen in an actual room.
5034 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
5035 assert!(error.is_some());
5036 }
5037
5038 #[async_test]
5039 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
5040 let server = MatrixMockServer::new().await;
5041 let client = server.client_builder().build().await;
5042 let room_id = room_id!("!a:b.c");
5043 let user_id = user_id!("@example:localhost");
5044
5045 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
5046 let joined_room_builder =
5047 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5048 let room = server.sync_room(&client, joined_room_builder).await;
5049
5050 // When we load the membership details
5051 let ret = room
5052 .member_with_sender_info(client.user_id().unwrap())
5053 .await
5054 .expect("Room member info should be available");
5055
5056 // We get the member info for the current user
5057 assert_eq!(ret.room_member.event().user_id(), user_id);
5058
5059 // But there is no info for the sender
5060 assert!(ret.sender_info.is_none());
5061 }
5062
5063 #[async_test]
5064 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
5065 let server = MatrixMockServer::new().await;
5066 let client = server.client_builder().build().await;
5067 let room_id = room_id!("!a:b.c");
5068 let user_id = user_id!("@example:localhost");
5069
5070 let f = EventFactory::new().room(room_id).sender(user_id);
5071 let joined_room_builder =
5072 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5073 let room = server.sync_room(&client, joined_room_builder).await;
5074
5075 // When we load the membership details
5076 let ret = room
5077 .member_with_sender_info(client.user_id().unwrap())
5078 .await
5079 .expect("Room member info should be available");
5080
5081 // We get the current user's member info
5082 assert_eq!(ret.room_member.event().user_id(), user_id);
5083
5084 // And the sender has the same info, since it's also the current user
5085 assert!(ret.sender_info.is_some());
5086 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5087 }
5088
5089 #[async_test]
5090 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5091 let server = MatrixMockServer::new().await;
5092 let client = server.client_builder().build().await;
5093 let room_id = room_id!("!a:b.c");
5094 let user_id = user_id!("@example:localhost");
5095 let sender_id = user_id!("@alice:b.c");
5096
5097 let f = EventFactory::new().room(room_id).sender(sender_id);
5098 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5099 f.member(user_id).into(),
5100 // The sender info comes from the sync
5101 f.member(sender_id).into(),
5102 ]);
5103 let room = server.sync_room(&client, joined_room_builder).await;
5104
5105 // When we load the membership details
5106 let ret = room
5107 .member_with_sender_info(client.user_id().unwrap())
5108 .await
5109 .expect("Room member info should be available");
5110
5111 // We get the current user's member info
5112 assert_eq!(ret.room_member.event().user_id(), user_id);
5113
5114 // And also the sender info from the events received in the sync
5115 assert!(ret.sender_info.is_some());
5116 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5117 }
5118
5119 #[async_test]
5120 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5121 let server = MatrixMockServer::new().await;
5122 let client = server.client_builder().build().await;
5123 let room_id = room_id!("!a:b.c");
5124 let user_id = user_id!("@example:localhost");
5125 let sender_id = user_id!("@alice:b.c");
5126
5127 let f = EventFactory::new().room(room_id).sender(sender_id);
5128 let joined_room_builder =
5129 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5130 let room = server.sync_room(&client, joined_room_builder).await;
5131
5132 // We'll receive the member info through the /members endpoint
5133 server
5134 .mock_get_members()
5135 .ok(vec![f.member(sender_id).into_raw()])
5136 .mock_once()
5137 .mount()
5138 .await;
5139
5140 // We get the current user's member info
5141 let ret = room
5142 .member_with_sender_info(client.user_id().unwrap())
5143 .await
5144 .expect("Room member info should be available");
5145
5146 // We get the current user's member info
5147 assert_eq!(ret.room_member.event().user_id(), user_id);
5148
5149 // And also the sender info from the /members endpoint
5150 assert!(ret.sender_info.is_some());
5151 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5152 }
5153
5154 #[async_test]
5155 async fn test_list_threads() {
5156 let server = MatrixMockServer::new().await;
5157 let client = server.client_builder().build().await;
5158
5159 let room_id = room_id!("!a:b.c");
5160 let sender_id = user_id!("@alice:b.c");
5161 let f = EventFactory::new().room(room_id).sender(sender_id);
5162
5163 let eid1 = event_id!("$1");
5164 let eid2 = event_id!("$2");
5165 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5166 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5167
5168 server
5169 .mock_room_threads()
5170 .ok(batch1.clone(), Some("prev_batch".to_owned()))
5171 .mock_once()
5172 .mount()
5173 .await;
5174 server
5175 .mock_room_threads()
5176 .match_from("prev_batch")
5177 .ok(batch2, None)
5178 .mock_once()
5179 .mount()
5180 .await;
5181
5182 let room = server.sync_joined_room(&client, room_id).await;
5183 let result =
5184 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5185 assert_eq!(result.chunk.len(), 1);
5186 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5187 assert!(result.prev_batch_token.is_some());
5188
5189 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5190 let result = room.list_threads(opts).await.expect("Failed to list threads");
5191 assert_eq!(result.chunk.len(), 1);
5192 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5193 assert!(result.prev_batch_token.is_none());
5194 }
5195
5196 #[async_test]
5197 async fn test_relations() {
5198 let server = MatrixMockServer::new().await;
5199 let client = server.client_builder().build().await;
5200
5201 let room_id = room_id!("!a:b.c");
5202 let sender_id = user_id!("@alice:b.c");
5203 let f = EventFactory::new().room(room_id).sender(sender_id);
5204
5205 let target_event_id = owned_event_id!("$target");
5206 let eid1 = event_id!("$1");
5207 let eid2 = event_id!("$2");
5208 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5209 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5210
5211 server
5212 .mock_room_relations()
5213 .match_target_event(target_event_id.clone())
5214 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5215 .mock_once()
5216 .mount()
5217 .await;
5218
5219 server
5220 .mock_room_relations()
5221 .match_target_event(target_event_id.clone())
5222 .match_from("next_batch")
5223 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5224 .mock_once()
5225 .mount()
5226 .await;
5227
5228 let room = server.sync_joined_room(&client, room_id).await;
5229
5230 // Main endpoint: no relation type filtered out.
5231 let mut opts = RelationsOptions {
5232 include_relations: IncludeRelations::AllRelations,
5233 ..Default::default()
5234 };
5235 let result = room
5236 .relations(target_event_id.clone(), opts.clone())
5237 .await
5238 .expect("Failed to list relations the first time");
5239 assert_eq!(result.chunk.len(), 1);
5240 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5241 assert!(result.prev_batch_token.is_none());
5242 assert!(result.next_batch_token.is_some());
5243 assert!(result.recursion_depth.is_none());
5244
5245 opts.from = result.next_batch_token;
5246 let result = room
5247 .relations(target_event_id, opts)
5248 .await
5249 .expect("Failed to list relations the second time");
5250 assert_eq!(result.chunk.len(), 1);
5251 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5252 assert!(result.prev_batch_token.is_none());
5253 assert!(result.next_batch_token.is_none());
5254 assert!(result.recursion_depth.is_none());
5255 }
5256
5257 #[async_test]
5258 async fn test_relations_with_reltype() {
5259 let server = MatrixMockServer::new().await;
5260 let client = server.client_builder().build().await;
5261
5262 let room_id = room_id!("!a:b.c");
5263 let sender_id = user_id!("@alice:b.c");
5264 let f = EventFactory::new().room(room_id).sender(sender_id);
5265
5266 let target_event_id = owned_event_id!("$target");
5267 let eid1 = event_id!("$1");
5268 let eid2 = event_id!("$2");
5269 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5270 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5271
5272 server
5273 .mock_room_relations()
5274 .match_target_event(target_event_id.clone())
5275 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5276 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5277 .mock_once()
5278 .mount()
5279 .await;
5280
5281 server
5282 .mock_room_relations()
5283 .match_target_event(target_event_id.clone())
5284 .match_from("next_batch")
5285 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5286 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5287 .mock_once()
5288 .mount()
5289 .await;
5290
5291 let room = server.sync_joined_room(&client, room_id).await;
5292
5293 // Reltype-filtered endpoint, for threads \o/
5294 let mut opts = RelationsOptions {
5295 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5296 ..Default::default()
5297 };
5298 let result = room
5299 .relations(target_event_id.clone(), opts.clone())
5300 .await
5301 .expect("Failed to list relations the first time");
5302 assert_eq!(result.chunk.len(), 1);
5303 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5304 assert!(result.prev_batch_token.is_none());
5305 assert!(result.next_batch_token.is_some());
5306 assert!(result.recursion_depth.is_none());
5307
5308 opts.from = result.next_batch_token;
5309 let result = room
5310 .relations(target_event_id, opts)
5311 .await
5312 .expect("Failed to list relations the second time");
5313 assert_eq!(result.chunk.len(), 1);
5314 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5315 assert!(result.prev_batch_token.is_none());
5316 assert!(result.next_batch_token.is_none());
5317 assert!(result.recursion_depth.is_none());
5318 }
5319
5320 #[async_test]
5321 async fn test_power_levels_computation() {
5322 let server = MatrixMockServer::new().await;
5323 let client = server.client_builder().build().await;
5324
5325 let room_id = room_id!("!a:b.c");
5326 let sender_id = client.user_id().expect("No session id");
5327 let f = EventFactory::new().room(room_id).sender(sender_id);
5328 let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5329
5330 // Computing the power levels will need these 3 state events:
5331 let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5332 let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5333 let room_member_event = f.member(sender_id).into();
5334
5335 // With only the room member event
5336 let room = server
5337 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5338 .await;
5339 let ctx = room
5340 .push_condition_room_ctx()
5341 .await
5342 .expect("Failed to get push condition context")
5343 .expect("Could not get push condition context");
5344
5345 // The internal power levels couldn't be computed
5346 assert!(ctx.power_levels.is_none());
5347
5348 // Adding the room creation event
5349 let room = server
5350 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5351 .await;
5352 let ctx = room
5353 .push_condition_room_ctx()
5354 .await
5355 .expect("Failed to get push condition context")
5356 .expect("Could not get push condition context");
5357
5358 // The internal power levels still couldn't be computed
5359 assert!(ctx.power_levels.is_none());
5360
5361 // With the room member, room creation and the power levels events
5362 let room = server
5363 .sync_room(
5364 &client,
5365 JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5366 )
5367 .await;
5368 let ctx = room
5369 .push_condition_room_ctx()
5370 .await
5371 .expect("Failed to get push condition context")
5372 .expect("Could not get push condition context");
5373
5374 // The internal power levels can finally be computed
5375 assert!(ctx.power_levels.is_some());
5376 }
5377}