Skip to main content

matrix_sdk/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18    borrow::Borrow,
19    collections::{BTreeMap, HashMap},
20    future::Future,
21    ops::Deref,
22    sync::Arc,
23    time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30    StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39    IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43    ComposerDraft, DmRoomDefinition, EncryptionState, RoomInfoNotableUpdateReasons,
44    RoomMemberships, SendOutsideWasm, StateStoreDataKey, StateStoreDataValue,
45    deserialized_responses::{
46        RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47    },
48    media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49    serde_helpers::extract_relation,
50    store::{StateStoreExt, ThreadSubscriptionStatus},
51};
52#[cfg(feature = "e2e-encryption")]
53use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
54#[cfg(feature = "e2e-encryption")]
55use matrix_sdk_common::BoxFuture;
56use matrix_sdk_common::{
57    deserialized_responses::TimelineEvent,
58    executor::{JoinHandle, spawn},
59    timeout::timeout,
60};
61use mime::Mime;
62use reply::Reply;
63#[cfg(feature = "e2e-encryption")]
64use ruma::events::AnySyncMessageLikeEvent;
65#[cfg(feature = "experimental-encrypted-state-events")]
66use ruma::events::AnySyncStateEvent;
67#[cfg(feature = "unstable-msc4274")]
68use ruma::events::room::message::GalleryItemType;
69#[cfg(feature = "e2e-encryption")]
70use ruma::events::{
71    AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
72};
73use ruma::{
74    EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
75    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
76    api::{
77        client::{
78            config::{set_global_account_data, set_room_account_data},
79            context,
80            filter::LazyLoadOptions,
81            membership::{
82                Invite3pid, ban_user, forget_room, get_member_events,
83                invite_user::{
84                    self,
85                    v3::{InvitationRecipient, InviteUserId},
86                },
87                kick_user, leave_room, unban_user,
88            },
89            message::send_message_event,
90            read_marker::set_read_marker,
91            receipt::create_receipt,
92            redact::redact_event,
93            room::{get_room_event, report_content, report_room},
94            state::{get_state_event_for_key, send_state_event},
95            tag::{create_tag, delete_tag},
96            threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
97            typing::create_typing_event::{
98                self,
99                v3::{Typing, TypingInfo},
100            },
101        },
102        error::ErrorKind,
103    },
104    assign,
105    events::{
106        AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
107        Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
108        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
109        RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
110        StaticStateEventContent, SyncStateEvent,
111        beacon::BeaconEventContent,
112        beacon_info::BeaconInfoEventContent,
113        direct::DirectEventContent,
114        marked_unread::MarkedUnreadEventContent,
115        receipt::{Receipt, ReceiptThread, ReceiptType},
116        relation::RelationType,
117        room::{
118            ImageInfo, MediaSource, ThumbnailInfo,
119            avatar::{self, RoomAvatarEventContent},
120            encryption::PossiblyRedactedRoomEncryptionEventContent,
121            history_visibility::HistoryVisibility,
122            member::{MembershipChange, RoomMemberEventContent, SyncRoomMemberEvent},
123            message::{
124                AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
125                ImageMessageEventContent, MessageType, RoomMessageEventContent,
126                TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
127                UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
128            },
129            name::RoomNameEventContent,
130            pinned_events::RoomPinnedEventsEventContent,
131            power_levels::{
132                RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
133            },
134            server_acl::RoomServerAclEventContent,
135            topic::RoomTopicEventContent,
136        },
137        space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
138        tag::{TagInfo, TagName},
139        typing::SyncTypingEvent,
140    },
141    int,
142    push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
143    serde::Raw,
144    time::Instant,
145    uint,
146};
147#[cfg(feature = "experimental-encrypted-state-events")]
148use ruma::{
149    events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
150    serde::JsonCastable,
151};
152use serde::de::DeserializeOwned;
153use thiserror::Error;
154use tokio::{join, sync::broadcast};
155use tracing::{debug, error, info, instrument, trace, warn};
156
157use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
158pub use self::{
159    member::{RoomMember, RoomMemberRole},
160    messages::{
161        EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
162        Relations, RelationsOptions, ThreadRoots,
163    },
164};
165#[cfg(feature = "e2e-encryption")]
166use crate::encryption::backups::BackupState;
167#[cfg(doc)]
168use crate::event_cache::EventCache;
169#[cfg(feature = "experimental-encrypted-state-events")]
170use crate::room::futures::{SendRawStateEvent, SendStateEvent};
171use crate::{
172    BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
173    attachment::{AttachmentConfig, AttachmentInfo},
174    client::WeakClient,
175    config::RequestConfig,
176    error::{BeaconError, WrongRoomState},
177    event_cache::{self, EventCacheDropHandles, RoomEventCache},
178    event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
179    live_locations_observer::LiveLocationsObserver,
180    media::{MediaFormat, MediaRequestParameters},
181    notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
182    room::{
183        knock_requests::{KnockRequest, KnockRequestMemberInfo},
184        power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
185        privacy_settings::RoomPrivacySettings,
186    },
187    sync::RoomUpdate,
188    utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
189};
190
191pub mod edit;
192pub mod futures;
193pub mod identity_status_changes;
194/// Contains code related to requests to join a room.
195pub mod knock_requests;
196mod member;
197mod messages;
198pub mod power_levels;
199pub mod reply;
200
201pub mod calls;
202
203/// Contains all the functionality for modifying the privacy settings in a room.
204pub mod privacy_settings;
205
206#[cfg(feature = "e2e-encryption")]
207pub(crate) mod shared_room_history;
208
209/// A struct containing methods that are common for Joined, Invited and Left
210/// Rooms
211#[derive(Debug, Clone)]
212pub struct Room {
213    inner: BaseRoom,
214    pub(crate) client: Client,
215}
216
217impl Deref for Room {
218    type Target = BaseRoom;
219
220    fn deref(&self) -> &Self::Target {
221        &self.inner
222    }
223}
224
/// Timeout attached to a typing notice: four seconds.
///
/// NOTE(review): the code consuming this constant is outside this part of the
/// file; confirm the exact use there.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);

/// Resend timeout for a typing notice: three seconds, i.e. shorter than
/// [`TYPING_NOTICE_TIMEOUT`].
///
/// NOTE(review): presumably the minimum delay before a still-active typing
/// notice is sent again; confirm against the code using it.
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
227
/// The subscription of the user to a thread, following the semantics of
/// MSC4306.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ThreadSubscription {
    /// `true` when a client created the subscription automatically, `false`
    /// when it results from a manual choice of the user.
    pub automatic: bool,
}
235
236/// Context allowing to compute the push actions for a given event.
237#[derive(Debug)]
238pub struct PushContext {
239    /// The Ruma context used to compute the push actions.
240    push_condition_room_ctx: PushConditionRoomCtx,
241
242    /// Push rules for this room, based on the push rules state event, or the
243    /// global server default as defined by [`Ruleset::server_default`].
244    push_rules: Ruleset,
245}
246
247impl PushContext {
248    /// Create a new [`PushContext`] from its inner components.
249    pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
250        Self { push_condition_room_ctx, push_rules }
251    }
252
253    /// Compute the push rules for a given event.
254    pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
255        self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
256    }
257
258    /// Compute the push rules for a given event, with extra logging to help
259    /// debugging.
260    #[doc(hidden)]
261    #[instrument(skip_all)]
262    pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
263        let rules = self
264            .push_rules
265            .iter()
266            .filter_map(|r| {
267                if !r.enabled() {
268                    return None;
269                }
270
271                let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
272
273                let conditions = match r {
274                    AnyPushRuleRef::Override(r) => {
275                        format!("{:?}", r.conditions)
276                    }
277                    AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
278                    AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
279                    AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
280                    AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
281                    _ => "<unknown push rule kind>".to_owned(),
282                };
283
284                Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
285            })
286            .collect::<Vec<_>>()
287            .join("\n");
288        trace!("rules:\n\n{rules}\n\n");
289
290        let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
291
292        if let Some(found) = found {
293            trace!("rule {} matched", found.rule_id());
294            found.actions().to_owned()
295        } else {
296            trace!("no match");
297            Vec::new()
298        }
299    }
300}
301
/// Builds the media variant of `$t` (`Image`, `Audio`, `Video` or `File`)
/// selected by the top-level type of the MIME type `$content_type`.
///
/// Every argument but `$t` is the identifier of a local variable: the MIME
/// type, the file name, the media source, the optional caption, the optional
/// attachment info and the optional thumbnail (source and info pair).
macro_rules! make_media_type {
    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
        // With a caption, the caption is the body and the file name goes into the
        // dedicated field; without one, the file name is the body and the
        // dedicated field stays unset.
        // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
        let (body, formatted, filename) =
            if let Some(TextMessageEventContent { body, formatted, .. }) = $caption {
                (body, formatted, Some($filename))
            } else {
                ($filename, None, None)
            };

        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();

        // The full MIME type is recorded in the info of every kind of media.
        let mimetype = Some($content_type.as_ref().to_owned());

        match $content_type.type_() {
            mime::IMAGE => {
                let image_info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
                    mimetype,
                    thumbnail_source,
                    thumbnail_info
                });
                <$t>::Image(assign!(ImageMessageEventContent::new(body, $source), {
                    info: Some(Box::new(image_info)),
                    formatted,
                    filename
                }))
            }

            mime::AUDIO => {
                let mut audio_content = assign!(AudioMessageEventContent::new(body, $source), {
                    formatted,
                    filename
                });

                // The audio details block needs both a duration and a waveform.
                if let Some(AttachmentInfo::Audio(details) | AttachmentInfo::Voice(details)) = &$info
                    && let Some(duration) = details.duration
                    && let Some(samples) = &details.waveform
                {
                    // Samples are clamped to [0, 1], then scaled to the amplitude range.
                    let waveform = samples
                        .iter()
                        .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
                        .map(Into::into)
                        .collect();
                    audio_content.audio =
                        Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
                }

                // Voice messages additionally carry the (empty) voice block.
                if matches!($info, Some(AttachmentInfo::Voice(_))) {
                    audio_content.voice = Some(UnstableVoiceContentBlock::new());
                }

                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
                audio_info.mimetype = mimetype;

                <$t>::Audio(audio_content.info(Box::new(audio_info)))
            }

            mime::VIDEO => {
                let video_info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
                    mimetype,
                    thumbnail_source,
                    thumbnail_info
                });
                <$t>::Video(assign!(VideoMessageEventContent::new(body, $source), {
                    info: Some(Box::new(video_info)),
                    formatted,
                    filename
                }))
            }

            // Any other MIME type is sent as a plain file.
            _ => {
                let file_info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
                    mimetype,
                    thumbnail_source,
                    thumbnail_info
                });
                <$t>::File(assign!(FileMessageEventContent::new(body, $source), {
                    info: Some(Box::new(file_info)),
                    formatted,
                    filename
                }))
            }
        }
    }};
}
387
388impl Room {
389    /// Create a new `Room`
390    ///
391    /// # Arguments
392    /// * `client` - The client used to make requests.
393    ///
394    /// * `room` - The underlying room.
395    pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
396        Self { inner: room, client }
397    }
398
399    /// Leave this room.
400    /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
401    /// automatically.
402    ///
403    /// Only invited and joined rooms can be left.
404    #[doc(alias = "reject_invitation")]
405    #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
406    async fn leave_impl(&self) -> (Result<()>, &Room) {
407        let state = self.state();
408        if state == RoomState::Left {
409            return (
410                Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
411                    "Joined or Invited",
412                    state,
413                )))),
414                self,
415            );
416        }
417
418        // If the room was in Invited state we should also forget it when declining the
419        // invite.
420        let should_forget = matches!(self.state(), RoomState::Invited);
421
422        let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
423        let response = self.client.send(request).await;
424
425        // The server can return with an error that is acceptable to ignore. Let's find
426        // which one.
427        if let Err(error) = response {
428            #[allow(clippy::collapsible_match)]
429            let ignore_error = if let Some(error) = error.client_api_error_kind() {
430                match error {
431                    // The user is trying to leave a room but doesn't have permissions to do so.
432                    // Let's consider the user has left the room.
433                    ErrorKind::Forbidden => true,
434                    _ => false,
435                }
436            } else {
437                false
438            };
439
440            error!(?error, ignore_error, should_forget, "Failed to leave the room");
441
442            if !ignore_error {
443                return (Err(error.into()), self);
444            }
445        }
446
447        if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
448            return (Err(e.into()), self);
449        }
450
451        if should_forget {
452            trace!("Trying to forget the room");
453
454            if let Err(error) = self.forget().await {
455                error!(?error, "Failed to forget the room");
456            }
457        }
458
459        (Ok(()), self)
460    }
461
462    /// Leave this room and all predecessors.
463    /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
464    /// automatically.
465    ///
466    /// Only invited and joined rooms can be left.
467    /// Will return an error if the current room fails to leave but
468    /// will only warn if a predecessor fails to leave.
469    pub async fn leave(&self) -> Result<()> {
470        let mut rooms: Vec<Room> = vec![self.clone()];
471        let mut current_room = self;
472
473        while let Some(predecessor) = current_room.predecessor_room() {
474            let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
475
476            if let Some(predecessor_room) = maybe_predecessor_room {
477                rooms.push(predecessor_room.clone());
478                current_room = rooms.last().expect("Room just pushed so can't be empty");
479            } else {
480                warn!("Cannot find predecessor room");
481                break;
482            }
483        }
484
485        let batch_size = 5;
486
487        let rooms_futures: Vec<_> = rooms
488            .iter()
489            .filter_map(|room| match room.state() {
490                RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
491                    Some(room.leave_impl())
492                }
493                RoomState::Banned | RoomState::Left => None,
494            })
495            .collect();
496
497        let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
498
499        let mut maybe_this_room_failed_with: Option<Error> = None;
500
501        while let Some(result) = futures_stream.next().await {
502            if let (Err(e), room) = result {
503                if room.room_id() == self.room_id() {
504                    maybe_this_room_failed_with = Some(e);
505                } else {
506                    warn!("Failure while attempting to leave predecessor room: {e:?}");
507                }
508            }
509        }
510
511        maybe_this_room_failed_with.map_or(Ok(()), Err)
512    }
513
514    /// Join this room.
515    ///
516    /// Only invited and left rooms can be joined via this method.
517    #[doc(alias = "accept_invitation")]
518    pub async fn join(&self) -> Result<()> {
519        let prev_room_state = self.inner.state();
520
521        if prev_room_state == RoomState::Joined {
522            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
523                "Invited or Left",
524                prev_room_state,
525            ))));
526        }
527
528        self.client.join_room_by_id(self.room_id()).await?;
529
530        Ok(())
531    }
532
533    /// Get the inner client saved in this room instance.
534    ///
535    /// Returns the client this room is part of.
536    pub fn client(&self) -> Client {
537        self.client.clone()
538    }
539
540    /// Get the sync state of this room, i.e. whether it was fully synced with
541    /// the server.
542    pub fn is_synced(&self) -> bool {
543        self.inner.is_state_fully_synced()
544    }
545
546    /// Gets the avatar of this room, if set.
547    ///
548    /// Returns the avatar.
549    /// If a thumbnail is requested no guarantee on the size of the image is
550    /// given.
551    ///
552    /// # Arguments
553    ///
554    /// * `format` - The desired format of the avatar.
555    ///
556    /// # Examples
557    ///
558    /// ```no_run
559    /// # use matrix_sdk::Client;
560    /// # use matrix_sdk::ruma::room_id;
561    /// # use matrix_sdk::media::MediaFormat;
562    /// # use url::Url;
563    /// # let homeserver = Url::parse("http://example.com").unwrap();
564    /// # async {
565    /// # let user = "example";
566    /// let client = Client::new(homeserver).await.unwrap();
567    /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
568    /// let room_id = room_id!("!roomid:example.com");
569    /// let room = client.get_room(&room_id).unwrap();
570    /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
571    ///     std::fs::write("avatar.png", avatar);
572    /// }
573    /// # };
574    /// ```
575    pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
576        let Some(url) = self.avatar_url() else { return Ok(None) };
577        let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
578        Ok(Some(self.client.media().get_media_content(&request, true).await?))
579    }
580
581    /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
582    /// returns a `Messages` struct that contains a chunk of room and state
583    /// events (`RoomEvent` and `AnyStateEvent`).
584    ///
585    /// With the encryption feature, messages are decrypted if possible. If
586    /// decryption fails for an individual message, that message is returned
587    /// undecrypted.
588    ///
589    /// # Examples
590    ///
591    /// ```no_run
592    /// use matrix_sdk::{Client, room::MessagesOptions};
593    /// # use matrix_sdk::ruma::{
594    /// #     api::client::filter::RoomEventFilter,
595    /// #     room_id,
596    /// # };
597    /// # use url::Url;
598    ///
599    /// # let homeserver = Url::parse("http://example.com").unwrap();
600    /// # async {
601    /// let options =
602    ///     MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
603    ///
604    /// let mut client = Client::new(homeserver).await.unwrap();
605    /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
606    /// assert!(room.messages(options).await.is_ok());
607    /// # };
608    /// ```
609    #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
610    pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
611        let room_id = self.inner.room_id();
612        let request = options.into_request(room_id);
613        let http_response = self.client.send(request).await?;
614
615        let push_ctx = self.push_context().await?;
616        let chunk = join_all(
617            http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
618        )
619        .await;
620
621        // Save the loaded events into the event cache, if it's set up.
622        if let Ok((cache, _handles)) = self.event_cache().await {
623            cache.save_events(chunk.clone()).await;
624        }
625
626        Ok(Messages {
627            start: http_response.start,
628            end: http_response.end,
629            chunk,
630            state: http_response.state,
631        })
632    }
633
634    /// Register a handler for events of a specific type, within this room.
635    ///
636    /// This method works the same way as [`Client::add_event_handler`], except
637    /// that the handler will only be called for events within this room. See
638    /// that method for more details on event handler functions.
639    ///
640    /// `room.add_event_handler(hdl)` is equivalent to
641    /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
642    /// convenient in your use case.
643    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
644    where
645        Ev: SyncEvent + DeserializeOwned + Send + 'static,
646        H: EventHandler<Ev, Ctx>,
647    {
648        self.client.add_room_event_handler(self.room_id(), handler)
649    }
650
651    /// Subscribe to all updates for this room.
652    ///
653    /// The returned receiver will receive a new message for each sync response
654    /// that contains updates for this room.
655    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
656        self.client.subscribe_to_room_updates(self.room_id())
657    }
658
659    /// Subscribe to typing notifications for this room.
660    ///
661    /// The returned receiver will receive a new vector of user IDs for each
662    /// sync response that contains 'm.typing' event. The current user ID will
663    /// be filtered out.
664    pub fn subscribe_to_typing_notifications(
665        &self,
666    ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
667        let (sender, receiver) = broadcast::channel(16);
668        let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
669            let own_user_id = self.own_user_id().to_owned();
670            move |event: SyncTypingEvent| async move {
671                // Ignore typing notifications from own user.
672                let typing_user_ids = event
673                    .content
674                    .user_ids
675                    .into_iter()
676                    .filter(|user_id| *user_id != own_user_id)
677                    .collect();
678                // Ignore the result. It can only fail if there are no listeners.
679                let _ = sender.send(typing_user_ids);
680            }
681        });
682        let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
683        (drop_guard, receiver)
684    }
685
686    /// Subscribe to updates about users who are in "pin violation" i.e. their
687    /// identity has changed and the user has not yet acknowledged this.
688    ///
689    /// The returned receiver will receive a new vector of
690    /// [`IdentityStatusChange`] each time a /keys/query response shows a
691    /// changed identity for a member of this room, or a sync shows a change
692    /// to the membership of an affected user. (Changes to the current user are
693    /// not directly included, but some changes to the current user's identity
694    /// can trigger changes to how we see other users' identities, which
695    /// will be included.)
696    ///
697    /// The first item in the stream provides the current state of the room:
698    /// each member of the room who is not in "pinned" or "verified" state will
699    /// be included (except the current user).
700    ///
701    /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
702    /// `PinViolation` then a warning should be displayed to the user. If it is
703    /// set to `Pinned` then no warning should be displayed.
704    ///
705    /// Note that if a user who is in pin violation leaves the room, a `Pinned`
706    /// update is sent, to indicate that the warning should be removed, even
707    /// though the user's identity is not necessarily pinned.
708    #[cfg(feature = "e2e-encryption")]
709    pub async fn subscribe_to_identity_status_changes(
710        &self,
711    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
712        IdentityStatusChanges::create_stream(self.clone()).await
713    }
714
715    /// Subscribes to active live location shares in this room.
716    ///
717    /// Returns a [`LiveLocationsObserver`] that holds the current state and
718    /// exposes a stream of incremental [`eyeball_im::VectorDiff`] updates via
719    /// [`LiveLocationsObserver::subscribe`].
720    ///
721    /// Event handlers are active for as long as the returned struct is alive.
722    pub async fn live_locations_observer(&self) -> LiveLocationsObserver {
723        LiveLocationsObserver::new(self.clone()).await
724    }
725
726    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
727    /// decrypted if needs be.
728    ///
729    /// Only logs from the crypto crate will indicate a failure to decrypt.
730    #[cfg(not(feature = "experimental-encrypted-state-events"))]
731    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
732    async fn try_decrypt_event(
733        &self,
734        event: Raw<AnyTimelineEvent>,
735        push_ctx: Option<&PushContext>,
736    ) -> TimelineEvent {
737        #[cfg(feature = "e2e-encryption")]
738        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
739            SyncMessageLikeEvent::Original(_),
740        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
741            && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
742        {
743            return event;
744        }
745
746        let mut event = TimelineEvent::from_plaintext(event.cast());
747        if let Some(push_ctx) = push_ctx {
748            event.set_push_actions(push_ctx.for_event(event.raw()).await);
749        }
750
751        event
752    }
753
754    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
755    /// decrypted if needs be.
756    ///
757    /// Only logs from the crypto crate will indicate a failure to decrypt.
758    #[cfg(feature = "experimental-encrypted-state-events")]
759    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
760    async fn try_decrypt_event(
761        &self,
762        event: Raw<AnyTimelineEvent>,
763        push_ctx: Option<&PushContext>,
764    ) -> TimelineEvent {
765        // If we have either an encrypted message-like or state event, try to decrypt.
766        match event.deserialize_as::<AnySyncTimelineEvent>() {
767            Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
768                SyncMessageLikeEvent::Original(_),
769            ))) => {
770                if let Ok(event) = self
771                    .decrypt_event(
772                        event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
773                        push_ctx,
774                    )
775                    .await
776                {
777                    return event;
778                }
779            }
780            Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
781                SyncStateEvent::Original(_),
782            ))) => {
783                if let Ok(event) = self
784                    .decrypt_event(
785                        event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
786                        push_ctx,
787                    )
788                    .await
789                {
790                    return event;
791                }
792            }
793            _ => {}
794        }
795
796        let mut event = TimelineEvent::from_plaintext(event.cast());
797        if let Some(push_ctx) = push_ctx {
798            event.set_push_actions(push_ctx.for_event(event.raw()).await);
799        }
800
801        event
802    }
803
804    /// Fetch the event with the given `EventId` in this room.
805    ///
806    /// It uses the given [`RequestConfig`] if provided, or the client's default
807    /// one otherwise.
808    pub async fn event(
809        &self,
810        event_id: &EventId,
811        request_config: Option<RequestConfig>,
812    ) -> Result<TimelineEvent> {
813        let request =
814            get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
815
816        let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
817        let push_ctx = self.push_context().await?;
818        let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
819
820        // Save the event into the event cache, if it's set up.
821        if let Ok((cache, _handles)) = self.event_cache().await {
822            cache.save_events([event.clone()]).await;
823        }
824
825        Ok(event)
826    }
827
828    /// Try to load the event from the [`EventCache`][crate::event_cache], if
829    /// it's enabled, or fetch it from the homeserver.
830    ///
831    /// When running the request against the homeserver, it uses the given
832    /// [`RequestConfig`] if provided, or the client's default one
833    /// otherwise.
834    pub async fn load_or_fetch_event(
835        &self,
836        event_id: &EventId,
837        request_config: Option<RequestConfig>,
838    ) -> Result<TimelineEvent> {
839        match self.event_cache().await {
840            Ok((event_cache, _drop_handles)) => {
841                if let Some(event) = event_cache.find_event(event_id).await? {
842                    return Ok(event);
843                }
844                // Fallthrough: try with a request.
845            }
846            Err(err) => {
847                debug!("error when getting the event cache: {err}");
848            }
849        }
850        self.event(event_id, request_config).await
851    }
852
853    /// Try to load the event and its relations from the
854    /// [`EventCache`][crate::event_cache], if it's enabled, or fetch it
855    /// from the homeserver.
856    ///
857    /// You can control which types of related events are retrieved using
858    /// `filter`. A `None` value will retrieve any type of related event.
859    ///
860    /// If the event is found in the event cache, but we can't find any
861    /// relations for it there, then we will still attempt to fetch the
862    /// relations from the homeserver.
863    ///
864    /// When running any request against the homeserver, it uses the given
865    /// [`RequestConfig`] if provided, or the client's default one
866    /// otherwise.
867    ///
868    /// Returns a tuple formed of the event and a vector of its relations (that
869    /// can be empty).
870    pub async fn load_or_fetch_event_with_relations(
871        &self,
872        event_id: &EventId,
873        filter: Option<Vec<RelationType>>,
874        request_config: Option<RequestConfig>,
875    ) -> Result<(TimelineEvent, Vec<TimelineEvent>)> {
876        let fetch_relations = async || {
877            // If there's only a single filter, we can use a more efficient request,
878            // specialized on the filter type.
879            //
880            // Otherwise, we need to get all the relations:
881            // - either because no filters implies we fetch all relations,
882            // - or because there are multiple filters and we must filter out manually.
883            let include_relations = if let Some(filter) = &filter
884                && filter.len() == 1
885            {
886                IncludeRelations::RelationsOfType(filter[0].clone())
887            } else {
888                IncludeRelations::AllRelations
889            };
890
891            let mut opts = RelationsOptions {
892                include_relations,
893                recurse: true,
894                limit: Some(uint!(256)),
895                ..Default::default()
896            };
897
898            let mut events = Vec::new();
899            loop {
900                match self.relations(event_id.to_owned(), opts.clone()).await {
901                    Ok(relations) => {
902                        if let Some(filter) = filter.as_ref() {
903                            // Manually filter out the relation types we're interested in.
904                            events.extend(relations.chunk.into_iter().filter_map(|ev| {
905                                let (rel_type, _) = extract_relation(ev.raw())?;
906                                filter
907                                    .iter()
908                                    .any(|ruma_filter| ruma_filter == &rel_type)
909                                    .then_some(ev)
910                            }));
911                        } else {
912                            // No filter: include all events from the response.
913                            events.extend(relations.chunk);
914                        }
915
916                        if let Some(next_from) = relations.next_batch_token {
917                            opts.from = Some(next_from);
918                        } else {
919                            break events;
920                        }
921                    }
922
923                    Err(err) => {
924                        warn!(%event_id, "error when loading relations of pinned event from server: {err}");
925                        break events;
926                    }
927                }
928            }
929        };
930
931        // First, try to load the event *and* its relations from the event cache, all at
932        // once.
933        let event_cache = match self.event_cache().await {
934            Ok((event_cache, drop_handles)) => {
935                if let Some((event, mut relations)) =
936                    event_cache.find_event_with_relations(event_id, filter.clone()).await?
937                {
938                    if relations.is_empty() {
939                        // The event cache doesn't have any relations for this event, try to fetch
940                        // them from the server instead.
941                        relations = fetch_relations().await;
942                    }
943
944                    return Ok((event, relations));
945                }
946
947                // Otherwise, get the event from the server.
948                Some((event_cache, drop_handles))
949            }
950
951            Err(err) => {
952                debug!("error when getting the event cache: {err}");
953                // Fallthrough: try with a request.
954                None
955            }
956        };
957
958        // Fetch the event from the server. A failure here is fatal, as we must return
959        // the target event.
960        let event = self.event(event_id, request_config).await?;
961
962        // Try to get the relations from the event cache (if we have one).
963        if let Some((event_cache, _drop_handles)) = event_cache
964            && let Some(relations) =
965                event_cache.find_event_relations(event_id, filter.clone()).await.ok()
966            && !relations.is_empty()
967        {
968            return Ok((event, relations));
969        }
970
971        // We couldn't find the relations in the event cache; fetch them from the
972        // server.
973        Ok((event, fetch_relations().await))
974    }
975
976    /// Fetch the event with the given `EventId` in this room, using the
977    /// `/context` endpoint to get more information.
978    pub async fn event_with_context(
979        &self,
980        event_id: &EventId,
981        lazy_load_members: bool,
982        context_size: UInt,
983        request_config: Option<RequestConfig>,
984    ) -> Result<EventWithContextResponse> {
985        let mut request =
986            context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
987
988        request.limit = context_size;
989
990        if lazy_load_members {
991            request.filter.lazy_load_options =
992                LazyLoadOptions::Enabled { include_redundant_members: false };
993        }
994
995        let response = self.client.send(request).with_request_config(request_config).await?;
996
997        let push_ctx = self.push_context().await?;
998        let push_ctx = push_ctx.as_ref();
999        let target_event = if let Some(event) = response.event {
1000            Some(self.try_decrypt_event(event, push_ctx).await)
1001        } else {
1002            None
1003        };
1004
1005        // Note: the joined future will fail if any future failed, but
1006        // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
1007        // decryption error, so we should prevent against most bad cases here.
1008        let (events_before, events_after) = join!(
1009            join_all(
1010                response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1011            ),
1012            join_all(
1013                response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1014            ),
1015        );
1016
1017        // Save the loaded events into the event cache, if it's set up.
1018        if let Ok((cache, _handles)) = self.event_cache().await {
1019            let mut events_to_save: Vec<TimelineEvent> = Vec::new();
1020            if let Some(event) = &target_event {
1021                events_to_save.push(event.clone());
1022            }
1023
1024            for event in &events_before {
1025                events_to_save.push(event.clone());
1026            }
1027
1028            for event in &events_after {
1029                events_to_save.push(event.clone());
1030            }
1031
1032            cache.save_events(events_to_save).await;
1033        }
1034
1035        Ok(EventWithContextResponse {
1036            event: target_event,
1037            events_before,
1038            events_after,
1039            state: response.state,
1040            prev_batch_token: response.start,
1041            next_batch_token: response.end,
1042        })
1043    }
1044
1045    pub(crate) async fn request_members(&self) -> Result<()> {
1046        self.client
1047            .locks()
1048            .members_request_deduplicated_handler
1049            .run(self.room_id().to_owned(), async move {
1050                let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
1051                let response = self
1052                    .client
1053                    .send(request.clone())
1054                    .with_request_config(
1055                        // In some cases it can take longer than 30s to load:
1056                        // https://github.com/element-hq/synapse/issues/16872
1057                        RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
1058                    )
1059                    .await?;
1060
1061                // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
1062                Box::pin(self.client.base_client().receive_all_members(
1063                    self.room_id(),
1064                    &request,
1065                    &response,
1066                ))
1067                .await?;
1068
1069                Ok(())
1070            })
1071            .await
1072    }
1073
1074    /// Request to update the encryption state for this room.
1075    ///
1076    /// It does nothing if the encryption state is already
1077    /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
1078    pub async fn request_encryption_state(&self) -> Result<()> {
1079        if !self.inner.encryption_state().is_unknown() {
1080            return Ok(());
1081        }
1082
1083        self.client
1084            .locks()
1085            .encryption_state_deduplicated_handler
1086            .run(self.room_id().to_owned(), async move {
1087                // Request the event from the server.
1088                let request = get_state_event_for_key::v3::Request::new(
1089                    self.room_id().to_owned(),
1090                    StateEventType::RoomEncryption,
1091                    "".to_owned(),
1092                );
1093                let response = match self.client.send(request).await {
1094                    Ok(response) => Some(
1095                        response
1096                            .into_content()
1097                            .deserialize_as_unchecked::<PossiblyRedactedRoomEncryptionEventContent>(
1098                            )?,
1099                    ),
1100                    Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
1101                    Err(err) => return Err(err.into()),
1102                };
1103
1104                // Persist the event and the fact that we requested it from the server in
1105                // `RoomInfo`.
1106                self.update_and_save_room_info(|mut room_info| {
1107                    room_info.mark_encryption_state_synced();
1108                    room_info.set_encryption_event(response);
1109                    (room_info, RoomInfoNotableUpdateReasons::empty())
1110                })
1111                .await?;
1112
1113                Ok(())
1114            })
1115            .await
1116    }
1117
1118    /// Check the encryption state of this room.
1119    ///
1120    /// If the result is [`EncryptionState::Unknown`], one might want to call
1121    /// [`Room::request_encryption_state`].
1122    pub fn encryption_state(&self) -> EncryptionState {
1123        self.inner.encryption_state()
1124    }
1125
1126    /// Force to update the encryption state by calling
1127    /// [`Room::request_encryption_state`], and then calling
1128    /// [`Room::encryption_state`].
1129    ///
1130    /// This method is useful to ensure the encryption state is up-to-date.
1131    pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
1132        self.request_encryption_state().await?;
1133
1134        Ok(self.encryption_state())
1135    }
1136
1137    /// Gets additional context info about the client crypto.
1138    #[cfg(feature = "e2e-encryption")]
1139    pub async fn crypto_context_info(&self) -> CryptoContextInfo {
1140        let encryption = self.client.encryption();
1141
1142        let this_device_is_verified = match encryption.get_own_device().await {
1143            Ok(Some(device)) => device.is_verified_with_cross_signing(),
1144
1145            // Should not happen, there will always be an own device
1146            _ => true,
1147        };
1148
1149        let backup_exists_on_server =
1150            encryption.backups().exists_on_server().await.unwrap_or(false);
1151
1152        CryptoContextInfo {
1153            device_creation_ts: encryption.device_creation_timestamp().await,
1154            this_device_is_verified,
1155            is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1156            backup_exists_on_server,
1157        }
1158    }
1159
1160    fn are_events_visible(&self) -> bool {
1161        if let RoomState::Invited = self.inner.state() {
1162            return matches!(
1163                self.inner.history_visibility_or_default(),
1164                HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1165            );
1166        }
1167
1168        true
1169    }
1170
1171    /// Sync the member list with the server.
1172    ///
1173    /// This method will de-duplicate requests if it is called multiple times in
1174    /// quick succession, in that case the return value will be `None`. This
1175    /// method does nothing if the members are already synced.
1176    pub async fn sync_members(&self) -> Result<()> {
1177        if !self.are_events_visible() {
1178            return Ok(());
1179        }
1180
1181        if !self.are_members_synced() {
1182            self.request_members().await?;
1183
1184            // While we're at it, calculate the active service members
1185            self.update_active_service_members().await?;
1186
1187            Ok(())
1188        } else {
1189            Ok(())
1190        }
1191    }
1192
1193    /// Get a specific member of this room.
1194    ///
1195    /// *Note*: This method will fetch the members from the homeserver if the
1196    /// member list isn't synchronized due to member lazy loading. Because of
1197    /// that it might panic if it isn't run on a tokio thread.
1198    ///
1199    /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
1200    /// method that doesn't do any requests.
1201    ///
1202    /// # Arguments
1203    ///
1204    /// * `user_id` - The ID of the user that should be fetched out of the
1205    ///   store.
1206    pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1207        self.sync_members().await?;
1208        self.get_member_no_sync(user_id).await
1209    }
1210
1211    /// Get a specific member of this room.
1212    ///
1213    /// *Note*: This method will not fetch the members from the homeserver if
1214    /// the member list isn't synchronized due to member lazy loading. Thus,
1215    /// members could be missing.
1216    ///
1217    /// Use [get_member()](#method.get_member) if you want to ensure to always
1218    /// have the full member list to chose from.
1219    ///
1220    /// # Arguments
1221    ///
1222    /// * `user_id` - The ID of the user that should be fetched out of the
1223    ///   store.
1224    pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1225        Ok(self
1226            .inner
1227            .get_member(user_id)
1228            .await?
1229            .map(|member| RoomMember::new(self.client.clone(), member)))
1230    }
1231
1232    /// Get members for this room, with the given memberships.
1233    ///
1234    /// *Note*: This method will fetch the members from the homeserver if the
1235    /// member list isn't synchronized due to member lazy loading. Because of
1236    /// that it might panic if it isn't run on a tokio thread.
1237    ///
1238    /// Use [members_no_sync()](#method.members_no_sync) if you want a
1239    /// method that doesn't do any requests.
1240    pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1241        self.sync_members().await?;
1242        self.members_no_sync(memberships).await
1243    }
1244
1245    /// Get members for this room, with the given memberships.
1246    ///
1247    /// *Note*: This method will not fetch the members from the homeserver if
1248    /// the member list isn't synchronized due to member lazy loading. Thus,
1249    /// members could be missing.
1250    ///
1251    /// Use [members()](#method.members) if you want to ensure to always get
1252    /// the full member list.
1253    pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1254        Ok(self
1255            .inner
1256            .members(memberships)
1257            .await?
1258            .into_iter()
1259            .map(|member| RoomMember::new(self.client.clone(), member))
1260            .collect())
1261    }
1262
1263    /// Sets the display name of the current user within this room.
1264    ///
1265    /// *Note*: This is different to [`crate::Account::set_display_name`] which
1266    /// updates the user's display name across all of their rooms.
1267    pub async fn set_own_member_display_name(
1268        &self,
1269        display_name: Option<String>,
1270    ) -> Result<send_state_event::v3::Response> {
1271        let user_id = self.own_user_id();
1272        let member_event =
1273            self.get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id).await?;
1274
1275        let Some(RawSyncOrStrippedState::Sync(raw_event)) = member_event else {
1276            return Err(Error::InsufficientData);
1277        };
1278
1279        let event = raw_event.deserialize()?;
1280
1281        let mut content = match event {
1282            SyncStateEvent::Original(original_event) => original_event.content,
1283            SyncStateEvent::Redacted(redacted_event) => {
1284                RoomMemberEventContent::new(redacted_event.content.membership)
1285            }
1286        };
1287
1288        content.displayname = display_name;
1289        self.send_state_event_for_key(user_id, content).await
1290    }
1291
1292    /// Get all state events of a given type in this room.
1293    pub async fn get_state_events(
1294        &self,
1295        event_type: StateEventType,
1296    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1297        self.client
1298            .state_store()
1299            .get_state_events(self.room_id(), event_type)
1300            .await
1301            .map_err(Into::into)
1302    }
1303
1304    /// Get all state events of a given statically-known type in this room.
1305    ///
1306    /// # Examples
1307    ///
1308    /// ```no_run
1309    /// # async {
1310    /// # let room: matrix_sdk::Room = todo!();
1311    /// use matrix_sdk::ruma::{
1312    ///     events::room::member::RoomMemberEventContent, serde::Raw,
1313    /// };
1314    ///
1315    /// let room_members =
1316    ///     room.get_state_events_static::<RoomMemberEventContent>().await?;
1317    /// # anyhow::Ok(())
1318    /// # };
1319    /// ```
1320    pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1321    where
1322        C: StaticEventContent<IsPrefix = ruma::events::False>
1323            + StaticStateEventContent
1324            + RedactContent,
1325        C::Redacted: RedactedStateEventContent,
1326    {
1327        Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1328    }
1329
1330    /// Get the state events of a given type with the given state keys in this
1331    /// room.
1332    pub async fn get_state_events_for_keys(
1333        &self,
1334        event_type: StateEventType,
1335        state_keys: &[&str],
1336    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1337        self.client
1338            .state_store()
1339            .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1340            .await
1341            .map_err(Into::into)
1342    }
1343
1344    /// Get the state events of a given statically-known type with the given
1345    /// state keys in this room.
1346    ///
1347    /// # Examples
1348    ///
1349    /// ```no_run
1350    /// # async {
1351    /// # let room: matrix_sdk::Room = todo!();
1352    /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1353    /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1354    ///
1355    /// let room_members = room
1356    ///     .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1357    ///         user_ids,
1358    ///     )
1359    ///     .await?;
1360    /// # anyhow::Ok(())
1361    /// # };
1362    /// ```
1363    pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1364        &self,
1365        state_keys: I,
1366    ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1367    where
1368        C: StaticEventContent<IsPrefix = ruma::events::False>
1369            + StaticStateEventContent
1370            + RedactContent,
1371        C::StateKey: Borrow<K>,
1372        C::Redacted: RedactedStateEventContent,
1373        K: AsRef<str> + Sized + Sync + 'a,
1374        I: IntoIterator<Item = &'a K> + Send,
1375        I::IntoIter: Send,
1376    {
1377        Ok(self
1378            .client
1379            .state_store()
1380            .get_state_events_for_keys_static(self.room_id(), state_keys)
1381            .await?)
1382    }
1383
1384    /// Get a specific state event in this room.
1385    pub async fn get_state_event(
1386        &self,
1387        event_type: StateEventType,
1388        state_key: &str,
1389    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1390        self.client
1391            .state_store()
1392            .get_state_event(self.room_id(), event_type, state_key)
1393            .await
1394            .map_err(Into::into)
1395    }
1396
1397    /// Get a specific state event of statically-known type with an empty state
1398    /// key in this room.
1399    ///
1400    /// # Examples
1401    ///
1402    /// ```no_run
1403    /// # async {
1404    /// # let room: matrix_sdk::Room = todo!();
1405    /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
1406    ///
1407    /// let power_levels = room
1408    ///     .get_state_event_static::<RoomPowerLevelsEventContent>()
1409    ///     .await?
1410    ///     .expect("every room has a power_levels event")
1411    ///     .deserialize()?;
1412    /// # anyhow::Ok(())
1413    /// # };
1414    /// ```
1415    pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1416    where
1417        C: StaticEventContent<IsPrefix = ruma::events::False>
1418            + StaticStateEventContent<StateKey = EmptyStateKey>
1419            + RedactContent,
1420        C::Redacted: RedactedStateEventContent,
1421    {
1422        self.get_state_event_static_for_key(&EmptyStateKey).await
1423    }
1424
1425    /// Get a specific state event of statically-known type in this room.
1426    ///
1427    /// # Examples
1428    ///
1429    /// ```no_run
1430    /// # async {
1431    /// # let room: matrix_sdk::Room = todo!();
1432    /// use matrix_sdk::ruma::{
1433    ///     events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1434    /// };
1435    ///
1436    /// let member_event = room
1437    ///     .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1438    ///         "@alice:example.org"
1439    ///     ))
1440    ///     .await?;
1441    /// # anyhow::Ok(())
1442    /// # };
1443    /// ```
1444    pub async fn get_state_event_static_for_key<C, K>(
1445        &self,
1446        state_key: &K,
1447    ) -> Result<Option<RawSyncOrStrippedState<C>>>
1448    where
1449        C: StaticEventContent<IsPrefix = ruma::events::False>
1450            + StaticStateEventContent
1451            + RedactContent,
1452        C::StateKey: Borrow<K>,
1453        C::Redacted: RedactedStateEventContent,
1454        K: AsRef<str> + ?Sized + Sync,
1455    {
1456        Ok(self
1457            .client
1458            .state_store()
1459            .get_state_event_static_for_key(self.room_id(), state_key)
1460            .await?)
1461    }
1462
1463    /// Returns the parents this room advertises as its parents.
1464    ///
1465    /// Results are in no particular order.
1466    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1467        // Implements this algorithm:
1468        // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships
1469
1470        // Get all m.space.parent events for this room
1471        Ok(self
1472            .get_state_events_static::<SpaceParentEventContent>()
1473            .await?
1474            .into_iter()
1475            // Extract state key (ie. the parent's id) and sender
1476            .filter_map(|parent_event| match parent_event.deserialize() {
1477                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1478                    Some((e.state_key.to_owned(), e.sender))
1479                }
1480                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1481                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1482                Err(e) => {
1483                    info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1484                    None
1485                }
1486            })
1487            // Check whether the parent recognizes this room as its child
1488            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1489                let Some(parent_room) = self.client.get_room(&state_key) else {
1490                    // We are not in the room, cannot check if the relationship is reciprocal
1491                    // TODO: try peeking into the room
1492                    return Ok(ParentSpace::Unverifiable(state_key));
1493                };
1494                // Get the m.space.child state of the parent with this room's id
1495                // as state key.
1496                if let Some(child_event) = parent_room
1497                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1498                    .await?
1499                {
1500                    match child_event.deserialize() {
1501                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1502                            // There is a valid m.space.child in the parent pointing to
1503                            // this room
1504                            return Ok(ParentSpace::Reciprocal(parent_room));
1505                        }
1506                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1507                        Ok(SyncOrStrippedState::Stripped(_)) => {}
1508                        Err(e) => {
1509                            info!(
1510                                room_id = ?self.room_id(), parent_room_id = ?state_key,
1511                                "Could not deserialize m.space.child: {e}"
1512                            );
1513                        }
1514                    }
1515                    // Otherwise the event is either invalid or redacted. If
1516                    // redacted it would be missing the
1517                    // `via` key, thereby invalidating that end of the
1518                    // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
1519                }
1520
1521                // No reciprocal m.space.child found, let's check if the sender has the
1522                // power to set it
1523                let Some(member) = parent_room.get_member(&sender).await? else {
1524                    // Sender is not even in the parent room
1525                    return Ok(ParentSpace::Illegitimate(parent_room));
1526                };
1527
1528                if member.can_send_state(StateEventType::SpaceChild) {
1529                    // Sender does have the power to set m.room.child
1530                    Ok(ParentSpace::WithPowerlevel(parent_room))
1531                } else {
1532                    Ok(ParentSpace::Illegitimate(parent_room))
1533                }
1534            })
1535            .collect::<FuturesUnordered<_>>())
1536    }
1537
1538    /// Read account data in this room, from storage.
1539    pub async fn account_data(
1540        &self,
1541        data_type: RoomAccountDataEventType,
1542    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1543        self.client
1544            .state_store()
1545            .get_room_account_data_event(self.room_id(), data_type)
1546            .await
1547            .map_err(Into::into)
1548    }
1549
1550    /// Get account data of a statically-known type in this room, from storage.
1551    ///
1552    /// # Examples
1553    ///
1554    /// ```no_run
1555    /// # async {
1556    /// # let room: matrix_sdk::Room = todo!();
1557    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1558    ///
1559    /// match room.account_data_static::<FullyReadEventContent>().await? {
1560    ///     Some(fully_read) => {
1561    ///         println!("Found read marker: {:?}", fully_read.deserialize()?)
1562    ///     }
1563    ///     None => println!("No read marker for this room"),
1564    /// }
1565    /// # anyhow::Ok(())
1566    /// # };
1567    /// ```
1568    pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1569    where
1570        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1571    {
1572        Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1573    }
1574
1575    /// Check if all members of this room are verified and all their devices are
1576    /// verified.
1577    ///
1578    /// Returns true if all devices in the room are verified, otherwise false.
1579    #[cfg(feature = "e2e-encryption")]
1580    pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1581        let user_ids = self
1582            .client
1583            .state_store()
1584            .get_user_ids(self.room_id(), RoomMemberships::empty())
1585            .await?;
1586
1587        for user_id in user_ids {
1588            let devices = self.client.encryption().get_user_devices(&user_id).await?;
1589            let any_unverified = devices.devices().any(|d| !d.is_verified());
1590
1591            if any_unverified {
1592                return Ok(false);
1593            }
1594        }
1595
1596        Ok(true)
1597    }
1598
1599    /// Set the given account data event for this room.
1600    ///
1601    /// # Example
1602    /// ```
1603    /// # async {
1604    /// # let room: matrix_sdk::Room = todo!();
1605    /// # let event_id: ruma::OwnedEventId = todo!();
1606    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1607    /// let content = FullyReadEventContent::new(event_id);
1608    ///
1609    /// room.set_account_data(content).await?;
1610    /// # anyhow::Ok(())
1611    /// # };
1612    /// ```
1613    pub async fn set_account_data<T>(
1614        &self,
1615        content: T,
1616    ) -> Result<set_room_account_data::v3::Response>
1617    where
1618        T: RoomAccountDataEventContent,
1619    {
1620        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1621
1622        let request = set_room_account_data::v3::Request::new(
1623            own_user.to_owned(),
1624            self.room_id().to_owned(),
1625            &content,
1626        )?;
1627
1628        Ok(self.client.send(request).await?)
1629    }
1630
1631    /// Set the given raw account data event in this room.
1632    ///
1633    /// # Example
1634    /// ```
1635    /// # async {
1636    /// # let room: matrix_sdk::Room = todo!();
1637    /// use matrix_sdk::ruma::{
1638    ///     events::{
1639    ///         AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1640    ///         marked_unread::MarkedUnreadEventContent,
1641    ///     },
1642    ///     serde::Raw,
1643    /// };
1644    /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1645    /// let full_event: AnyRoomAccountDataEventContent =
1646    ///     marked_unread_content.clone().into();
1647    /// room.set_account_data_raw(
1648    ///     marked_unread_content.event_type(),
1649    ///     Raw::new(&full_event).unwrap(),
1650    /// )
1651    /// .await?;
1652    /// # anyhow::Ok(())
1653    /// # };
1654    /// ```
1655    pub async fn set_account_data_raw(
1656        &self,
1657        event_type: RoomAccountDataEventType,
1658        content: Raw<AnyRoomAccountDataEventContent>,
1659    ) -> Result<set_room_account_data::v3::Response> {
1660        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1661
1662        let request = set_room_account_data::v3::Request::new_raw(
1663            own_user.to_owned(),
1664            self.room_id().to_owned(),
1665            event_type,
1666            content,
1667        );
1668
1669        Ok(self.client.send(request).await?)
1670    }
1671
1672    /// Adds a tag to the room, or updates it if it already exists.
1673    ///
1674    /// Returns the [`create_tag::v3::Response`] from the server.
1675    ///
1676    /// # Arguments
1677    /// * `tag` - The tag to add or update.
1678    ///
1679    /// * `tag_info` - Information about the tag, generally containing the
1680    ///   `order` parameter.
1681    ///
1682    /// # Examples
1683    ///
1684    /// ```no_run
1685    /// # use std::str::FromStr;
1686    /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1687    /// # async {
1688    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1689    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1690    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1691    /// use matrix_sdk::ruma::events::tag::TagInfo;
1692    ///
1693    /// if let Some(room) = client.get_room(&room_id) {
1694    ///     let mut tag_info = TagInfo::new();
1695    ///     tag_info.order = Some(0.9);
1696    ///     let user_tag = UserTagName::from_str("u.work")?;
1697    ///
1698    ///     room.set_tag(TagName::User(user_tag), tag_info).await?;
1699    /// }
1700    /// # anyhow::Ok(()) };
1701    /// ```
1702    pub async fn set_tag(
1703        &self,
1704        tag: TagName,
1705        tag_info: TagInfo,
1706    ) -> Result<create_tag::v3::Response> {
1707        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1708        let request = create_tag::v3::Request::new(
1709            user_id.to_owned(),
1710            self.inner.room_id().to_owned(),
1711            tag.to_string(),
1712            tag_info,
1713        );
1714        Ok(self.client.send(request).await?)
1715    }
1716
1717    /// Removes a tag from the room.
1718    ///
1719    /// Returns the [`delete_tag::v3::Response`] from the server.
1720    ///
1721    /// # Arguments
1722    /// * `tag` - The tag to remove.
1723    pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1724        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1725        let request = delete_tag::v3::Request::new(
1726            user_id.to_owned(),
1727            self.inner.room_id().to_owned(),
1728            tag.to_string(),
1729        );
1730        Ok(self.client.send(request).await?)
1731    }
1732
1733    /// Add or remove the `m.favourite` flag for this room.
1734    ///
1735    /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1736    /// room, the tag will be removed too.
1737    ///
1738    /// # Arguments
1739    ///
1740    /// * `is_favourite` - Whether to mark this room as favourite.
1741    /// * `tag_order` - The order of the tag if any.
1742    pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1743        if is_favourite {
1744            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1745
1746            self.set_tag(TagName::Favorite, tag_info).await?;
1747
1748            if self.is_low_priority() {
1749                self.remove_tag(TagName::LowPriority).await?;
1750            }
1751        } else {
1752            self.remove_tag(TagName::Favorite).await?;
1753        }
1754        Ok(())
1755    }
1756
1757    /// Add or remove the `m.lowpriority` flag for this room.
1758    ///
1759    /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1760    /// room, the tag will be removed too.
1761    ///
1762    /// # Arguments
1763    ///
1764    /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1765    /// * `tag_order` - The order of the tag if any.
1766    pub async fn set_is_low_priority(
1767        &self,
1768        is_low_priority: bool,
1769        tag_order: Option<f64>,
1770    ) -> Result<()> {
1771        if is_low_priority {
1772            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1773
1774            self.set_tag(TagName::LowPriority, tag_info).await?;
1775
1776            if self.is_favourite() {
1777                self.remove_tag(TagName::Favorite).await?;
1778            }
1779        } else {
1780            self.remove_tag(TagName::LowPriority).await?;
1781        }
1782        Ok(())
1783    }
1784
1785    /// Sets whether this room is a DM.
1786    ///
1787    /// When setting this room as DM, it will be marked as DM for all active
1788    /// members of the room. When unsetting this room as DM, it will be
1789    /// unmarked as DM for all users, not just the members.
1790    ///
1791    /// # Arguments
1792    /// * `is_direct` - Whether to mark this room as direct.
1793    pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1794        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1795
1796        let mut content = self
1797            .client
1798            .account()
1799            .account_data::<DirectEventContent>()
1800            .await?
1801            .map(|c| c.deserialize())
1802            .transpose()?
1803            .unwrap_or_default();
1804
1805        let this_room_id = self.inner.room_id();
1806
1807        if is_direct {
1808            let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1809            room_members.retain(|member| member.user_id() != self.own_user_id());
1810
1811            for member in room_members {
1812                let entry = content.entry(member.user_id().into()).or_default();
1813                if !entry.iter().any(|room_id| room_id == this_room_id) {
1814                    entry.push(this_room_id.to_owned());
1815                }
1816            }
1817        } else {
1818            for (_, list) in content.iter_mut() {
1819                list.retain(|room_id| *room_id != this_room_id);
1820            }
1821
1822            // Remove user ids that don't have any room marked as DM
1823            content.retain(|_, list| !list.is_empty());
1824        }
1825
1826        let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1827
1828        self.client.send(request).await?;
1829        Ok(())
1830    }
1831
1832    /// Tries to decrypt a room event.
1833    ///
1834    /// # Arguments
1835    /// * `event` - The room event to be decrypted.
1836    ///
1837    /// Returns the decrypted event. In the case of a decryption error, returns
1838    /// a `TimelineEvent` representing the decryption error.
1839    #[cfg(feature = "e2e-encryption")]
1840    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1841    pub async fn decrypt_event(
1842        &self,
1843        event: &Raw<OriginalSyncRoomEncryptedEvent>,
1844        push_ctx: Option<&PushContext>,
1845    ) -> Result<TimelineEvent> {
1846        let machine = self.client.olm_machine().await;
1847        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1848
1849        match machine
1850            .try_decrypt_room_event(
1851                event.cast_ref(),
1852                self.inner.room_id(),
1853                self.client.decryption_settings(),
1854            )
1855            .await?
1856        {
1857            RoomEventDecryptionResult::Decrypted(decrypted) => {
1858                let push_actions = if let Some(push_ctx) = push_ctx {
1859                    Some(push_ctx.for_event(&decrypted.event).await)
1860                } else {
1861                    None
1862                };
1863                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1864            }
1865            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1866                self.client
1867                    .encryption()
1868                    .backups()
1869                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1870                Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1871            }
1872        }
1873    }
1874
1875    /// Tries to decrypt a room event.
1876    ///
1877    /// # Arguments
1878    /// * `event` - The room event to be decrypted.
1879    ///
1880    /// Returns the decrypted event. In the case of a decryption error, returns
1881    /// a `TimelineEvent` representing the decryption error.
1882    #[cfg(feature = "experimental-encrypted-state-events")]
1883    pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1884        &self,
1885        event: &Raw<T>,
1886        push_ctx: Option<&PushContext>,
1887    ) -> Result<TimelineEvent> {
1888        let machine = self.client.olm_machine().await;
1889        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1890
1891        match machine
1892            .try_decrypt_room_event(
1893                event.cast_ref(),
1894                self.inner.room_id(),
1895                self.client.decryption_settings(),
1896            )
1897            .await?
1898        {
1899            RoomEventDecryptionResult::Decrypted(decrypted) => {
1900                let push_actions = if let Some(push_ctx) = push_ctx {
1901                    Some(push_ctx.for_event(&decrypted.event).await)
1902                } else {
1903                    None
1904                };
1905                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1906            }
1907            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1908                self.client
1909                    .encryption()
1910                    .backups()
1911                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1912                // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
1913                // event.
1914                Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1915            }
1916        }
1917    }
1918
1919    /// Fetches the [`EncryptionInfo`] for an event decrypted with the supplied
1920    /// session_id.
1921    ///
1922    /// This may be used when we receive an update for a session, and we want to
1923    /// reflect the changes in messages we have received that were encrypted
1924    /// with that session, e.g. to remove a warning shield because a device is
1925    /// now verified.
1926    ///
1927    /// # Arguments
1928    /// * `session_id` - The ID of the Megolm session to get information for.
1929    /// * `sender` - The (claimed) sender of the event where the session was
1930    ///   used.
1931    #[cfg(feature = "e2e-encryption")]
1932    pub async fn get_encryption_info(
1933        &self,
1934        session_id: &str,
1935        sender: &UserId,
1936    ) -> Option<Arc<EncryptionInfo>> {
1937        let machine = self.client.olm_machine().await;
1938        let machine = machine.as_ref()?;
1939        machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1940    }
1941
1942    /// Forces the currently active room key, which is used to encrypt messages,
1943    /// to be rotated.
1944    ///
1945    /// A new room key will be crated and shared with all the room members the
1946    /// next time a message will be sent. You don't have to call this method,
1947    /// room keys will be rotated automatically when necessary. This method is
1948    /// still useful for debugging purposes.
1949    ///
1950    /// For more info please take a look a the [`encryption`] module
1951    /// documentation.
1952    ///
1953    /// [`encryption`]: crate::encryption
1954    #[cfg(feature = "e2e-encryption")]
1955    pub async fn discard_room_key(&self) -> Result<()> {
1956        let machine = self.client.olm_machine().await;
1957        if let Some(machine) = machine.as_ref() {
1958            machine.discard_room_key(self.inner.room_id()).await?;
1959            Ok(())
1960        } else {
1961            Err(Error::NoOlmMachine)
1962        }
1963    }
1964
1965    /// Ban the user with `UserId` from this room.
1966    ///
1967    /// # Arguments
1968    ///
1969    /// * `user_id` - The user to ban with `UserId`.
1970    ///
1971    /// * `reason` - The reason for banning this user.
1972    #[instrument(skip_all)]
1973    pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1974        let request = assign!(
1975            ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1976            { reason: reason.map(ToOwned::to_owned) }
1977        );
1978        self.client.send(request).await?;
1979        Ok(())
1980    }
1981
1982    /// Unban the user with `UserId` from this room.
1983    ///
1984    /// # Arguments
1985    ///
1986    /// * `user_id` - The user to unban with `UserId`.
1987    ///
1988    /// * `reason` - The reason for unbanning this user.
1989    #[instrument(skip_all)]
1990    pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1991        let request = assign!(
1992            unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1993            { reason: reason.map(ToOwned::to_owned) }
1994        );
1995        self.client.send(request).await?;
1996        Ok(())
1997    }
1998
1999    /// Kick a user out of this room.
2000    ///
2001    /// # Arguments
2002    ///
2003    /// * `user_id` - The `UserId` of the user that should be kicked out of the
2004    ///   room.
2005    ///
2006    /// * `reason` - Optional reason why the room member is being kicked out.
2007    #[instrument(skip_all)]
2008    pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
2009        let request = assign!(
2010            kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
2011            { reason: reason.map(ToOwned::to_owned) }
2012        );
2013        self.client.send(request).await?;
2014        Ok(())
2015    }
2016
2017    /// Invite the specified user by `UserId` to this room.
2018    ///
2019    /// # Arguments
2020    ///
2021    /// * `user_id` - The `UserId` of the user to invite to the room.
2022    #[instrument(skip_all)]
2023    pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
2024        #[cfg(feature = "e2e-encryption")]
2025        if self.client.inner.enable_share_history_on_invite {
2026            shared_room_history::share_room_history(self, user_id.to_owned()).await?;
2027        }
2028
2029        let recipient = InvitationRecipient::UserId(InviteUserId::new(user_id.to_owned()));
2030        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2031        self.client.send(request).await?;
2032
2033        // Force a future room members reload before sending any event to prevent UTDs
2034        // that can happen when some event is sent after a room member has been invited
2035        // but before the /sync request could fetch the membership change event.
2036        self.mark_members_missing();
2037
2038        Ok(())
2039    }
2040
2041    /// Invite the specified user by third party id to this room.
2042    ///
2043    /// # Arguments
2044    ///
2045    /// * `invite_id` - A third party id of a user to invite to the room.
2046    #[instrument(skip_all)]
2047    pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
2048        let recipient = InvitationRecipient::ThirdPartyId(invite_id);
2049        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2050        self.client.send(request).await?;
2051
2052        // Force a future room members reload before sending any event to prevent UTDs
2053        // that can happen when some event is sent after a room member has been invited
2054        // but before the /sync request could fetch the membership change event.
2055        self.mark_members_missing();
2056
2057        Ok(())
2058    }
2059
2060    /// Activate typing notice for this room.
2061    ///
2062    /// The typing notice remains active for 4s. It can be deactivate at any
2063    /// point by setting typing to `false`. If this method is called while
2064    /// the typing notice is active nothing will happen. This method can be
2065    /// called on every key stroke, since it will do nothing while typing is
2066    /// active.
2067    ///
2068    /// # Arguments
2069    ///
2070    /// * `typing` - Whether the user is typing or has stopped typing.
2071    ///
2072    /// # Examples
2073    ///
2074    /// ```no_run
2075    /// use std::time::Duration;
2076    ///
2077    /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
2078    /// # use matrix_sdk::{
2079    /// #     Client, config::SyncSettings,
2080    /// #     ruma::room_id,
2081    /// # };
2082    /// # use url::Url;
2083    ///
2084    /// # async {
2085    /// # let homeserver = Url::parse("http://localhost:8080")?;
2086    /// # let client = Client::new(homeserver).await?;
2087    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2088    ///
2089    /// if let Some(room) = client.get_room(&room_id) {
2090    ///     room.typing_notice(true).await?
2091    /// }
2092    /// # anyhow::Ok(()) };
2093    /// ```
2094    pub async fn typing_notice(&self, typing: bool) -> Result<()> {
2095        self.ensure_room_joined()?;
2096
2097        // Only send a request to the homeserver if the old timeout has elapsed
2098        // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
2099        let send = if let Some(typing_time) =
2100            self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
2101        {
2102            if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
2103                // We always reactivate the typing notice if typing is true or
2104                // we may need to deactivate it if it's
2105                // currently active if typing is false
2106                typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
2107            } else {
2108                // Only send a request when we need to deactivate typing
2109                !typing
2110            }
2111        } else {
2112            // Typing notice is currently deactivated, therefore, send a request
2113            // only when it's about to be activated
2114            typing
2115        };
2116
2117        if send {
2118            self.send_typing_notice(typing).await?;
2119        }
2120
2121        Ok(())
2122    }
2123
2124    #[instrument(name = "typing_notice", skip(self))]
2125    async fn send_typing_notice(&self, typing: bool) -> Result<()> {
2126        let typing = if typing {
2127            self.client
2128                .inner
2129                .typing_notice_times
2130                .write()
2131                .unwrap()
2132                .insert(self.room_id().to_owned(), Instant::now());
2133            Typing::Yes(TypingInfo::new(TYPING_NOTICE_TIMEOUT))
2134        } else {
2135            self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
2136            Typing::No
2137        };
2138
2139        let request = create_typing_event::v3::Request::new(
2140            self.own_user_id().to_owned(),
2141            self.room_id().to_owned(),
2142            typing,
2143        );
2144
2145        self.client.send(request).await?;
2146
2147        Ok(())
2148    }
2149
2150    /// Send a request to set a single receipt.
2151    ///
2152    /// If an unthreaded receipt is sent, this will also unset the unread flag
2153    /// of the room if necessary.
2154    ///
2155    /// # Arguments
2156    ///
2157    /// * `receipt_type` - The type of the receipt to set. Note that it is
2158    ///   possible to set the fully-read marker although it is technically not a
2159    ///   receipt.
2160    ///
2161    /// * `thread` - The thread where this receipt should apply, if any. Note
2162    ///   that this must be [`ReceiptThread::Unthreaded`] when sending a
2163    ///   [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
2164    ///
2165    /// * `event_id` - The `EventId` of the event to set the receipt on.
2166    #[instrument(skip_all)]
2167    pub async fn send_single_receipt(
2168        &self,
2169        receipt_type: create_receipt::v3::ReceiptType,
2170        thread: ReceiptThread,
2171        event_id: OwnedEventId,
2172    ) -> Result<()> {
2173        // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
2174        // string key.
2175        let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
2176
2177        self.client
2178            .inner
2179            .locks
2180            .read_receipt_deduplicated_handler
2181            .run((request_key, event_id.clone()), async {
2182                // We will unset the unread flag if we send an unthreaded receipt.
2183                let is_unthreaded = thread == ReceiptThread::Unthreaded;
2184
2185                let mut request = create_receipt::v3::Request::new(
2186                    self.room_id().to_owned(),
2187                    receipt_type,
2188                    event_id,
2189                );
2190                request.thread = thread;
2191
2192                self.client.send(request).await?;
2193
2194                if is_unthreaded {
2195                    self.set_unread_flag(false).await?;
2196                }
2197
2198                Ok(())
2199            })
2200            .await
2201    }
2202
2203    /// Send a request to set multiple receipts at once.
2204    ///
2205    /// This will also unset the unread flag of the room if necessary.
2206    ///
2207    /// # Arguments
2208    ///
2209    /// * `receipts` - The `Receipts` to send.
2210    ///
2211    /// If `receipts` is empty, this is a no-op.
2212    #[instrument(skip_all)]
2213    pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2214        if receipts.is_empty() {
2215            return Ok(());
2216        }
2217
2218        let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2219        let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2220            fully_read,
2221            read_receipt: public_read_receipt,
2222            private_read_receipt,
2223        });
2224
2225        self.client.send(request).await?;
2226
2227        self.set_unread_flag(false).await?;
2228
2229        Ok(())
2230    }
2231
    /// Helper function to enable End-to-end encryption in this room.
    /// `encrypted_state_events` is not used unless the
    /// `experimental-encrypted-state-events` feature is enabled.
    ///
    /// Does nothing if `latest_encryption_state()` already reports the room as
    /// encrypted. Otherwise a `RoomEncryptionEventContent` state event using
    /// the `MegolmV1AesSha2` algorithm is sent, and this method then waits, for
    /// at most `SYNC_WAIT_TIME`, for a sync to make the local encryption state
    /// known.
    ///
    /// If the expected encryption state is not observed after the wait, the
    /// encryption state in the room info is marked as missing and saved, and
    /// `Ok(())` is still returned.
    ///
    /// # Arguments
    ///
    /// * `encrypted_state_events` - With the
    ///   `experimental-encrypted-state-events` feature, whether to also opt
    ///   into state event encryption.
    // The parameter and the `mut` binding are only used when the
    // `experimental-encrypted-state-events` feature is enabled.
    #[allow(unused_variables, unused_mut)]
    async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
        use ruma::{
            EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
        };
        // Upper bound on how long to wait for a sync to report the encryption
        // state.
        const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);

        if !self.latest_encryption_state().await?.is_encrypted() {
            let mut content =
                RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
            #[cfg(feature = "experimental-encrypted-state-events")]
            if encrypted_state_events {
                content = content.with_encrypted_state();
            }
            self.send_state_event(content).await?;

            // Spin on the sync beat event, since the first sync we receive might not
            // include the encryption event.
            //
            // TODO do we want to return an error here if we time out? This
            // could be quite useful if someone wants to enable encryption and
            // send a message right after it's enabled.
            let res = timeout(
                async {
                    loop {
                        // Listen for sync events, then check if the encryption state is known.
                        self.client.inner.sync_beat.listen().await;
                        // The state store lock is held while the encryption state
                        // is read.
                        let _state_store_lock =
                            self.client.base_client().state_store_lock().lock().await;

                        if !self.inner.encryption_state().is_unknown() {
                            break;
                        }
                    }
                },
                SYNC_WAIT_TIME,
            )
            .await;

            // Take the state store lock for the final check and, if needed, for
            // the room info update below.
            let store_guard = self.client.base_client().state_store_lock().lock().await;

            // If encryption was enabled, return.
            #[cfg(not(feature = "experimental-encrypted-state-events"))]
            if res.is_ok() && self.inner.encryption_state().is_encrypted() {
                debug!("room successfully marked as encrypted");
                return Ok(());
            }

            // If encryption with state event encryption was enabled, return.
            #[cfg(feature = "experimental-encrypted-state-events")]
            if res.is_ok() && {
                // Which state counts as success depends on whether state event
                // encryption was requested.
                if encrypted_state_events {
                    self.inner.encryption_state().is_state_encrypted()
                } else {
                    self.inner.encryption_state().is_encrypted()
                }
            } {
                debug!("room successfully marked as encrypted");
                return Ok(());
            }

            // If after waiting for multiple syncs, we don't have the encryption state we
            // expect, assume the local encryption state is incorrect; this will
            // cause the SDK to re-request it later for confirmation, instead of
            // assuming it's sync'd and correct (and not encrypted).
            debug!("still not marked as encrypted, marking encryption state as missing");

            self.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
                info.mark_encryption_state_missing();
                (info, RoomInfoNotableUpdateReasons::empty())
            })
            .await?;
        }

        Ok(())
    }
2311
2312    /// Enable End-to-end encryption in this room.
2313    ///
2314    /// This method will be a noop if encryption is already enabled, otherwise
2315    /// sends a `m.room.encryption` state event to the room. This might fail if
2316    /// you don't have the appropriate power level to enable end-to-end
2317    /// encryption.
2318    ///
2319    /// A sync needs to be received to update the local room state. This method
2320    /// will wait for a sync to be received, this might time out if no
2321    /// sync loop is running or if the server is slow.
2322    ///
2323    /// # Examples
2324    ///
2325    /// ```no_run
2326    /// # use matrix_sdk::{
2327    /// #     Client, config::SyncSettings,
2328    /// #     ruma::room_id,
2329    /// # };
2330    /// # use url::Url;
2331    /// #
2332    /// # async {
2333    /// # let homeserver = Url::parse("http://localhost:8080")?;
2334    /// # let client = Client::new(homeserver).await?;
2335    /// # let room_id = room_id!("!test:localhost");
2336    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2337    ///
2338    /// if let Some(room) = client.get_room(&room_id) {
2339    ///     room.enable_encryption().await?
2340    /// }
2341    /// # anyhow::Ok(()) };
2342    /// ```
2343    #[instrument(skip_all)]
2344    pub async fn enable_encryption(&self) -> Result<()> {
2345        self.enable_encryption_inner(false).await
2346    }
2347
2348    /// Enable End-to-end encryption in this room, opting into experimental
2349    /// state event encryption.
2350    ///
2351    /// This method will be a noop if encryption is already enabled, otherwise
2352    /// sends a `m.room.encryption` state event to the room. This might fail if
2353    /// you don't have the appropriate power level to enable end-to-end
2354    /// encryption.
2355    ///
2356    /// A sync needs to be received to update the local room state. This method
2357    /// will wait for a sync to be received, this might time out if no
2358    /// sync loop is running or if the server is slow.
2359    ///
2360    /// # Examples
2361    ///
2362    /// ```no_run
2363    /// # use matrix_sdk::{
2364    /// #     Client, config::SyncSettings,
2365    /// #     ruma::room_id,
2366    /// # };
2367    /// # use url::Url;
2368    /// #
2369    /// # async {
2370    /// # let homeserver = Url::parse("http://localhost:8080")?;
2371    /// # let client = Client::new(homeserver).await?;
2372    /// # let room_id = room_id!("!test:localhost");
2373    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2374    ///
2375    /// if let Some(room) = client.get_room(&room_id) {
2376    ///     room.enable_encryption_with_state_event_encryption().await?
2377    /// }
2378    /// # anyhow::Ok(()) };
2379    /// ```
2380    #[instrument(skip_all)]
2381    #[cfg(feature = "experimental-encrypted-state-events")]
2382    pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2383        self.enable_encryption_inner(true).await
2384    }
2385
2386    /// Share a room key with users in the given room.
2387    ///
2388    /// This will create Olm sessions with all the users/device pairs in the
2389    /// room if necessary and share a room key that can be shared with them.
2390    ///
2391    /// Does nothing if no room key needs to be shared.
2392    // TODO: expose this publicly so people can pre-share a group session if
2393    // e.g. a user starts to type a message for a room.
2394    #[cfg(feature = "e2e-encryption")]
2395    #[instrument(skip_all, fields(room_id = ?self.room_id()))]
2396    async fn preshare_room_key(&self) -> Result<()> {
2397        self.ensure_room_joined()?;
2398
2399        // Take and release the lock on the store, if needs be.
2400        let _guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2401
2402        self.client
2403            .locks()
2404            .group_session_deduplicated_handler
2405            .run(self.room_id().to_owned(), async move {
2406                {
2407                    let members = self
2408                        .client
2409                        .state_store()
2410                        .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2411                        .await?;
2412                    self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2413                };
2414
2415                let response = self.share_room_key().await;
2416
2417                // If one of the responses failed invalidate the group
2418                // session as using it would end up in undecryptable
2419                // messages.
2420                if let Err(r) = response {
2421                    let machine = self.client.olm_machine().await;
2422                    if let Some(machine) = machine.as_ref() {
2423                        machine.discard_room_key(self.room_id()).await?;
2424                    }
2425                    return Err(r);
2426                }
2427
2428                Ok(())
2429            })
2430            .await
2431    }
2432
2433    /// Share a group session for a room.
2434    ///
2435    /// # Panics
2436    ///
2437    /// Panics if the client isn't logged in.
2438    #[cfg(feature = "e2e-encryption")]
2439    #[instrument(skip_all)]
2440    async fn share_room_key(&self) -> Result<()> {
2441        self.ensure_room_joined()?;
2442
2443        let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2444
2445        for request in requests {
2446            let response = self.client.send_to_device(&request).await?;
2447            self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2448        }
2449
2450        Ok(())
2451    }
2452
2453    /// Wait for the room to be fully synced.
2454    ///
2455    /// This method makes sure the room that was returned when joining a room
2456    /// has been echoed back in the sync.
2457    ///
2458    /// Warning: This waits until a sync happens and does not return if no sync
2459    /// is happening. It can also return early when the room is not a joined
2460    /// room anymore.
2461    #[instrument(skip_all)]
2462    pub async fn sync_up(&self) {
2463        while !self.is_synced() && self.state() == RoomState::Joined {
2464            let wait_for_beat = self.client.inner.sync_beat.listen();
2465            // We don't care whether it's a timeout or a sync beat.
2466            let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2467        }
2468    }
2469
2470    /// Send a message-like event to this room.
2471    ///
2472    /// Returns the parsed response from the server.
2473    ///
2474    /// If the encryption feature is enabled this method will transparently
2475    /// encrypt the event if this room is encrypted (except for `m.reaction`
2476    /// events, which are never encrypted).
2477    ///
2478    /// **Note**: If you just want to send an event with custom JSON content to
2479    /// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
2480    ///
2481    /// If you want to set a transaction ID for the event, use
2482    /// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
2483    /// on the returned value before `.await`ing it.
2484    ///
2485    /// # Arguments
2486    ///
2487    /// * `content` - The content of the message event.
2488    ///
2489    /// # Examples
2490    ///
2491    /// ```no_run
2492    /// # use std::sync::{Arc, RwLock};
2493    /// # use matrix_sdk::{Client, config::SyncSettings};
2494    /// # use url::Url;
2495    /// # use matrix_sdk::ruma::room_id;
2496    /// # use serde::{Deserialize, Serialize};
2497    /// use matrix_sdk::ruma::{
2498    ///     MilliSecondsSinceUnixEpoch, TransactionId,
2499    ///     events::{
2500    ///         macros::EventContent,
2501    ///         room::message::{RoomMessageEventContent, TextMessageEventContent},
2502    ///     },
2503    ///     uint,
2504    /// };
2505    ///
2506    /// # async {
2507    /// # let homeserver = Url::parse("http://localhost:8080")?;
2508    /// # let mut client = Client::new(homeserver).await?;
2509    /// # let room_id = room_id!("!test:localhost");
2510    /// let content = RoomMessageEventContent::text_plain("Hello world");
2511    /// let txn_id = TransactionId::new();
2512    ///
2513    /// if let Some(room) = client.get_room(&room_id) {
2514    ///     room.send(content).with_transaction_id(txn_id).await?;
2515    /// }
2516    ///
2517    /// // Custom events work too:
2518    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2519    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
2520    /// struct TokenEventContent {
2521    ///     token: String,
2522    ///     #[serde(rename = "exp")]
2523    ///     expires_at: MilliSecondsSinceUnixEpoch,
2524    /// }
2525    ///
2526    /// # fn generate_token() -> String { todo!() }
2527    /// let content = TokenEventContent {
2528    ///     token: generate_token(),
2529    ///     expires_at: {
2530    ///         let now = MilliSecondsSinceUnixEpoch::now();
2531    ///         MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
2532    ///     },
2533    /// };
2534    ///
2535    /// if let Some(room) = client.get_room(&room_id) {
2536    ///     room.send(content).await?;
2537    /// }
2538    /// # anyhow::Ok(()) };
2539    /// ```
2540    pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2541        SendMessageLikeEvent::new(self, content)
2542    }
2543
2544    /// Run /keys/query requests for all the non-tracked users, and for users
2545    /// with an out-of-date device list.
2546    #[cfg(feature = "e2e-encryption")]
2547    async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2548        let olm = self.client.olm_machine().await;
2549        let olm = olm.as_ref().expect("Olm machine wasn't started");
2550
2551        let members =
2552            self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2553
2554        let tracked: HashMap<_, _> = olm
2555            .store()
2556            .load_tracked_users()
2557            .await?
2558            .into_iter()
2559            .map(|tracked| (tracked.user_id, tracked.dirty))
2560            .collect();
2561
2562        // A member has no unknown devices iff it was tracked *and* the tracking is
2563        // not considered dirty.
2564        let members_with_unknown_devices =
2565            members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2566
2567        let (req_id, request) =
2568            olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2569
2570        if !request.device_keys.is_empty() {
2571            self.client.keys_query(&req_id, request.device_keys).await?;
2572        }
2573
2574        Ok(())
2575    }
2576
2577    /// Send a message-like event with custom JSON content to this room.
2578    ///
2579    /// Returns the parsed response from the server.
2580    ///
2581    /// If the encryption feature is enabled this method will transparently
2582    /// encrypt the event if this room is encrypted (except for `m.reaction`
2583    /// events, which are never encrypted).
2584    ///
2585    /// This method is equivalent to the [`send()`][Self::send] method but
2586    /// allows sending custom JSON payloads, e.g. constructed using the
2587    /// [`serde_json::json!()`] macro.
2588    ///
2589    /// If you want to set a transaction ID for the event, use
2590    /// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
2591    /// on the returned value before `.await`ing it.
2592    ///
2593    /// # Arguments
2594    ///
2595    /// * `event_type` - The type of the event.
2596    ///
2597    /// * `content` - The content of the event as a raw JSON value. The argument
2598    ///   type can be `serde_json::Value`, but also other raw JSON types; for
2599    ///   the full list check the documentation of
2600    ///   [`IntoRawMessageLikeEventContent`].
2601    ///
2602    /// # Examples
2603    ///
2604    /// ```no_run
2605    /// # use std::sync::{Arc, RwLock};
2606    /// # use matrix_sdk::{Client, config::SyncSettings};
2607    /// # use url::Url;
2608    /// # use matrix_sdk::ruma::room_id;
2609    /// # async {
2610    /// # let homeserver = Url::parse("http://localhost:8080")?;
2611    /// # let mut client = Client::new(homeserver).await?;
2612    /// # let room_id = room_id!("!test:localhost");
2613    /// use serde_json::json;
2614    ///
2615    /// if let Some(room) = client.get_room(&room_id) {
2616    ///     room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
2617    /// }
2618    /// # anyhow::Ok(()) };
2619    /// ```
2620    #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2621    pub fn send_raw<'a>(
2622        &'a self,
2623        event_type: &'a str,
2624        content: impl IntoRawMessageLikeEventContent,
2625    ) -> SendRawMessageLikeEvent<'a> {
2626        // Note: the recorded instrument fields are saved in
2627        // `SendRawMessageLikeEvent::into_future`.
2628        SendRawMessageLikeEvent::new(self, event_type, content)
2629    }
2630
2631    /// Send an attachment to this room.
2632    ///
2633    /// This will upload the given data that the reader produces using the
2634    /// [`upload()`] method and post an event to the given room.
2635    /// If the room is encrypted and the encryption feature is enabled the
2636    /// upload will be encrypted.
2637    ///
2638    /// This is a convenience method that calls the
2639    /// [`upload()`] and afterwards the [`send()`].
2640    ///
2641    /// # Arguments
2642    /// * `filename` - The file name.
2643    ///
2644    /// * `content_type` - The type of the media, this will be used as the
2645    /// content-type header.
2646    ///
2647    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2648    /// media.
2649    ///
2650    /// * `config` - Metadata and configuration for the attachment.
2651    ///
2652    /// # Examples
2653    ///
2654    /// ```no_run
2655    /// # use std::fs;
2656    /// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
2657    /// # use url::Url;
2658    /// # use mime;
2659    /// # async {
2660    /// # let homeserver = Url::parse("http://localhost:8080")?;
2661    /// # let mut client = Client::new(homeserver).await?;
2662    /// # let room_id = room_id!("!test:localhost");
2663    /// let mut image = fs::read("/home/example/my-cat.jpg")?;
2664    ///
2665    /// if let Some(room) = client.get_room(&room_id) {
2666    ///     room.send_attachment(
2667    ///         "my_favorite_cat.jpg",
2668    ///         &mime::IMAGE_JPEG,
2669    ///         image,
2670    ///         AttachmentConfig::new(),
2671    ///     ).await?;
2672    /// }
2673    /// # anyhow::Ok(()) };
2674    /// ```
2675    ///
2676    /// [`upload()`]: crate::Media::upload
2677    /// [`send()`]: Self::send
2678    #[instrument(skip_all)]
2679    pub fn send_attachment<'a>(
2680        &'a self,
2681        filename: impl Into<String>,
2682        content_type: &'a Mime,
2683        data: Vec<u8>,
2684        config: AttachmentConfig,
2685    ) -> SendAttachment<'a> {
2686        SendAttachment::new(self, filename.into(), content_type, data, config)
2687    }
2688
2689    /// Prepare and send an attachment to this room.
2690    ///
2691    /// This will upload the given data that the reader produces using the
2692    /// [`upload()`](#method.upload) method and post an event to the given room.
2693    /// If the room is encrypted and the encryption feature is enabled the
2694    /// upload will be encrypted.
2695    ///
2696    /// This is a convenience method that calls the
2697    /// [`Client::upload()`](#Client::method.upload) and afterwards the
2698    /// [`send()`](#method.send).
2699    ///
2700    /// # Arguments
2701    /// * `filename` - The file name.
2702    ///
2703    /// * `content_type` - The type of the media, this will be used as the
2704    ///   content-type header.
2705    ///
2706    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2707    ///   media.
2708    ///
2709    /// * `config` - Metadata and configuration for the attachment.
2710    ///
2711    /// * `send_progress` - An observable to transmit forward progress about the
2712    ///   upload.
2713    ///
2714    /// * `store_in_cache` - A boolean defining whether the uploaded media will
2715    ///   be stored in the cache immediately after a successful upload.
2716    #[instrument(skip_all)]
2717    pub(super) async fn prepare_and_send_attachment<'a>(
2718        &'a self,
2719        filename: String,
2720        content_type: &'a Mime,
2721        data: Vec<u8>,
2722        mut config: AttachmentConfig,
2723        send_progress: SharedObservable<TransmissionProgress>,
2724        store_in_cache: bool,
2725    ) -> Result<send_message_event::v3::Response> {
2726        self.ensure_room_joined()?;
2727
2728        let txn_id = config.txn_id.take();
2729        let mentions = config.mentions.take();
2730
2731        let thumbnail = config.thumbnail.take();
2732
2733        // If necessary, store caching data for the thumbnail ahead of time.
2734        let thumbnail_cache_info = if store_in_cache {
2735            thumbnail
2736                .as_ref()
2737                .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2738        } else {
2739            None
2740        };
2741
2742        #[cfg(feature = "e2e-encryption")]
2743        let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2744            self.client
2745                .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2746                .await?
2747        } else {
2748            self.client
2749                .media()
2750                .upload_plain_media_and_thumbnail(
2751                    content_type,
2752                    // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2753                    // similar.
2754                    data.clone(),
2755                    thumbnail,
2756                    send_progress,
2757                )
2758                .await?
2759        };
2760
2761        #[cfg(not(feature = "e2e-encryption"))]
2762        let (media_source, thumbnail) = self
2763            .client
2764            .media()
2765            .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2766            .await?;
2767
2768        if store_in_cache {
2769            let media_store_lock_guard = self.client.media_store().lock().await?;
2770
2771            // A failure to cache shouldn't prevent the whole upload from finishing
2772            // properly, so only log errors during caching.
2773
2774            debug!("caching the media");
2775            let request =
2776                MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2777
2778            if let Err(err) = media_store_lock_guard
2779                .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2780                .await
2781            {
2782                warn!("unable to cache the media after uploading it: {err}");
2783            }
2784
2785            if let Some(((data, height, width), source)) =
2786                thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2787            {
2788                debug!("caching the thumbnail");
2789
2790                let request = MediaRequestParameters {
2791                    source: source.clone(),
2792                    format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2793                };
2794
2795                if let Err(err) = media_store_lock_guard
2796                    .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2797                    .await
2798                {
2799                    warn!("unable to cache the media after uploading it: {err}");
2800                }
2801            }
2802        }
2803
2804        let content = self
2805            .make_media_event(
2806                Room::make_attachment_type(
2807                    content_type,
2808                    filename,
2809                    media_source,
2810                    config.caption,
2811                    config.info,
2812                    thumbnail,
2813                ),
2814                mentions,
2815                config.reply,
2816            )
2817            .await?;
2818
2819        let mut fut = self.send(content);
2820        if let Some(txn_id) = txn_id {
2821            fut = fut.with_transaction_id(txn_id);
2822        }
2823
2824        fut.await.map(|result| result.response)
2825    }
2826
2827    /// Creates the inner [`MessageType`] for an already-uploaded media file
2828    /// provided by its source.
2829    #[allow(clippy::too_many_arguments)]
2830    pub(crate) fn make_attachment_type(
2831        content_type: &Mime,
2832        filename: String,
2833        source: MediaSource,
2834        caption: Option<TextMessageEventContent>,
2835        info: Option<AttachmentInfo>,
2836        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2837    ) -> MessageType {
2838        make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2839    }
2840
2841    /// Creates the [`RoomMessageEventContent`] based on the message type,
2842    /// mentions and reply information.
2843    pub(crate) async fn make_media_event(
2844        &self,
2845        msg_type: MessageType,
2846        mentions: Option<Mentions>,
2847        reply: Option<Reply>,
2848    ) -> Result<RoomMessageEventContent> {
2849        let mut content = RoomMessageEventContent::new(msg_type);
2850        if let Some(mentions) = mentions {
2851            content = content.add_mentions(mentions);
2852        }
2853        if let Some(reply) = reply {
2854            // Since we just created the event, there is no relation attached to it. Thus,
2855            // it is safe to add the reply relation without overriding anything.
2856            content = self.make_reply_event(content.into(), reply).await?;
2857        }
2858        Ok(content)
2859    }
2860
2861    /// Creates the inner [`GalleryItemType`] for an already-uploaded media file
2862    /// provided by its source.
2863    #[cfg(feature = "unstable-msc4274")]
2864    #[allow(clippy::too_many_arguments)]
2865    pub(crate) fn make_gallery_item_type(
2866        content_type: &Mime,
2867        filename: String,
2868        source: MediaSource,
2869        caption: Option<TextMessageEventContent>,
2870        info: Option<AttachmentInfo>,
2871        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2872    ) -> GalleryItemType {
2873        make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
2874    }
2875
2876    /// Update the power levels of a select set of users of this room.
2877    ///
2878    /// Issue a `power_levels` state event request to the server, changing the
2879    /// given UserId -> Int levels. May fail if the `power_levels` aren't
2880    /// locally known yet or the server rejects the state event update, e.g.
2881    /// because of insufficient permissions. Neither permissions to update
2882    /// nor whether the data might be stale is checked prior to issuing the
2883    /// request.
2884    pub async fn update_power_levels(
2885        &self,
2886        updates: Vec<(&UserId, Int)>,
2887    ) -> Result<send_state_event::v3::Response> {
2888        let mut power_levels = self.power_levels().await?;
2889
2890        for (user_id, new_level) in updates {
2891            if new_level == power_levels.users_default {
2892                power_levels.users.remove(user_id);
2893            } else {
2894                power_levels.users.insert(user_id.to_owned(), new_level);
2895            }
2896        }
2897
2898        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2899    }
2900
2901    /// Applies a set of power level changes to this room.
2902    ///
2903    /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2904    /// remain unchanged.
2905    pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2906        let mut power_levels = self.power_levels().await?;
2907        power_levels.apply(changes)?;
2908        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2909        Ok(())
2910    }
2911
2912    /// Resets the room's power levels to the default values
2913    ///
2914    /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2915    pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2916        let creators = self.creators().unwrap_or_default();
2917        let rules = self.clone_info().room_version_rules_or_default();
2918
2919        let default_power_levels =
2920            RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2921        let changes = RoomPowerLevelChanges::from(default_power_levels);
2922        self.apply_power_level_changes(changes).await?;
2923        Ok(self.power_levels().await?)
2924    }
2925
2926    /// Gets the suggested role for the user with the provided `user_id`.
2927    ///
2928    /// This method checks the `RoomPowerLevels` events instead of loading the
2929    /// member list and looking for the member.
2930    pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2931        let power_level = self.get_user_power_level(user_id).await?;
2932        Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2933    }
2934
2935    /// Gets the power level the user with the provided `user_id`.
2936    ///
2937    /// This method checks the `RoomPowerLevels` events instead of loading the
2938    /// member list and looking for the member.
2939    pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2940        let event = self.power_levels().await?;
2941        Ok(event.for_user(user_id))
2942    }
2943
2944    /// Gets a map with the `UserId` of users with power levels other than `0`
2945    /// and this power level.
2946    pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2947        let power_levels = self.power_levels().await.ok();
2948        let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2949        if let Some(power_levels) = power_levels {
2950            for (id, level) in power_levels.users.into_iter() {
2951                user_power_levels.insert(id, level.into());
2952            }
2953        }
2954        user_power_levels
2955    }
2956
2957    /// Sets the name of this room.
2958    pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2959        self.send_state_event(RoomNameEventContent::new(name)).await
2960    }
2961
2962    /// Sets a new topic for this room.
2963    pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2964        self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2965    }
2966
2967    /// Sets the new avatar url for this room.
2968    ///
2969    /// # Arguments
2970    /// * `avatar_url` - The owned Matrix uri that represents the avatar
2971    /// * `info` - The optional image info that can be provided for the avatar
2972    pub async fn set_avatar_url(
2973        &self,
2974        url: &MxcUri,
2975        info: Option<avatar::ImageInfo>,
2976    ) -> Result<send_state_event::v3::Response> {
2977        self.ensure_room_joined()?;
2978
2979        let mut room_avatar_event = RoomAvatarEventContent::new();
2980        room_avatar_event.url = Some(url.to_owned());
2981        room_avatar_event.info = info.map(Box::new);
2982
2983        self.send_state_event(room_avatar_event).await
2984    }
2985
2986    /// Removes the avatar from the room
2987    pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2988        self.send_state_event(RoomAvatarEventContent::new()).await
2989    }
2990
2991    /// Uploads a new avatar for this room.
2992    ///
2993    /// # Arguments
2994    /// * `mime` - The mime type describing the data
2995    /// * `data` - The data representation of the avatar
2996    /// * `info` - The optional image info provided for the avatar, the blurhash
2997    ///   and the mimetype will always be updated
2998    pub async fn upload_avatar(
2999        &self,
3000        mime: &Mime,
3001        data: Vec<u8>,
3002        info: Option<avatar::ImageInfo>,
3003    ) -> Result<send_state_event::v3::Response> {
3004        self.ensure_room_joined()?;
3005
3006        let upload_response = self.client.media().upload(mime, data, None).await?;
3007        let mut info = info.unwrap_or_default();
3008        info.blurhash = upload_response.blurhash;
3009        info.mimetype = Some(mime.to_string());
3010
3011        self.set_avatar_url(&upload_response.content_uri, Some(info)).await
3012    }
3013
3014    /// Send a state event with an empty state key to the homeserver.
3015    ///
3016    /// For state events with a non-empty state key, see
3017    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
3018    ///
3019    /// Returns the parsed response from the server.
3020    ///
3021    /// # Arguments
3022    ///
3023    /// * `content` - The content of the state event.
3024    ///
3025    /// # Examples
3026    ///
3027    /// ```no_run
3028    /// # use serde::{Deserialize, Serialize};
3029    /// # async {
3030    /// # let joined_room: matrix_sdk::Room = todo!();
3031    /// use matrix_sdk::ruma::{
3032    ///     EventEncryptionAlgorithm,
3033    ///     events::{
3034    ///         EmptyStateKey, macros::EventContent,
3035    ///         room::encryption::RoomEncryptionEventContent,
3036    ///     },
3037    /// };
3038    ///
3039    /// let encryption_event_content = RoomEncryptionEventContent::new(
3040    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
3041    /// );
3042    /// joined_room.send_state_event(encryption_event_content).await?;
3043    ///
3044    /// // Custom event:
3045    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3046    /// #[ruma_event(
3047    ///     type = "org.matrix.msc_9000.xxx",
3048    ///     kind = State,
3049    ///     state_key_type = EmptyStateKey,
3050    /// )]
3051    /// struct XxxStateEventContent {/* fields... */}
3052    ///
3053    /// let content: XxxStateEventContent = todo!();
3054    /// joined_room.send_state_event(content).await?;
3055    /// # anyhow::Ok(()) };
3056    /// ```
3057    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3058    #[instrument(skip_all)]
3059    pub async fn send_state_event(
3060        &self,
3061        content: impl StateEventContent<StateKey = EmptyStateKey>,
3062    ) -> Result<send_state_event::v3::Response> {
3063        self.send_state_event_for_key(&EmptyStateKey, content).await
3064    }
3065
3066    /// Send a state event with an empty state key to the homeserver.
3067    ///
3068    /// For state events with a non-empty state key, see
3069    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
3070    ///
3071    /// If the experimental state event encryption feature is enabled, this
3072    /// method will transparently encrypt the event if this room is
3073    /// encrypted (except if the event type is considered critical for the room
3074    /// to function, as outlined in [MSC4362][msc4362]).
3075    ///
3076    /// Returns the parsed response from the server.
3077    ///
3078    /// # Arguments
3079    ///
3080    /// * `content` - The content of the state event.
3081    ///
3082    /// # Examples
3083    ///
3084    /// ```no_run
3085    /// # use serde::{Deserialize, Serialize};
3086    /// # async {
3087    /// # let joined_room: matrix_sdk::Room = todo!();
3088    /// use matrix_sdk::ruma::{
3089    ///     EventEncryptionAlgorithm,
3090    ///     events::{
3091    ///         EmptyStateKey, macros::EventContent,
3092    ///         room::encryption::RoomEncryptionEventContent,
3093    ///     },
3094    /// };
3095    ///
3096    /// let encryption_event_content = RoomEncryptionEventContent::new(
3097    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
3098    /// );
3099    /// joined_room.send_state_event(encryption_event_content).await?;
3100    ///
3101    /// // Custom event:
3102    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3103    /// #[ruma_event(
3104    ///     type = "org.matrix.msc_9000.xxx",
3105    ///     kind = State,
3106    ///     state_key_type = EmptyStateKey,
3107    /// )]
3108    /// struct XxxStateEventContent {/* fields... */}
3109    ///
3110    /// let content: XxxStateEventContent = todo!();
3111    /// joined_room.send_state_event(content).await?;
3112    /// # anyhow::Ok(()) };
3113    /// ```
3114    ///
    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
    #[cfg(feature = "experimental-encrypted-state-events")]
    #[instrument(skip_all)]
    pub fn send_state_event<'a>(
        &'a self,
        content: impl StateEventContent<StateKey = EmptyStateKey>,
    ) -> SendStateEvent<'a> {
        // Not `async`: this only obtains the `SendStateEvent` value from the
        // keyed variant, passing the empty state key.
        self.send_state_event_for_key(&EmptyStateKey, content)
    }
3124
3125    /// Send a state event to the homeserver.
3126    ///
3127    /// Returns the parsed response from the server.
3128    ///
3129    /// # Arguments
3130    ///
3131    /// * `content` - The content of the state event.
3132    ///
3133    /// * `state_key` - A unique key which defines the overwriting semantics for
3134    ///   this piece of room state.
3135    ///
3136    /// # Examples
3137    ///
3138    /// ```no_run
3139    /// # use serde::{Deserialize, Serialize};
3140    /// # async {
3141    /// # let joined_room: matrix_sdk::Room = todo!();
3142    /// use matrix_sdk::ruma::{
3143    ///     events::{
3144    ///         macros::EventContent,
3145    ///         room::member::{RoomMemberEventContent, MembershipState},
3146    ///     },
3147    ///     mxc_uri,
3148    /// };
3149    ///
3150    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3151    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3152    /// content.avatar_url = Some(avatar_url);
3153    ///
3154    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3155    ///
3156    /// // Custom event:
3157    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3158    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3159    /// struct XxxStateEventContent { /* fields... */ }
3160    ///
3161    /// let content: XxxStateEventContent = todo!();
3162    /// joined_room.send_state_event_for_key("foo", content).await?;
3163    /// # anyhow::Ok(()) };
3164    /// ```
3165    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3166    pub async fn send_state_event_for_key<C, K>(
3167        &self,
3168        state_key: &K,
3169        content: C,
3170    ) -> Result<send_state_event::v3::Response>
3171    where
3172        C: StateEventContent,
3173        C::StateKey: Borrow<K>,
3174        K: AsRef<str> + ?Sized,
3175    {
3176        self.ensure_room_joined()?;
3177        let request =
3178            send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3179        let response = self.client.send(request).await?;
3180        Ok(response)
3181    }
3182
3183    /// Send a state event to the homeserver. If state encryption is enabled in
3184    /// this room, the event will be encrypted.
3185    ///
3186    /// If the experimental state event encryption feature is enabled, this
3187    /// method will transparently encrypt the event if this room is
3188    /// encrypted (except if the event type is considered critical for the room
3189    /// to function, as outlined in [MSC4362][msc4362]).
3190    ///
3191    /// Returns the parsed response from the server.
3192    ///
3193    /// # Arguments
3194    ///
3195    /// * `content` - The content of the state event.
3196    ///
3197    /// * `state_key` - A unique key which defines the overwriting semantics for
3198    ///   this piece of room state.
3199    ///
3200    /// # Examples
3201    ///
3202    /// ```no_run
3203    /// # use serde::{Deserialize, Serialize};
3204    /// # async {
3205    /// # let joined_room: matrix_sdk::Room = todo!();
3206    /// use matrix_sdk::ruma::{
3207    ///     events::{
3208    ///         macros::EventContent,
3209    ///         room::member::{RoomMemberEventContent, MembershipState},
3210    ///     },
3211    ///     mxc_uri,
3212    /// };
3213    ///
3214    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3215    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3216    /// content.avatar_url = Some(avatar_url);
3217    ///
3218    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3219    ///
3220    /// // Custom event:
3221    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3222    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3223    /// struct XxxStateEventContent { /* fields... */ }
3224    ///
3225    /// let content: XxxStateEventContent = todo!();
3226    /// joined_room.send_state_event_for_key("foo", content).await?;
3227    /// # anyhow::Ok(()) };
3228    /// ```
3229    ///
3230    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
    #[cfg(feature = "experimental-encrypted-state-events")]
    pub fn send_state_event_for_key<'a, C, K>(
        &'a self,
        state_key: &K,
        content: C,
    ) -> SendStateEvent<'a>
    where
        C: StateEventContent,
        C::StateKey: Borrow<K>,
        K: AsRef<str> + ?Sized,
    {
        // Not `async`: this function only constructs the `SendStateEvent`
        // value, whose lifetime is tied to the borrow of `self`.
        SendStateEvent::new(self, state_key, content)
    }
3244
3245    /// Send a raw room state event to the homeserver.
3246    ///
3247    /// Returns the parsed response from the server.
3248    ///
3249    /// # Arguments
3250    ///
3251    /// * `event_type` - The type of the event that we're sending out.
3252    ///
    /// * `state_key` - A unique key which defines the overwriting semantics for
    ///   this piece of room state. This value is often a zero-length string.
3255    ///
3256    /// * `content` - The content of the event as a raw JSON value. The argument
3257    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3258    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3259    ///
3260    /// # Examples
3261    ///
3262    /// ```no_run
3263    /// use serde_json::json;
3264    ///
3265    /// # async {
3266    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3267    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3268    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3269    ///
3270    /// if let Some(room) = client.get_room(&room_id) {
3271    ///     room.send_state_event_raw("m.room.member", "", json!({
3272    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3273    ///         "displayname": "Alice Margatroid",
3274    ///         "membership": "join",
3275    ///     })).await?;
3276    /// }
3277    /// # anyhow::Ok(()) };
3278    /// ```
3279    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3280    #[instrument(skip_all)]
3281    pub async fn send_state_event_raw(
3282        &self,
3283        event_type: &str,
3284        state_key: &str,
3285        content: impl IntoRawStateEventContent,
3286    ) -> Result<send_state_event::v3::Response> {
3287        self.ensure_room_joined()?;
3288
3289        let request = send_state_event::v3::Request::new_raw(
3290            self.room_id().to_owned(),
3291            event_type.into(),
3292            state_key.to_owned(),
3293            content.into_raw_state_event_content(),
3294        );
3295
3296        Ok(self.client.send(request).await?)
3297    }
3298
3299    /// Send a raw room state event to the homeserver.
3300    ///
3301    /// If the experimental state event encryption feature is enabled, this
3302    /// method will transparently encrypt the event if this room is
3303    /// encrypted (except if the event type is considered critical for the room
3304    /// to function, as outlined in [MSC4362][msc4362]).
3305    ///
3306    /// Returns the parsed response from the server.
3307    ///
3308    /// # Arguments
3309    ///
3310    /// * `event_type` - The type of the event that we're sending out.
3311    ///
    /// * `state_key` - A unique key which defines the overwriting semantics for
    ///   this piece of room state. This value is often a zero-length string.
3314    ///
3315    /// * `content` - The content of the event as a raw JSON value. The argument
3316    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3317    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3318    ///
3319    /// # Examples
3320    ///
3321    /// ```no_run
3322    /// use serde_json::json;
3323    ///
3324    /// # async {
3325    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3326    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3327    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3328    ///
3329    /// if let Some(room) = client.get_room(&room_id) {
3330    ///     room.send_state_event_raw("m.room.member", "", json!({
3331    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3332    ///         "displayname": "Alice Margatroid",
3333    ///         "membership": "join",
3334    ///     })).await?;
3335    /// }
3336    /// # anyhow::Ok(()) };
3337    /// ```
3338    ///
3339    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
    #[cfg(feature = "experimental-encrypted-state-events")]
    #[instrument(skip_all)]
    pub fn send_state_event_raw<'a>(
        &'a self,
        event_type: &'a str,
        state_key: &'a str,
        content: impl IntoRawStateEventContent,
    ) -> SendRawStateEvent<'a> {
        // Not `async`: this function only constructs the `SendRawStateEvent`
        // value; `event_type` and `state_key` must outlive it (`'a`).
        SendRawStateEvent::new(self, event_type, state_key, content)
    }
3350
3351    /// Strips all information out of an event of the room.
3352    ///
3353    /// Returns the [`redact_event::v3::Response`] from the server.
3354    ///
3355    /// This cannot be undone. Users may redact their own events, and any user
3356    /// with a power level greater than or equal to the redact power level of
3357    /// the room may redact events there.
3358    ///
3359    /// # Arguments
3360    ///
3361    /// * `event_id` - The ID of the event to redact
3362    ///
3363    /// * `reason` - The reason for the event being redacted.
3364    ///
    /// * `txn_id` - A unique ID that can be attached to this event as
    ///   its transaction ID. If not given, one is generated.
3367    ///
3368    /// # Examples
3369    ///
3370    /// ```no_run
3371    /// use matrix_sdk::ruma::event_id;
3372    ///
3373    /// # async {
3374    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3375    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3376    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3377    /// #
3378    /// if let Some(room) = client.get_room(&room_id) {
3379    ///     let event_id = event_id!("$xxxxxx:example.org");
3380    ///     let reason = Some("Indecent material");
3381    ///     room.redact(&event_id, reason, None).await?;
3382    /// }
3383    /// # anyhow::Ok(()) };
3384    /// ```
3385    #[instrument(skip_all)]
3386    pub async fn redact(
3387        &self,
3388        event_id: &EventId,
3389        reason: Option<&str>,
3390        txn_id: Option<OwnedTransactionId>,
3391    ) -> HttpResult<redact_event::v3::Response> {
3392        let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3393        let request = assign!(
3394            redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3395            { reason: reason.map(ToOwned::to_owned) }
3396        );
3397
3398        self.client.send(request).await
3399    }
3400
3401    /// Get a list of servers that should know this room.
3402    ///
3403    /// Uses the synced members of the room and the suggested [routing
3404    /// algorithm] from the Matrix spec.
3405    ///
3406    /// Returns at most three servers.
3407    ///
3408    /// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
3409    pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3410        let acl_ev = self
3411            .get_state_event_static::<RoomServerAclEventContent>()
3412            .await?
3413            .and_then(|ev| ev.deserialize().ok());
3414        let acl = acl_ev.as_ref().and_then(|ev| match ev {
3415            SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3416            SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3417        });
3418
3419        // Filter out server names that:
3420        // - Are blocked due to server ACLs
3421        // - Are IP addresses
3422        let members: Vec<_> = self
3423            .members_no_sync(RoomMemberships::JOIN)
3424            .await?
3425            .into_iter()
3426            .filter(|member| {
3427                let server = member.user_id().server_name();
3428                acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3429            })
3430            .collect();
3431
3432        // Get the server of the highest power level user in the room, provided
3433        // they are at least power level 50.
3434        let max = members
3435            .iter()
3436            .max_by_key(|member| member.power_level())
3437            .filter(|max| max.power_level() >= int!(50))
3438            .map(|member| member.user_id().server_name());
3439
3440        // Sort the servers by population.
3441        let servers = members
3442            .iter()
3443            .map(|member| member.user_id().server_name())
3444            .filter(|server| max.filter(|max| max == server).is_none())
3445            .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3446                *servers.entry(server).or_default() += 1;
3447                servers
3448            });
3449        let mut servers: Vec<_> = servers.into_iter().collect();
3450        servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3451
3452        Ok(max
3453            .into_iter()
3454            .chain(servers.into_iter().map(|(name, _)| name))
3455            .take(3)
3456            .map(ToOwned::to_owned)
3457            .collect())
3458    }
3459
3460    /// Get a `matrix.to` permalink to this room.
3461    ///
3462    /// If this room has an alias, we use it. Otherwise, we try to use the
3463    /// synced members in the room for [routing] the room ID.
3464    ///
3465    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3466    pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3467        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3468            return Ok(alias.matrix_to_uri());
3469        }
3470
3471        let via = self.route().await?;
3472        Ok(self.room_id().matrix_to_uri_via(via))
3473    }
3474
3475    /// Get a `matrix:` permalink to this room.
3476    ///
3477    /// If this room has an alias, we use it. Otherwise, we try to use the
3478    /// synced members in the room for [routing] the room ID.
3479    ///
3480    /// # Arguments
3481    ///
3482    /// * `join` - Whether the user should join the room.
3483    ///
3484    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3485    pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3486        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3487            return Ok(alias.matrix_uri(join));
3488        }
3489
3490        let via = self.route().await?;
3491        Ok(self.room_id().matrix_uri_via(via, join))
3492    }
3493
3494    /// Get a `matrix.to` permalink to an event in this room.
3495    ///
3496    /// We try to use the synced members in the room for [routing] the room ID.
3497    ///
3498    /// *Note*: This method does not check if the given event ID is actually
3499    /// part of this room. It needs to be checked before calling this method
3500    /// otherwise the permalink won't work.
3501    ///
3502    /// # Arguments
3503    ///
3504    /// * `event_id` - The ID of the event.
3505    ///
3506    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3507    pub async fn matrix_to_event_permalink(
3508        &self,
3509        event_id: impl Into<OwnedEventId>,
3510    ) -> Result<MatrixToUri> {
3511        // Don't use the alias because an event is tied to a room ID, but an
3512        // alias might point to another room, e.g. after a room upgrade.
3513        let via = self.route().await?;
3514        Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3515    }
3516
3517    /// Get a `matrix:` permalink to an event in this room.
3518    ///
3519    /// We try to use the synced members in the room for [routing] the room ID.
3520    ///
3521    /// *Note*: This method does not check if the given event ID is actually
3522    /// part of this room. It needs to be checked before calling this method
3523    /// otherwise the permalink won't work.
3524    ///
3525    /// # Arguments
3526    ///
3527    /// * `event_id` - The ID of the event.
3528    ///
3529    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3530    pub async fn matrix_event_permalink(
3531        &self,
3532        event_id: impl Into<OwnedEventId>,
3533    ) -> Result<MatrixUri> {
3534        // Don't use the alias because an event is tied to a room ID, but an
3535        // alias might point to another room, e.g. after a room upgrade.
3536        let via = self.route().await?;
3537        Ok(self.room_id().matrix_event_uri_via(event_id, via))
3538    }
3539
3540    /// Get the latest receipt of a user in this room.
3541    ///
3542    /// # Arguments
3543    ///
3544    /// * `receipt_type` - The type of receipt to get.
3545    ///
3546    /// * `thread` - The thread containing the event of the receipt, if any.
3547    ///
3548    /// * `user_id` - The ID of the user.
3549    ///
3550    /// Returns the ID of the event on which the receipt applies and the
3551    /// receipt.
3552    pub async fn load_user_receipt(
3553        &self,
3554        receipt_type: ReceiptType,
3555        thread: ReceiptThread,
3556        user_id: &UserId,
3557    ) -> Result<Option<(OwnedEventId, Receipt)>> {
3558        self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3559    }
3560
3561    /// Load the receipts for an event in this room from storage.
3562    ///
3563    /// # Arguments
3564    ///
3565    /// * `receipt_type` - The type of receipt to get.
3566    ///
3567    /// * `thread` - The thread containing the event of the receipt, if any.
3568    ///
3569    /// * `event_id` - The ID of the event.
3570    ///
3571    /// Returns a list of IDs of users who have sent a receipt for the event and
3572    /// the corresponding receipts.
3573    pub async fn load_event_receipts(
3574        &self,
3575        receipt_type: ReceiptType,
3576        thread: ReceiptThread,
3577        event_id: &EventId,
3578    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3579        self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3580    }
3581
3582    /// Get the push-condition context for this room.
3583    ///
3584    /// Returns `None` if some data couldn't be found. This should only happen
3585    /// in brand new rooms, while we process its state.
3586    pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3587        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions().await?)
3588            .await
3589    }
3590
3591    /// Get the push-condition context for this room, with a choice to include
3592    /// thread subscriptions or not, based on the extra
3593    /// `with_threads_subscriptions` parameter.
3594    ///
3595    /// Returns `None` if some data couldn't be found. This should only happen
3596    /// in brand new rooms, while we process its state.
3597    pub(crate) async fn push_condition_room_ctx_internal(
3598        &self,
3599        with_threads_subscriptions: bool,
3600    ) -> Result<Option<PushConditionRoomCtx>> {
3601        let room_id = self.room_id();
3602        let user_id = self.own_user_id();
3603        let room_info = self.clone_info();
3604        let member_count = room_info.active_members_count();
3605
3606        let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3607            member.name().to_owned()
3608        } else {
3609            return Ok(None);
3610        };
3611
3612        let power_levels = match self.power_levels().await {
3613            Ok(power_levels) => Some(power_levels.into()),
3614            Err(error) => {
3615                if matches!(room_info.state(), RoomState::Joined) {
3616                    // It's normal to not have the power levels in a non-joined room, so don't log
3617                    // the error if the room is not joined
3618                    error!("Could not compute power levels for push conditions: {error}");
3619                }
3620                None
3621            }
3622        };
3623
3624        let mut ctx = assign!(PushConditionRoomCtx::new(
3625            room_id.to_owned(),
3626            UInt::new(member_count).unwrap_or(UInt::MAX),
3627            user_id.to_owned(),
3628            user_display_name,
3629        ),
3630        {
3631            power_levels,
3632        });
3633
3634        if with_threads_subscriptions {
3635            let this = self.clone();
3636            ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3637                let room = this.clone();
3638                Box::pin(async move {
3639                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3640                        maybe_sub.is_some()
3641                    } else {
3642                        false
3643                    }
3644                })
3645            });
3646        }
3647
3648        Ok(Some(ctx))
3649    }
3650
3651    /// Retrieves a [`PushContext`] that can be used to compute the push
3652    /// actions for events.
3653    pub async fn push_context(&self) -> Result<Option<PushContext>> {
3654        self.push_context_internal(self.client.enabled_thread_subscriptions().await?).await
3655    }
3656
3657    /// Retrieves a [`PushContext`] that can be used to compute the push actions
3658    /// for events, with a choice to include thread subscriptions or not,
3659    /// based on the extra `with_threads_subscriptions` parameter.
3660    #[instrument(skip(self))]
3661    pub(crate) async fn push_context_internal(
3662        &self,
3663        with_threads_subscriptions: bool,
3664    ) -> Result<Option<PushContext>> {
3665        let Some(push_condition_room_ctx) =
3666            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3667        else {
3668            debug!("Could not aggregate push context");
3669            return Ok(None);
3670        };
3671        let push_rules = self.client().account().push_rules().await?;
3672        Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3673    }
3674
3675    /// Get the push actions for the given event with the current room state.
3676    ///
3677    /// Note that it is possible that no push action is returned because the
3678    /// current room state does not have all the required state events.
3679    pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3680        if let Some(ctx) = self.push_context().await? {
3681            Ok(Some(ctx.for_event(event).await))
3682        } else {
3683            Ok(None)
3684        }
3685    }
3686
3687    /// The membership details of the (latest) invite for the logged-in user in
3688    /// this room.
3689    pub async fn invite_details(&self) -> Result<Invite> {
3690        let state = self.state();
3691
3692        if state != RoomState::Invited {
3693            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3694        }
3695
3696        let invitee = self
3697            .get_member_no_sync(self.own_user_id())
3698            .await?
3699            .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3700        let event = invitee.event();
3701
3702        let inviter_id = event.sender().to_owned();
3703        let inviter = self.get_member_no_sync(&inviter_id).await?;
3704
3705        Ok(Invite { invitee, inviter_id, inviter })
3706    }
3707
3708    /// Get the membership details for the current user.
3709    ///
3710    /// Returns:
3711    ///     - If the user was present in the room, a
3712    ///       [`RoomMemberWithSenderInfo`] containing both the user info and the
3713    ///       member info of the sender of the `m.room.member` event.
3714    ///     - If the current user is not present, an error.
3715    pub async fn member_with_sender_info(
3716        &self,
3717        user_id: &UserId,
3718    ) -> Result<RoomMemberWithSenderInfo> {
3719        let Some(member) = self.get_member_no_sync(user_id).await? else {
3720            return Err(Error::InsufficientData);
3721        };
3722
3723        let sender_member =
3724            if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3725                // If the sender room member info is already available, return it
3726                Some(member)
3727            } else if self.are_members_synced() {
3728                // The room members are synced and we couldn't find the sender info
3729                None
3730            } else if self.sync_members().await.is_ok() {
3731                // Try getting the sender room member info again after syncing
3732                self.get_member_no_sync(member.event().sender()).await?
3733            } else {
3734                None
3735            };
3736
3737        Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3738    }
3739
3740    /// Forget this room.
3741    ///
3742    /// This communicates to the homeserver that it should forget the room.
3743    ///
3744    /// Only left or banned-from rooms can be forgotten.
3745    pub async fn forget(&self) -> Result<()> {
3746        let state = self.state();
3747        match state {
3748            RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3749                return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3750                    "Left / Banned",
3751                    state,
3752                ))));
3753            }
3754            RoomState::Left | RoomState::Banned => {}
3755        }
3756
3757        let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3758        let _response = self.client.send(request).await?;
3759
3760        // If it was a DM, remove the room from the `m.direct` global account data.
3761        if self.inner.direct_targets_length() != 0
3762            && let Err(e) = self.set_is_direct(false).await
3763        {
3764            // It is not important whether we managed to remove the room, it will not have
3765            // any consequences, so just log the error.
3766            warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3767        }
3768
3769        self.client.base_client().forget_room(self.inner.room_id()).await?;
3770
3771        Ok(())
3772    }
3773
3774    fn ensure_room_joined(&self) -> Result<()> {
3775        let state = self.state();
3776        if state == RoomState::Joined {
3777            Ok(())
3778        } else {
3779            Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3780        }
3781    }
3782
3783    /// Get the notification mode.
3784    pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3785        if !matches!(self.state(), RoomState::Joined) {
3786            return None;
3787        }
3788
3789        let notification_settings = self.client().notification_settings().await;
3790
3791        // Get the user-defined mode if available
3792        let notification_mode =
3793            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3794
3795        if notification_mode.is_some() {
3796            notification_mode
3797        } else if let Ok(is_encrypted) =
3798            self.latest_encryption_state().await.map(|state| state.is_encrypted())
3799        {
3800            // Otherwise, if encrypted status is available, get the default mode for this
3801            // type of room.
3802            // From the point of view of notification settings, a `one-to-one` room is one
3803            // that involves exactly two people.
3804            let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3805            let default_mode = notification_settings
3806                .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3807                .await;
3808            Some(default_mode)
3809        } else {
3810            None
3811        }
3812    }
3813
3814    /// Get the user-defined notification mode.
3815    ///
3816    /// The result is cached for fast and non-async call. To read the cached
3817    /// result, use
3818    /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3819    //
3820    // Note for maintainers:
3821    //
3822    // The fact the result is cached is an important property. If you change that in
3823    // the future, please review all calls to this method.
3824    pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3825        if !matches!(self.state(), RoomState::Joined) {
3826            return None;
3827        }
3828
3829        let notification_settings = self.client().notification_settings().await;
3830
3831        // Get the user-defined mode if available.
3832        let mode =
3833            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3834
3835        if let Some(mode) = mode {
3836            self.update_cached_user_defined_notification_mode(mode);
3837        }
3838
3839        mode
3840    }
3841
3842    /// Report an event as inappropriate to the homeserver's administrator.
3843    ///
3844    /// # Arguments
3845    ///
3846    /// * `event_id` - The ID of the event to report.
3847    /// * `score` - The score to rate this content.
3848    /// * `reason` - The reason the content is being reported.
3849    ///
3850    /// # Errors
3851    ///
3852    /// Returns an error if the room is not joined or if an error occurs with
3853    /// the request.
3854    pub async fn report_content(
3855        &self,
3856        event_id: OwnedEventId,
3857        reason: Option<String>,
3858    ) -> Result<report_content::v3::Response> {
3859        let state = self.state();
3860        if state != RoomState::Joined {
3861            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3862        }
3863
3864        let request = assign!(
3865            report_content::v3::Request::new(
3866                self.inner.room_id().to_owned(),
3867                event_id,
3868            ), {
3869                reason: reason
3870            }
3871        );
3872        Ok(self.client.send(request).await?)
3873    }
3874
3875    /// Reports a room as inappropriate to the server.
3876    /// The caller is not required to be joined to the room to report it.
3877    ///
3878    /// # Arguments
3879    ///
3880    /// * `reason` - The reason the room is being reported.
3881    ///
3882    /// # Errors
3883    ///
3884    /// Returns an error if the room is not found or on rate limit
3885    pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3886        let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3887
3888        Ok(self.client.send(request).await?)
3889    }
3890
3891    /// Set a flag on the room to indicate that the user has explicitly marked
3892    /// it as (un)read.
3893    ///
3894    /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3895    /// value as `unread`.
3896    pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3897        if self.is_marked_unread() == unread {
3898            // The request is not necessary.
3899            return Ok(());
3900        }
3901
3902        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3903
3904        let content = MarkedUnreadEventContent::new(unread);
3905
3906        let request = set_room_account_data::v3::Request::new(
3907            user_id.to_owned(),
3908            self.inner.room_id().to_owned(),
3909            &content,
3910        )?;
3911
3912        self.client.send(request).await?;
3913        Ok(())
3914    }
3915
3916    /// Returns the [`RoomEventCache`] associated to this room, assuming the
3917    /// global [`EventCache`] has been enabled for subscription.
3918    pub async fn event_cache(
3919        &self,
3920    ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3921        self.client.event_cache().for_room(self.room_id()).await
3922    }
3923
3924    /// Get the beacon information event in the room for the `user_id`.
3925    ///
3926    /// # Errors
3927    ///
3928    /// Returns an error if the event is redacted, stripped, not found or could
3929    /// not be deserialized.
3930    pub(crate) async fn get_user_beacon_info(
3931        &self,
3932        user_id: &UserId,
3933    ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3934        let raw_event = self
3935            .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3936            .await?
3937            .ok_or(BeaconError::NotFound)?;
3938
3939        match raw_event.deserialize()? {
3940            SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3941            SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3942            SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3943        }
3944    }
3945
3946    /// Start sharing live location in the room.
3947    ///
3948    /// # Arguments
3949    ///
3950    /// * `duration_millis` - The duration for which the live location is
3951    ///   shared, in milliseconds.
3952    /// * `description` - An optional description for the live location share.
3953    ///
3954    /// # Errors
3955    ///
3956    /// Returns an error if the room is not joined or if the state event could
3957    /// not be sent.
3958    pub async fn start_live_location_share(
3959        &self,
3960        duration_millis: u64,
3961        description: Option<String>,
3962    ) -> Result<send_state_event::v3::Response> {
3963        self.ensure_room_joined()?;
3964
3965        self.send_state_event_for_key(
3966            self.own_user_id(),
3967            BeaconInfoEventContent::new(
3968                description,
3969                Duration::from_millis(duration_millis),
3970                true,
3971                None,
3972            ),
3973        )
3974        .await
3975    }
3976
3977    /// Stop sharing live location in the room.
3978    ///
3979    /// # Errors
3980    ///
3981    /// Returns an error if the room is not joined, if the beacon information
3982    /// is redacted or stripped, if the state event is not found, or if the
3983    /// existing beacon is no longer live.
3984    pub async fn stop_live_location_share(
3985        &self,
3986    ) -> Result<send_state_event::v3::Response, BeaconError> {
3987        self.ensure_room_joined()?;
3988
3989        let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3990
3991        if beacon_info_event.content.live {
3992            beacon_info_event.content.stop();
3993            Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3994        } else {
3995            Err(BeaconError::NotLive)
3996        }
3997    }
3998
3999    /// Send a location beacon event in the current room.
4000    ///
4001    /// # Arguments
4002    ///
4003    /// * `geo_uri` - The geo URI of the location beacon.
4004    ///
4005    /// # Errors
4006    ///
4007    /// Returns an error if the room is not joined, if the beacon information
4008    /// is redacted or stripped, if the location share is no longer live,
4009    /// or if the state event is not found.
4010    pub async fn send_location_beacon(
4011        &self,
4012        geo_uri: String,
4013    ) -> Result<send_message_event::v3::Response, BeaconError> {
4014        self.ensure_room_joined()?;
4015
4016        let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
4017
4018        if beacon_info_event.content.is_live() {
4019            let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
4020            Ok(self
4021                .send(content)
4022                .with_request_config(RequestConfig::new().retry_limit(6))
4023                .await?
4024                .response)
4025        } else {
4026            Err(BeaconError::NotLive)
4027        }
4028    }
4029
4030    /// Store the given `ComposerDraft` in the state store using the current
4031    /// room id and optional thread root id as identifier.
4032    pub async fn save_composer_draft(
4033        &self,
4034        draft: ComposerDraft,
4035        thread_root: Option<&EventId>,
4036    ) -> Result<()> {
4037        self.client
4038            .state_store()
4039            .set_kv_data(
4040                StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
4041                StateStoreDataValue::ComposerDraft(draft),
4042            )
4043            .await?;
4044        Ok(())
4045    }
4046
4047    /// Retrieve the `ComposerDraft` stored in the state store for this room
4048    /// and given thread, if any.
4049    pub async fn load_composer_draft(
4050        &self,
4051        thread_root: Option<&EventId>,
4052    ) -> Result<Option<ComposerDraft>> {
4053        let data = self
4054            .client
4055            .state_store()
4056            .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4057            .await?;
4058        Ok(data.and_then(|d| d.into_composer_draft()))
4059    }
4060
4061    /// Remove the `ComposerDraft` stored in the state store for this room
4062    /// and given thread, if any.
4063    pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
4064        self.client
4065            .state_store()
4066            .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4067            .await?;
4068        Ok(())
4069    }
4070
4071    /// Load pinned state events for a room from the `/state` endpoint in the
4072    /// home server.
4073    pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
4074        let response = self
4075            .client
4076            .send(get_state_event_for_key::v3::Request::new(
4077                self.room_id().to_owned(),
4078                StateEventType::RoomPinnedEvents,
4079                "".to_owned(),
4080            ))
4081            .await;
4082
4083        match response {
4084            Ok(response) => Ok(Some(
4085                response
4086                    .into_content()
4087                    .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
4088                    .pinned,
4089            )),
4090            Err(http_error) => match http_error.as_client_api_error() {
4091                Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
4092                _ => Err(http_error.into()),
4093            },
4094        }
4095    }
4096
4097    /// Subscribe to knock requests in this `Room`.
4098    ///
4099    /// The current requests to join the room will be emitted immediately
4100    /// when subscribing.
4101    ///
4102    /// A new set of knock requests will be emitted whenever:
4103    /// - A new member event is received.
4104    /// - A knock request is marked as seen.
4105    /// - A sync is gappy (limited), so room membership information may be
4106    ///   outdated.
4107    ///
4108    /// Returns both a stream of knock requests and a handle for a task that
4109    /// will clean up the seen knock request ids when possible.
4110    pub async fn subscribe_to_knock_requests(
4111        &self,
4112    ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
4113        let this = Arc::new(self.clone());
4114
4115        let room_member_events_observer =
4116            self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
4117
4118        let current_seen_ids = self.get_seen_knock_request_ids().await?;
4119        let mut seen_request_ids_stream = self
4120            .seen_knock_request_ids_map
4121            .subscribe()
4122            .await
4123            .map(|values| values.unwrap_or_default());
4124
4125        let mut room_info_stream = self.subscribe_info();
4126
4127        // Spawn a task that will clean up the seen knock request ids when updated room
4128        // members are received
4129        let clear_seen_ids_handle = spawn({
4130            let this = self.clone();
4131            async move {
4132                let mut member_updates_stream = this.room_member_updates_sender.subscribe();
4133                while member_updates_stream.recv().await.is_ok() {
4134                    // If room members were updated, try to remove outdated seen knock request ids
4135                    if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
4136                        warn!("Failed to remove seen knock requests: {err}")
4137                    }
4138                }
4139            }
4140        });
4141
4142        let combined_stream = stream! {
4143            // Emit current requests to join
4144            match this.get_current_join_requests(&current_seen_ids).await {
4145                Ok(initial_requests) => yield initial_requests,
4146                Err(err) => warn!("Failed to get initial requests to join: {err}")
4147            }
4148
4149            let mut requests_stream = room_member_events_observer.subscribe();
4150            let mut seen_ids = current_seen_ids.clone();
4151
4152            loop {
4153                // This is equivalent to a combine stream operation, triggering a new emission
4154                // when any of the branches changes
4155                tokio::select! {
4156                    Some((event, _)) = requests_stream.next() => {
4157                        if let Some(event) = event.as_original() {
4158                            // If we can calculate the membership change, try to emit only when needed
4159                            let emit = if event.prev_content().is_some() {
4160                                matches!(event.membership_change(),
4161                                    MembershipChange::Banned |
4162                                    MembershipChange::Knocked |
4163                                    MembershipChange::KnockAccepted |
4164                                    MembershipChange::KnockDenied |
4165                                    MembershipChange::KnockRetracted
4166                                )
4167                            } else {
4168                                // If we can't calculate the membership change, assume we need to
4169                                // emit updated values
4170                                true
4171                            };
4172
4173                            if emit {
4174                                match this.get_current_join_requests(&seen_ids).await {
4175                                    Ok(requests) => yield requests,
4176                                    Err(err) => {
4177                                        warn!("Failed to get updated knock requests on new member event: {err}")
4178                                    }
4179                                }
4180                            }
4181                        }
4182                    }
4183
4184                    Some(new_seen_ids) = seen_request_ids_stream.next() => {
4185                        // Update the current seen ids
4186                        seen_ids = new_seen_ids;
4187
4188                        // If seen requests have changed we need to recalculate
4189                        // all the knock requests
4190                        match this.get_current_join_requests(&seen_ids).await {
4191                            Ok(requests) => yield requests,
4192                            Err(err) => {
4193                                warn!("Failed to get updated knock requests on seen ids changed: {err}")
4194                            }
4195                        }
4196                    }
4197
4198                    Some(room_info) = room_info_stream.next() => {
4199                        // We need to emit new items when we may have missing room members:
4200                        // this usually happens after a gappy (limited) sync
4201                        if !room_info.are_members_synced() {
4202                            match this.get_current_join_requests(&seen_ids).await {
4203                                Ok(requests) => yield requests,
4204                                Err(err) => {
4205                                    warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4206                                }
4207                            }
4208                        }
4209                    }
4210                    // If the streams in all branches are closed, stop the loop
4211                    else => break,
4212                }
4213            }
4214        };
4215
4216        Ok((combined_stream, clear_seen_ids_handle))
4217    }
4218
4219    async fn get_current_join_requests(
4220        &self,
4221        seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4222    ) -> Result<Vec<KnockRequest>> {
4223        Ok(self
4224            .members(RoomMemberships::KNOCK)
4225            .await?
4226            .into_iter()
4227            .filter_map(|member| {
4228                let event_id = member.event().event_id()?;
4229                Some(KnockRequest::new(
4230                    self,
4231                    event_id,
4232                    member.event().timestamp(),
4233                    KnockRequestMemberInfo::from_member(&member),
4234                    seen_request_ids.contains_key(event_id),
4235                ))
4236            })
4237            .collect())
4238    }
4239
4240    /// Access the room settings related to privacy and visibility.
4241    pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4242        RoomPrivacySettings::new(&self.inner, &self.client)
4243    }
4244
4245    /// Retrieve a list of all the threads for the current room.
4246    ///
4247    /// Since this client-server API is paginated, the return type may include a
4248    /// token used to resuming back-pagination into the list of results, in
4249    /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4250    /// [`ListThreadsOptions::from`] to continue the pagination
4251    /// from the previous position.
4252    pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4253        let request = opts.into_request(self.room_id());
4254
4255        let response = self.client.send(request).await?;
4256
4257        let push_ctx = self.push_context().await?;
4258        let chunk = join_all(
4259            response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4260        )
4261        .await;
4262
4263        Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4264    }
4265
4266    /// Retrieve a list of relations for the given event, according to the given
4267    /// options, using the network.
4268    ///
4269    /// Since this client-server API is paginated, the return type may include a
4270    /// token used to resuming back-pagination into the list of results, in
4271    /// [`Relations::prev_batch_token`]. This token can be fed back into
4272    /// [`RelationsOptions::from`] to continue the pagination from the previous
4273    /// position.
4274    ///
4275    /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4276    /// then it must be used with the same
4277    /// [`RelationsOptions::include_relations`] value as the request that
4278    /// returns the `from` token, otherwise the server behavior is undefined.
4279    pub async fn relations(
4280        &self,
4281        event_id: OwnedEventId,
4282        opts: RelationsOptions,
4283    ) -> Result<Relations> {
4284        let relations = opts.send(self, event_id).await;
4285
4286        // Save any new related events to the cache.
4287        if let Ok(Relations { chunk, .. }) = &relations
4288            && let Ok((cache, _handles)) = self.event_cache().await
4289        {
4290            cache.save_events(chunk.clone()).await;
4291        }
4292
4293        relations
4294    }
4295
4296    /// Subscribe to a given thread in this room.
4297    ///
4298    /// This will subscribe the user to the thread, so that they will receive
4299    /// notifications for that thread specifically.
4300    ///
4301    /// # Arguments
4302    ///
4303    /// - `thread_root`: The ID of the thread root event to subscribe to.
4304    /// - `automatic`: Whether the subscription was made automatically by a
4305    ///   client, not by manual user choice. If set, must include the latest
4306    ///   event ID that's known in the thread and that is causing the automatic
4307    ///   subscription. If unset (i.e. we're now subscribing manually) and there
4308    ///   was a previous automatic subscription, the subscription will be
4309    ///   overridden to a manual one instead.
4310    ///
4311    /// # Returns
4312    ///
4313    /// - A 404 error if the event isn't known, or isn't a thread root.
4314    /// - An `Ok` result if the subscription was successful, or if the server
4315    ///   skipped an automatic subscription (as the user unsubscribed from the
4316    ///   thread after the event causing the automatic subscription).
4317    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4318    pub async fn subscribe_thread(
4319        &self,
4320        thread_root: OwnedEventId,
4321        automatic: Option<OwnedEventId>,
4322    ) -> Result<()> {
4323        let is_automatic = automatic.is_some();
4324
4325        match self
4326            .client
4327            .send(subscribe_thread::unstable::Request::new(
4328                self.room_id().to_owned(),
4329                thread_root.clone(),
4330                automatic,
4331            ))
4332            .await
4333        {
4334            Ok(_response) => {
4335                trace!("Server acknowledged the thread subscription; saving in db");
4336
4337                // Immediately save the result into the database.
4338                self.client
4339                    .state_store()
4340                    .upsert_thread_subscriptions(vec![(
4341                        self.room_id(),
4342                        &thread_root,
4343                        StoredThreadSubscription {
4344                            status: ThreadSubscriptionStatus::Subscribed {
4345                                automatic: is_automatic,
4346                            },
4347                            bump_stamp: None,
4348                        },
4349                    )])
4350                    .await?;
4351
4352                Ok(())
4353            }
4354
4355            Err(err) => {
4356                if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4357                    // In this case: the server indicates that the user unsubscribed *after* the
4358                    // event ID we've used in an automatic subscription; don't
4359                    // save the subscription state in the database, as the
4360                    // previous one should be more correct.
4361                    trace!("Thread subscription skipped: {err}");
4362                    Ok(())
4363                } else {
4364                    // Forward the error to the caller.
4365                    Err(err.into())
4366                }
4367            }
4368        }
4369    }
4370
4371    /// Subscribe to a thread if needed, based on a current subscription to it.
4372    ///
4373    /// This is like [`Self::subscribe_thread`], but it first checks if the user
4374    /// has already subscribed to a thread, so as to minimize sending
4375    /// unnecessary subscriptions which would be ignored by the server.
4376    pub async fn subscribe_thread_if_needed(
4377        &self,
4378        thread_root: &EventId,
4379        automatic: Option<OwnedEventId>,
4380    ) -> Result<()> {
4381        if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4382            // If we have a previous subscription, we should only send the new one if it's
4383            // manual and the previous one was automatic.
4384            if !prev_sub.automatic || automatic.is_some() {
4385                // Either we had already a manual subscription, or we had an automatic one and
4386                // the new one is automatic too: nothing to do!
4387                return Ok(());
4388            }
4389        }
4390        self.subscribe_thread(thread_root.to_owned(), automatic).await
4391    }
4392
4393    /// Unsubscribe from a given thread in this room.
4394    ///
4395    /// # Arguments
4396    ///
4397    /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4398    ///
4399    /// # Returns
4400    ///
4401    /// - An `Ok` result if the unsubscription was successful, or the thread was
4402    ///   already unsubscribed.
4403    /// - A 404 error if the event isn't known, or isn't a thread root.
4404    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4405    pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4406        self.client
4407            .send(unsubscribe_thread::unstable::Request::new(
4408                self.room_id().to_owned(),
4409                thread_root.clone(),
4410            ))
4411            .await?;
4412
4413        trace!("Server acknowledged the thread subscription removal; removed it from db too");
4414
4415        // Immediately save the result into the database.
4416        self.client
4417            .state_store()
4418            .upsert_thread_subscriptions(vec![(
4419                self.room_id(),
4420                &thread_root,
4421                StoredThreadSubscription {
4422                    status: ThreadSubscriptionStatus::Unsubscribed,
4423                    bump_stamp: None,
4424                },
4425            )])
4426            .await?;
4427
4428        Ok(())
4429    }
4430
4431    /// Return the current thread subscription for the given thread root in this
4432    /// room.
4433    ///
4434    /// # Arguments
4435    ///
4436    /// - `thread_root`: The ID of the thread root event to get the subscription
4437    ///   for.
4438    ///
4439    /// # Returns
4440    ///
4441    /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4442    ///   subscription information.
4443    /// - An `Ok` result with `None` if the subscription does not exist, or the
4444    ///   event couldn't be found, or the event isn't a thread.
4445    /// - An error if the request fails for any other reason, such as a network
4446    ///   error.
4447    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4448    pub async fn fetch_thread_subscription(
4449        &self,
4450        thread_root: OwnedEventId,
4451    ) -> Result<Option<ThreadSubscription>> {
4452        let result = self
4453            .client
4454            .send(get_thread_subscription::unstable::Request::new(
4455                self.room_id().to_owned(),
4456                thread_root.clone(),
4457            ))
4458            .await;
4459
4460        let subscription = match result {
4461            Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4462            Err(http_error) => match http_error.as_client_api_error() {
4463                Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4464                _ => return Err(http_error.into()),
4465            },
4466        };
4467
4468        // Keep the database in sync.
4469        if let Some(sub) = &subscription {
4470            self.client
4471                .state_store()
4472                .upsert_thread_subscriptions(vec![(
4473                    self.room_id(),
4474                    &thread_root,
4475                    StoredThreadSubscription {
4476                        status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4477                        bump_stamp: None,
4478                    },
4479                )])
4480                .await?;
4481        } else {
4482            // If the subscription was not found, remove it from the database.
4483            self.client
4484                .state_store()
4485                .remove_thread_subscription(self.room_id(), &thread_root)
4486                .await?;
4487        }
4488
4489        Ok(subscription)
4490    }
4491
4492    /// Return the current thread subscription for the given thread root in this
4493    /// room, by getting it from storage if possible, or fetching it from
4494    /// network otherwise.
4495    ///
4496    /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4497    /// this method.
4498    pub async fn load_or_fetch_thread_subscription(
4499        &self,
4500        thread_root: &EventId,
4501    ) -> Result<Option<ThreadSubscription>> {
4502        // If the thread subscriptions list is outdated, fetch from the server.
4503        if self.client.thread_subscription_catchup().is_outdated() {
4504            return self.fetch_thread_subscription(thread_root.to_owned()).await;
4505        }
4506
4507        // Otherwise, we can rely on the store information.
4508        Ok(self
4509            .client
4510            .state_store()
4511            .load_thread_subscription(self.room_id(), thread_root)
4512            .await
4513            .map(|maybe_sub| {
4514                maybe_sub.and_then(|stored| match stored.status {
4515                    ThreadSubscriptionStatus::Unsubscribed => None,
4516                    ThreadSubscriptionStatus::Subscribed { automatic } => {
4517                        Some(ThreadSubscription { automatic })
4518                    }
4519                })
4520            })?)
4521    }
4522
4523    /// Adds a new pinned event by sending an updated `m.room.pinned_events`
4524    /// event containing the new event id.
4525    ///
4526    /// This method will first try to get the pinned events from the current
4527    /// room's state and if it fails to do so it'll try to load them from the
4528    /// homeserver.
4529    ///
4530    /// Returns `true` if we pinned the event, `false` if the event was already
4531    /// pinned.
4532    pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
4533        let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4534            event_ids
4535        } else {
4536            self.load_pinned_events().await?.unwrap_or_default()
4537        };
4538        let event_id = event_id.to_owned();
4539        if pinned_event_ids.contains(&event_id) {
4540            Ok(false)
4541        } else {
4542            pinned_event_ids.push(event_id);
4543            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4544            self.send_state_event(content).await?;
4545            Ok(true)
4546        }
4547    }
4548
4549    /// Removes a pinned event by sending an updated `m.room.pinned_events`
4550    /// event without the event id we want to remove.
4551    ///
4552    /// This method will first try to get the pinned events from the current
4553    /// room's state and if it fails to do so it'll try to load them from the
4554    /// homeserver.
4555    ///
4556    /// Returns `true` if we unpinned the event, `false` if the event wasn't
4557    /// pinned before.
4558    pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
4559        let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4560            event_ids
4561        } else {
4562            self.load_pinned_events().await?.unwrap_or_default()
4563        };
4564        let event_id = event_id.to_owned();
4565        if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
4566            pinned_event_ids.remove(idx);
4567            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4568            self.send_state_event(content).await?;
4569            Ok(true)
4570        } else {
4571            Ok(false)
4572        }
4573    }
4574
4575    /// Computes if the current room is a DM, stores the loaded values, and then
4576    /// returns the result.
4577    pub async fn compute_is_dm(&self) -> Result<bool> {
4578        Ok(self.inner.compute_is_dm(self.client.dm_room_definition()).await?)
4579    }
4580
4581    /// Checks if the current room is a DM in a synchronous way, without
4582    /// actually checking any local stores. Note this can be either a cached or
4583    /// an approximate value, since some important data may be unavailable
4584    /// and we may need to make some assumptions.
4585    pub fn is_dm(&self) -> bool {
4586        // Note: this value may be wrong for invited rooms.
4587        let is_direct = self.direct_targets_length() == 1;
4588        match self.client.dm_room_definition() {
4589            DmRoomDefinition::MatrixSpec => {
4590                // If there is a single target, it's a DM.
4591                is_direct
4592            }
4593            DmRoomDefinition::TwoMembers => {
4594                // If there is a single target and at most 2 active members, it's a DM.
4595                // Try getting the calculated active service members count from the room info.
4596                let active_service_member_count =
4597                    self.active_service_members_count().unwrap_or_else(|| {
4598                        // Otherwise just use an approximated value based on the service members
4599                        // count.
4600                        self.service_members().map(|members| members.len()).unwrap_or_default()
4601                            as u64
4602                    });
4603                let has_at_most_two_active_members =
4604                    self.active_members_count().saturating_sub(active_service_member_count) <= 2;
4605                is_direct && has_at_most_two_active_members
4606            }
4607        }
4608    }
4609}
4610
#[cfg(feature = "e2e-encryption")]
impl RoomIdentityProvider for Room {
    /// Whether `user_id` resolves to a member of this room.
    ///
    /// A failed lookup is treated the same as a missing member.
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async move { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// Collect the identities of the joined and invited members.
    ///
    /// Members without a known identity are skipped, and a failure to load
    /// the members yields an empty list.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async move {
            let members = self
                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
                .await
                .unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for member in members {
                if let Some(identity) = self.user_identity(member.user_id()).await {
                    identities.push(identity);
                }
            }
            identities
        })
    }

    /// Look up the identity of `user_id` through the client's encryption
    /// API.
    ///
    /// A failed lookup is treated the same as a missing identity.
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async move {
            let identity = self.client.encryption().get_user_identity(user_id).await;
            identity.ok().flatten().map(|identity| identity.underlying_identity())
        })
    }
}
4645
4646/// A wrapper for a weak client and a room id that allows to lazily retrieve a
4647/// room, only when needed.
4648#[derive(Clone, Debug)]
4649pub(crate) struct WeakRoom {
4650    client: WeakClient,
4651    room_id: OwnedRoomId,
4652}
4653
4654impl WeakRoom {
4655    /// Create a new `WeakRoom` given its weak components.
4656    pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4657        Self { client, room_id }
4658    }
4659
4660    /// Attempts to reconstruct the room.
4661    pub fn get(&self) -> Option<Room> {
4662        self.client.get().and_then(|client| client.get_room(&self.room_id))
4663    }
4664
4665    /// The room id for that room.
4666    pub fn room_id(&self) -> &RoomId {
4667        &self.room_id
4668    }
4669}
4670
/// Details of the (latest) invite.
#[derive(Debug, Clone)]
pub struct Invite {
    /// The room member who has been invited.
    pub invitee: RoomMember,

    /// The user ID of who sent the invite.
    ///
    /// This is useful as a fallback if [`Self::inviter`] is `None`.
    pub inviter_id: OwnedUserId,

    /// The room member who sent the invite.
    ///
    /// If `None`, check [`Self::inviter_id`], it might be useful as a
    /// fallback.
    pub inviter: Option<RoomMember>,
}
4687
/// Internal error type for invitation-related failures.
#[derive(Error, Debug)]
enum InvitationError {
    /// No membership event could be found.
    #[error("No membership event found")]
    EventMissing,
}
4693
4694/// Receipts to send all at once.
4695#[derive(Debug, Clone, Default)]
4696#[non_exhaustive]
4697pub struct Receipts {
4698    /// Fully-read marker (room account data).
4699    pub fully_read: Option<OwnedEventId>,
4700    /// Read receipt (public ephemeral room event).
4701    pub public_read_receipt: Option<OwnedEventId>,
4702    /// Read receipt (private ephemeral room event).
4703    pub private_read_receipt: Option<OwnedEventId>,
4704}
4705
4706impl Receipts {
4707    /// Create an empty `Receipts`.
4708    pub fn new() -> Self {
4709        Self::default()
4710    }
4711
4712    /// Set the last event the user has read.
4713    ///
4714    /// It means that the user has read all the events before this event.
4715    ///
4716    /// This is a private marker only visible by the user.
4717    ///
4718    /// Note that this is technically not a receipt as it is persisted in the
4719    /// room account data.
4720    pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4721        self.fully_read = event_id.into();
4722        self
4723    }
4724
4725    /// Set the last event presented to the user and forward it to the other
4726    /// users in the room.
4727    ///
4728    /// This is used to reset the unread messages/notification count and
4729    /// advertise to other users the last event that the user has likely seen.
4730    pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4731        self.public_read_receipt = event_id.into();
4732        self
4733    }
4734
4735    /// Set the last event presented to the user and don't forward it.
4736    ///
4737    /// This is used to reset the unread messages/notification count.
4738    pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4739        self.private_read_receipt = event_id.into();
4740        self
4741    }
4742
4743    /// Whether this `Receipts` is empty.
4744    pub fn is_empty(&self) -> bool {
4745        self.fully_read.is_none()
4746            && self.public_read_receipt.is_none()
4747            && self.private_read_receipt.is_none()
4748    }
4749}
4750
/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
/// listed by a room, possibly validated by checking the space's state.
#[derive(Debug)]
pub enum ParentSpace {
    /// The room recognizes the given room as its parent, and the parent
    /// recognizes it as its child.
    Reciprocal(Room),
    /// The room recognizes the given room as its parent, but the parent does
    /// not recognize it as its child. However, the author of the
    /// `m.space.parent` event in the room has a sufficient power level in the
    /// parent to create the child event.
    WithPowerlevel(Room),
    /// The room recognizes the given room as its parent, but the parent does
    /// not recognize it as its child.
    Illegitimate(Room),
    /// The room recognizes the given id as its parent room, but we cannot check
    /// whether the parent recognizes it as its child.
    Unverifiable(OwnedRoomId),
}
4770
/// Something that can asynchronously provide a [`TimelineEvent`] given its
/// event ID.
trait EventSource {
    /// Get the event identified by `event_id`, or an error if it couldn't be
    /// obtained.
    fn get_event(
        &self,
        event_id: &EventId,
    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
}

impl EventSource for &Room {
    /// Delegates to `load_or_fetch_event` on the room, passing `None` as the
    /// second argument.
    async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
        self.load_or_fetch_event(event_id, None).await
    }
}
4783
/// Contains the current user's room member info and the optional room member
/// info of the sender of the `m.room.member` event that this info represents.
#[derive(Debug)]
pub struct RoomMemberWithSenderInfo {
    /// The actual room member.
    pub room_member: RoomMember,
    /// The room member info of the sender of the event that `room_member` is
    /// based on, if available.
    pub sender_info: Option<RoomMember>,
}
4794
4795#[cfg(all(test, not(target_family = "wasm")))]
4796mod tests {
4797    use std::collections::BTreeMap;
4798
4799    use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4800    use matrix_sdk_test::{
4801        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
4802    };
4803    use ruma::{
4804        RoomVersionId, event_id,
4805        events::{relation::RelationType, room::member::MembershipState},
4806        owned_event_id, room_id, user_id,
4807    };
4808    use wiremock::{
4809        Mock, MockServer, ResponseTemplate,
4810        matchers::{header, method, path_regex},
4811    };
4812
4813    use crate::{
4814        Client,
4815        config::RequestConfig,
4816        room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4817        test_utils::{
4818            client::mock_matrix_session,
4819            logged_in_client,
4820            mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4821        },
4822    };
4823
    /// Checks that encrypting a room event still works after the crypto-store
    /// lock has been taken by another client sharing the same SQLite store in
    /// between.
    #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
    #[async_test]
    async fn test_cache_invalidation_while_encrypt() {
        use matrix_sdk_base::store::RoomLoadSettings;
        use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};

        // Both clients created in this test share this SQLite database.
        let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
        let session = mock_matrix_session();

        let client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();

        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

        // Mock receiving an event to create an internal room.
        //
        // NOTE(review): the client above is built against `http://localhost:1234`, not
        // against this mock server's address, so this mock may never be hit — confirm.
        let server = MockServer::start().await;
        {
            Mock::given(method("GET"))
                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
                .and(header("authorization", "Bearer 1234"))
                .respond_with(
                    ResponseTemplate::new(200)
                        .set_body_json(EventFactory::new().room_encryption().into_content()),
                )
                .mount(&server)
                .await;
            // The sync response contains a joined room with a member event, the default
            // power levels and a room encryption event.
            let f = EventFactory::new().sender(user_id!("@example:localhost"));
            let response = SyncResponseBuilder::default()
                .add_joined_room(
                    JoinedRoomBuilder::default()
                        .add_state_event(
                            f.member(user_id!("@example:localhost")).display_name("example"),
                        )
                        .add_state_event(f.default_power_levels())
                        .add_state_event(f.room_encryption()),
                )
                .build_sync_response();
            client.base_client().receive_sync_response(response).await.unwrap();
        }

        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");

        // Step 1, preshare the room keys.
        room.preshare_room_key().await.unwrap();

        // Step 2, force lock invalidation by pretending another client obtained the
        // lock.
        {
            // This second client shadows the first one in this scope only; it uses the
            // same SQLite store and session, but another lock holder name.
            let client = Client::builder()
                .homeserver_url("http://localhost:1234")
                .request_config(RequestConfig::new().disable_retry())
                .sqlite_store(&sqlite_path, None)
                .build()
                .await
                .unwrap();
            client
                .matrix_auth()
                .restore_session(session.clone(), RoomLoadSettings::default())
                .await
                .unwrap();
            client
                .encryption()
                .enable_cross_process_store_lock("client2".to_owned())
                .await
                .unwrap();

            // The guard goes out of scope at the end of this block.
            let guard = client.encryption().spin_lock_store(None).await.unwrap();
            assert!(guard.is_some());
        }

        // Step 3, take the crypto-store lock.
        let guard = client.encryption().spin_lock_store(None).await.unwrap();
        assert!(guard.is_some());

        // Step 4, try to encrypt a message.
        let olm = client.olm_machine().await;
        let olm = olm.as_ref().expect("Olm machine wasn't started");

        // Now pretend we're encrypting an event; the olm machine shouldn't rely on
        // caching the outgoing session before.
        let _encrypted_content = olm
            .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
            .await
            .unwrap();
    }
4919
4920    #[async_test]
4921    async fn test_composer_draft() {
4922        use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4923
4924        let client = logged_in_client(None).await;
4925
4926        let response = SyncResponseBuilder::default()
4927            .add_joined_room(JoinedRoomBuilder::default())
4928            .build_sync_response();
4929        client.base_client().receive_sync_response(response).await.unwrap();
4930        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4931
4932        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4933
4934        // Save 2 drafts, one for the room and one for a thread.
4935
4936        let draft = ComposerDraft {
4937            plain_text: "Hello, world!".to_owned(),
4938            html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4939            draft_type: ComposerDraftType::NewMessage,
4940            attachments: vec![DraftAttachment {
4941                filename: "cat.txt".to_owned(),
4942                content: matrix_sdk_base::DraftAttachmentContent::File {
4943                    data: b"meow".to_vec(),
4944                    mimetype: Some("text/plain".to_owned()),
4945                    size: Some(5),
4946                },
4947            }],
4948        };
4949
4950        room.save_composer_draft(draft.clone(), None).await.unwrap();
4951
4952        let thread_root = owned_event_id!("$thread_root:b.c");
4953        let thread_draft = ComposerDraft {
4954            plain_text: "Hello, thread!".to_owned(),
4955            html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4956            draft_type: ComposerDraftType::NewMessage,
4957            attachments: vec![DraftAttachment {
4958                filename: "dog.txt".to_owned(),
4959                content: matrix_sdk_base::DraftAttachmentContent::File {
4960                    data: b"wuv".to_vec(),
4961                    mimetype: Some("text/plain".to_owned()),
4962                    size: Some(4),
4963                },
4964            }],
4965        };
4966
4967        room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4968
4969        // Check that the room draft was saved correctly
4970        assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4971
4972        // Check that the thread draft was saved correctly
4973        assert_eq!(
4974            room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4975            Some(thread_draft.clone())
4976        );
4977
4978        // Clear the room draft
4979        room.clear_composer_draft(None).await.unwrap();
4980        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4981
4982        // Check that the thread one is still there
4983        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4984
4985        // Clear the thread draft as well
4986        room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4987        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4988    }
4989
4990    #[async_test]
4991    async fn test_mark_join_requests_as_seen() {
4992        let server = MatrixMockServer::new().await;
4993        let client = server.client_builder().build().await;
4994        let event_id = event_id!("$a:b.c");
4995        let room_id = room_id!("!a:b.c");
4996        let user_id = user_id!("@alice:b.c");
4997
4998        let f = EventFactory::new().room(room_id);
4999        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5000            f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
5001        ]);
5002        let room = server.sync_room(&client, joined_room_builder).await;
5003
5004        // When loading the initial seen ids, there are none
5005        let seen_ids =
5006            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
5007        assert!(seen_ids.is_empty());
5008
5009        // We mark a random event id as seen
5010        room.mark_knock_requests_as_seen(&[user_id.to_owned()])
5011            .await
5012            .expect("Couldn't mark join request as seen");
5013
5014        // Then we can check it was successfully marked as seen
5015        let seen_ids =
5016            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
5017        assert_eq!(seen_ids.len(), 1);
5018        assert_eq!(
5019            seen_ids.into_iter().next().expect("No next value"),
5020            (event_id.to_owned(), user_id.to_owned())
5021        )
5022    }
5023
5024    #[async_test]
5025    async fn test_own_room_membership_with_no_own_member_event() {
5026        let server = MatrixMockServer::new().await;
5027        let client = server.client_builder().build().await;
5028        let room_id = room_id!("!a:b.c");
5029
5030        let room = server.sync_joined_room(&client, room_id).await;
5031
5032        // Since there is no member event for the own user, the method fails.
5033        // This should never happen in an actual room.
5034        let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
5035        assert!(error.is_some());
5036    }
5037
5038    #[async_test]
5039    async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
5040        let server = MatrixMockServer::new().await;
5041        let client = server.client_builder().build().await;
5042        let room_id = room_id!("!a:b.c");
5043        let user_id = user_id!("@example:localhost");
5044
5045        let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
5046        let joined_room_builder =
5047            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5048        let room = server.sync_room(&client, joined_room_builder).await;
5049
5050        // When we load the membership details
5051        let ret = room
5052            .member_with_sender_info(client.user_id().unwrap())
5053            .await
5054            .expect("Room member info should be available");
5055
5056        // We get the member info for the current user
5057        assert_eq!(ret.room_member.event().user_id(), user_id);
5058
5059        // But there is no info for the sender
5060        assert!(ret.sender_info.is_none());
5061    }
5062
5063    #[async_test]
5064    async fn test_own_room_membership_with_own_member_event_and_own_sender() {
5065        let server = MatrixMockServer::new().await;
5066        let client = server.client_builder().build().await;
5067        let room_id = room_id!("!a:b.c");
5068        let user_id = user_id!("@example:localhost");
5069
5070        let f = EventFactory::new().room(room_id).sender(user_id);
5071        let joined_room_builder =
5072            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5073        let room = server.sync_room(&client, joined_room_builder).await;
5074
5075        // When we load the membership details
5076        let ret = room
5077            .member_with_sender_info(client.user_id().unwrap())
5078            .await
5079            .expect("Room member info should be available");
5080
5081        // We get the current user's member info
5082        assert_eq!(ret.room_member.event().user_id(), user_id);
5083
5084        // And the sender has the same info, since it's also the current user
5085        assert!(ret.sender_info.is_some());
5086        assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5087    }
5088
5089    #[async_test]
5090    async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5091        let server = MatrixMockServer::new().await;
5092        let client = server.client_builder().build().await;
5093        let room_id = room_id!("!a:b.c");
5094        let user_id = user_id!("@example:localhost");
5095        let sender_id = user_id!("@alice:b.c");
5096
5097        let f = EventFactory::new().room(room_id).sender(sender_id);
5098        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5099            f.member(user_id).into(),
5100            // The sender info comes from the sync
5101            f.member(sender_id).into(),
5102        ]);
5103        let room = server.sync_room(&client, joined_room_builder).await;
5104
5105        // When we load the membership details
5106        let ret = room
5107            .member_with_sender_info(client.user_id().unwrap())
5108            .await
5109            .expect("Room member info should be available");
5110
5111        // We get the current user's member info
5112        assert_eq!(ret.room_member.event().user_id(), user_id);
5113
5114        // And also the sender info from the events received in the sync
5115        assert!(ret.sender_info.is_some());
5116        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5117    }
5118
5119    #[async_test]
5120    async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5121        let server = MatrixMockServer::new().await;
5122        let client = server.client_builder().build().await;
5123        let room_id = room_id!("!a:b.c");
5124        let user_id = user_id!("@example:localhost");
5125        let sender_id = user_id!("@alice:b.c");
5126
5127        let f = EventFactory::new().room(room_id).sender(sender_id);
5128        let joined_room_builder =
5129            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5130        let room = server.sync_room(&client, joined_room_builder).await;
5131
5132        // We'll receive the member info through the /members endpoint
5133        server
5134            .mock_get_members()
5135            .ok(vec![f.member(sender_id).into_raw()])
5136            .mock_once()
5137            .mount()
5138            .await;
5139
5140        // We get the current user's member info
5141        let ret = room
5142            .member_with_sender_info(client.user_id().unwrap())
5143            .await
5144            .expect("Room member info should be available");
5145
5146        // We get the current user's member info
5147        assert_eq!(ret.room_member.event().user_id(), user_id);
5148
5149        // And also the sender info from the /members endpoint
5150        assert!(ret.sender_info.is_some());
5151        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5152    }
5153
5154    #[async_test]
5155    async fn test_list_threads() {
5156        let server = MatrixMockServer::new().await;
5157        let client = server.client_builder().build().await;
5158
5159        let room_id = room_id!("!a:b.c");
5160        let sender_id = user_id!("@alice:b.c");
5161        let f = EventFactory::new().room(room_id).sender(sender_id);
5162
5163        let eid1 = event_id!("$1");
5164        let eid2 = event_id!("$2");
5165        let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5166        let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5167
5168        server
5169            .mock_room_threads()
5170            .ok(batch1.clone(), Some("prev_batch".to_owned()))
5171            .mock_once()
5172            .mount()
5173            .await;
5174        server
5175            .mock_room_threads()
5176            .match_from("prev_batch")
5177            .ok(batch2, None)
5178            .mock_once()
5179            .mount()
5180            .await;
5181
5182        let room = server.sync_joined_room(&client, room_id).await;
5183        let result =
5184            room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5185        assert_eq!(result.chunk.len(), 1);
5186        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5187        assert!(result.prev_batch_token.is_some());
5188
5189        let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5190        let result = room.list_threads(opts).await.expect("Failed to list threads");
5191        assert_eq!(result.chunk.len(), 1);
5192        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5193        assert!(result.prev_batch_token.is_none());
5194    }
5195
5196    #[async_test]
5197    async fn test_relations() {
5198        let server = MatrixMockServer::new().await;
5199        let client = server.client_builder().build().await;
5200
5201        let room_id = room_id!("!a:b.c");
5202        let sender_id = user_id!("@alice:b.c");
5203        let f = EventFactory::new().room(room_id).sender(sender_id);
5204
5205        let target_event_id = owned_event_id!("$target");
5206        let eid1 = event_id!("$1");
5207        let eid2 = event_id!("$2");
5208        let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5209        let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5210
5211        server
5212            .mock_room_relations()
5213            .match_target_event(target_event_id.clone())
5214            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5215            .mock_once()
5216            .mount()
5217            .await;
5218
5219        server
5220            .mock_room_relations()
5221            .match_target_event(target_event_id.clone())
5222            .match_from("next_batch")
5223            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5224            .mock_once()
5225            .mount()
5226            .await;
5227
5228        let room = server.sync_joined_room(&client, room_id).await;
5229
5230        // Main endpoint: no relation type filtered out.
5231        let mut opts = RelationsOptions {
5232            include_relations: IncludeRelations::AllRelations,
5233            ..Default::default()
5234        };
5235        let result = room
5236            .relations(target_event_id.clone(), opts.clone())
5237            .await
5238            .expect("Failed to list relations the first time");
5239        assert_eq!(result.chunk.len(), 1);
5240        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5241        assert!(result.prev_batch_token.is_none());
5242        assert!(result.next_batch_token.is_some());
5243        assert!(result.recursion_depth.is_none());
5244
5245        opts.from = result.next_batch_token;
5246        let result = room
5247            .relations(target_event_id, opts)
5248            .await
5249            .expect("Failed to list relations the second time");
5250        assert_eq!(result.chunk.len(), 1);
5251        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5252        assert!(result.prev_batch_token.is_none());
5253        assert!(result.next_batch_token.is_none());
5254        assert!(result.recursion_depth.is_none());
5255    }
5256
5257    #[async_test]
5258    async fn test_relations_with_reltype() {
5259        let server = MatrixMockServer::new().await;
5260        let client = server.client_builder().build().await;
5261
5262        let room_id = room_id!("!a:b.c");
5263        let sender_id = user_id!("@alice:b.c");
5264        let f = EventFactory::new().room(room_id).sender(sender_id);
5265
5266        let target_event_id = owned_event_id!("$target");
5267        let eid1 = event_id!("$1");
5268        let eid2 = event_id!("$2");
5269        let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5270        let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5271
5272        server
5273            .mock_room_relations()
5274            .match_target_event(target_event_id.clone())
5275            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5276            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5277            .mock_once()
5278            .mount()
5279            .await;
5280
5281        server
5282            .mock_room_relations()
5283            .match_target_event(target_event_id.clone())
5284            .match_from("next_batch")
5285            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5286            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5287            .mock_once()
5288            .mount()
5289            .await;
5290
5291        let room = server.sync_joined_room(&client, room_id).await;
5292
5293        // Reltype-filtered endpoint, for threads \o/
5294        let mut opts = RelationsOptions {
5295            include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5296            ..Default::default()
5297        };
5298        let result = room
5299            .relations(target_event_id.clone(), opts.clone())
5300            .await
5301            .expect("Failed to list relations the first time");
5302        assert_eq!(result.chunk.len(), 1);
5303        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5304        assert!(result.prev_batch_token.is_none());
5305        assert!(result.next_batch_token.is_some());
5306        assert!(result.recursion_depth.is_none());
5307
5308        opts.from = result.next_batch_token;
5309        let result = room
5310            .relations(target_event_id, opts)
5311            .await
5312            .expect("Failed to list relations the second time");
5313        assert_eq!(result.chunk.len(), 1);
5314        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5315        assert!(result.prev_batch_token.is_none());
5316        assert!(result.next_batch_token.is_none());
5317        assert!(result.recursion_depth.is_none());
5318    }
5319
5320    #[async_test]
5321    async fn test_power_levels_computation() {
5322        let server = MatrixMockServer::new().await;
5323        let client = server.client_builder().build().await;
5324
5325        let room_id = room_id!("!a:b.c");
5326        let sender_id = client.user_id().expect("No session id");
5327        let f = EventFactory::new().room(room_id).sender(sender_id);
5328        let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5329
5330        // Computing the power levels will need these 3 state events:
5331        let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5332        let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5333        let room_member_event = f.member(sender_id).into();
5334
5335        // With only the room member event
5336        let room = server
5337            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5338            .await;
5339        let ctx = room
5340            .push_condition_room_ctx()
5341            .await
5342            .expect("Failed to get push condition context")
5343            .expect("Could not get push condition context");
5344
5345        // The internal power levels couldn't be computed
5346        assert!(ctx.power_levels.is_none());
5347
5348        // Adding the room creation event
5349        let room = server
5350            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5351            .await;
5352        let ctx = room
5353            .push_condition_room_ctx()
5354            .await
5355            .expect("Failed to get push condition context")
5356            .expect("Could not get push condition context");
5357
5358        // The internal power levels still couldn't be computed
5359        assert!(ctx.power_levels.is_none());
5360
5361        // With the room member, room creation and the power levels events
5362        let room = server
5363            .sync_room(
5364                &client,
5365                JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5366            )
5367            .await;
5368        let ctx = room
5369            .push_condition_room_ctx()
5370            .await
5371            .expect("Failed to get push condition context")
5372            .expect("Could not get push condition context");
5373
5374        // The internal power levels can finally be computed
5375        assert!(ctx.power_levels.is_some());
5376    }
5377}