1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 ops::Deref,
21 sync::Arc,
22 time::Duration,
23};
24
25use async_stream::stream;
26use eyeball::SharedObservable;
27use futures_core::Stream;
28use futures_util::{
29 future::{try_join, try_join_all},
30 stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "e2e-encryption")]
36use matrix_sdk_base::crypto::{DecryptionSettings, RoomEventDecryptionResult};
37#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
38use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
39use matrix_sdk_base::{
40 deserialized_responses::{
41 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
42 },
43 event_cache::store::media::IgnoreMediaRetentionPolicy,
44 media::MediaThumbnailSettings,
45 store::StateStoreExt,
46 ComposerDraft, RoomInfoNotableUpdateReasons, RoomMemberships, StateChanges, StateStoreDataKey,
47 StateStoreDataValue,
48};
49#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
50use matrix_sdk_common::BoxFuture;
51use matrix_sdk_common::{
52 deserialized_responses::TimelineEvent,
53 executor::{spawn, JoinHandle},
54 timeout::timeout,
55};
56use mime::Mime;
57#[cfg(feature = "e2e-encryption")]
58use ruma::events::{
59 room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
60 SyncMessageLikeEvent,
61};
62use ruma::{
63 api::client::{
64 config::{set_global_account_data, set_room_account_data},
65 context,
66 error::ErrorKind,
67 filter::LazyLoadOptions,
68 membership::{
69 ban_user, forget_room, get_member_events,
70 invite_user::{self, v3::InvitationRecipient},
71 kick_user, leave_room, unban_user, Invite3pid,
72 },
73 message::send_message_event,
74 read_marker::set_read_marker,
75 receipt::create_receipt,
76 redact::redact_event,
77 room::{get_room_event, report_content},
78 state::{get_state_events_for_key, send_state_event},
79 tag::{create_tag, delete_tag},
80 typing::create_typing_event::{self, v3::Typing},
81 },
82 assign,
83 events::{
84 beacon::BeaconEventContent,
85 beacon_info::BeaconInfoEventContent,
86 call::notify::{ApplicationType, CallNotifyEventContent, NotifyType},
87 direct::DirectEventContent,
88 marked_unread::{MarkedUnreadEventContent, UnstableMarkedUnreadEventContent},
89 receipt::{Receipt, ReceiptThread, ReceiptType},
90 room::{
91 avatar::{self, RoomAvatarEventContent},
92 encryption::RoomEncryptionEventContent,
93 history_visibility::HistoryVisibility,
94 member::{MembershipChange, SyncRoomMemberEvent},
95 message::{
96 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
97 FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
98 UnstableAudioDetailsContentBlock, UnstableVoiceContentBlock, VideoInfo,
99 VideoMessageEventContent,
100 },
101 name::RoomNameEventContent,
102 pinned_events::RoomPinnedEventsEventContent,
103 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
104 server_acl::RoomServerAclEventContent,
105 topic::RoomTopicEventContent,
106 ImageInfo, MediaSource, ThumbnailInfo,
107 },
108 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
109 tag::{TagInfo, TagName},
110 typing::SyncTypingEvent,
111 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
112 Mentions, MessageLikeEventContent, MessageLikeEventType, OriginalSyncStateEvent,
113 RedactContent, RedactedStateEventContent, RoomAccountDataEvent,
114 RoomAccountDataEventContent, RoomAccountDataEventType, StateEventContent, StateEventType,
115 StaticEventContent, StaticStateEventContent, SyncStateEvent,
116 },
117 push::{Action, PushConditionRoomCtx},
118 serde::Raw,
119 time::Instant,
120 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
121 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
122};
123use serde::de::DeserializeOwned;
124use thiserror::Error;
125use tokio::sync::broadcast;
126use tokio_stream::StreamExt;
127use tracing::{debug, info, instrument, warn};
128
129use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
130pub use self::{
131 member::{RoomMember, RoomMemberRole},
132 messages::{EventWithContextResponse, Messages, MessagesOptions},
133};
134#[cfg(doc)]
135use crate::event_cache::EventCache;
136use crate::{
137 attachment::{AttachmentConfig, AttachmentInfo},
138 client::WeakClient,
139 config::RequestConfig,
140 error::{BeaconError, WrongRoomState},
141 event_cache::{self, EventCacheDropHandles, RoomEventCache},
142 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
143 live_location_share::ObservableLiveLocation,
144 media::{MediaFormat, MediaRequestParameters},
145 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
146 room::{
147 knock_requests::{KnockRequest, KnockRequestMemberInfo},
148 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
149 privacy_settings::RoomPrivacySettings,
150 },
151 sync::RoomUpdate,
152 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
153 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
154};
155#[cfg(feature = "e2e-encryption")]
156use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
157
158pub mod edit;
159pub mod futures;
160pub mod identity_status_changes;
161pub mod knock_requests;
163mod member;
164mod messages;
165pub mod power_levels;
166
167pub mod privacy_settings;
169
/// A handle to a single room, pairing the room's [`BaseRoom`] with the
/// [`Client`] used to send requests about it.
///
/// `Room` dereferences to [`BaseRoom`], so the base room's methods can be
/// called directly on a `Room`.
#[derive(Debug, Clone)]
pub struct Room {
    /// The wrapped base room; this is the `Deref` target of `Room`.
    inner: BaseRoom,
    /// The client through which this room's requests are sent.
    pub(crate) client: Client,
}
177
178impl Deref for Room {
179 type Target = BaseRoom;
180
181 fn deref(&self) -> &Self::Target {
182 &self.inner
183 }
184}
185
/// Timeout attached to a "typing" notice (`Typing::Yes`) sent to the server.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_millis(4_000);
/// Age a previously sent typing notice must exceed before `typing_notice`
/// considers re-sending an active notice.
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_millis(3_000);
188
189impl Room {
190 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
197 Self { inner: room, client }
198 }
199
200 #[doc(alias = "reject_invitation")]
204 pub async fn leave(&self) -> Result<()> {
205 let state = self.state();
206 if state == RoomState::Left {
207 return Err(Error::WrongRoomState(WrongRoomState::new("Joined or Invited", state)));
208 }
209
210 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
211 self.client.send(request).await?;
212 self.client.base_client().room_left(self.room_id()).await?;
213 Ok(())
214 }
215
216 #[doc(alias = "accept_invitation")]
220 pub async fn join(&self) -> Result<()> {
221 let state = self.state();
222 if state == RoomState::Joined {
223 return Err(Error::WrongRoomState(WrongRoomState::new("Invited or Left", state)));
224 }
225
226 let prev_room_state = self.inner.state();
227
228 let mark_as_direct = prev_room_state == RoomState::Invited
229 && self.inner.is_direct().await.unwrap_or_else(|e| {
230 warn!(room_id = ?self.room_id(), "is_direct() failed: {e}");
231 false
232 });
233
234 self.client.join_room_by_id(self.room_id()).await?;
235
236 if mark_as_direct {
237 self.set_is_direct(true).await?;
238 }
239
240 Ok(())
241 }
242
243 pub fn client(&self) -> Client {
247 self.client.clone()
248 }
249
250 pub fn is_synced(&self) -> bool {
253 self.inner.is_state_fully_synced()
254 }
255
256 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
286 let Some(url) = self.avatar_url() else { return Ok(None) };
287 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
288 Ok(Some(self.client.media().get_media_content(&request, true).await?))
289 }
290
    /// Requests a chunk of this room's messages from the server and returns
    /// it as [`Messages`].
    ///
    /// The request is built from `options` via `MessagesOptions::into_request`.
    ///
    /// With the `e2e-encryption` feature enabled, every returned event that
    /// deserializes as an original `RoomEncrypted` message-like event is
    /// passed to `decrypt_event`. If that call fails, the event is kept in
    /// its raw form, as are all other events. Without the feature, events are
    /// returned as received.
    ///
    /// When a push context is available, the push actions of every event in
    /// the chunk are computed from the account's push rules.
    ///
    /// # Errors
    ///
    /// Fails if the request fails, or if obtaining the push context or the
    /// push rules fails.
    #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
    pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
        let room_id = self.inner.room_id();
        let request = options.into_request(room_id);
        let http_response = self.client.send(request).await?;

        // `response` is only mutated when the `e2e-encryption` feature is on,
        // hence the `unused_mut` allowance.
        #[allow(unused_mut)]
        let mut response = Messages {
            start: http_response.start,
            end: http_response.end,
            // Without encryption support the events are passed through as-is.
            #[cfg(not(feature = "e2e-encryption"))]
            chunk: http_response
                .chunk
                .into_iter()
                .map(|raw| TimelineEvent::new(raw.cast()))
                .collect(),
            // With encryption support the chunk is filled by the loop below.
            #[cfg(feature = "e2e-encryption")]
            chunk: Vec::with_capacity(http_response.chunk.len()),
            state: http_response.state,
        };

        #[cfg(feature = "e2e-encryption")]
        for event in http_response.chunk {
            // Only original encrypted events are sent through decryption.
            let decrypted_event = if let Ok(AnySyncTimelineEvent::MessageLike(
                AnySyncMessageLikeEvent::RoomEncrypted(SyncMessageLikeEvent::Original(_)),
            )) = event.deserialize_as::<AnySyncTimelineEvent>()
            {
                if let Ok(event) = self.decrypt_event(event.cast_ref()).await {
                    event
                } else {
                    // Decryption failed: fall back to the raw event.
                    TimelineEvent::new(event.cast())
                }
            } else {
                TimelineEvent::new(event.cast())
            };
            response.chunk.push(decrypted_event);
        }

        // Recompute push actions for every event once a push context exists.
        if let Some(push_context) = self.push_context().await? {
            let push_rules = self.client().account().push_rules().await?;

            for event in &mut response.chunk {
                event.push_actions =
                    Some(push_rules.get_actions(event.raw(), &push_context).to_owned());
            }
        }

        Ok(response)
    }
368
369 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
379 where
380 Ev: SyncEvent + DeserializeOwned + Send + 'static,
381 H: EventHandler<Ev, Ctx>,
382 {
383 self.client.add_room_event_handler(self.room_id(), handler)
384 }
385
386 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
391 self.client.subscribe_to_room_updates(self.room_id())
392 }
393
394 pub fn subscribe_to_typing_notifications(
400 &self,
401 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
402 let (sender, receiver) = broadcast::channel(16);
403 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
404 let own_user_id = self.own_user_id().to_owned();
405 move |event: SyncTypingEvent| async move {
406 let typing_user_ids = event
408 .content
409 .user_ids
410 .into_iter()
411 .filter(|user_id| *user_id != own_user_id)
412 .collect();
413 let _ = sender.send(typing_user_ids);
415 }
416 });
417 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
418 (drop_guard, receiver)
419 }
420
421 #[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
444 pub async fn subscribe_to_identity_status_changes(
445 &self,
446 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
447 IdentityStatusChanges::create_stream(self.clone()).await
448 }
449
450 async fn try_decrypt_event(&self, event: Raw<AnyTimelineEvent>) -> Result<TimelineEvent> {
456 #[cfg(feature = "e2e-encryption")]
457 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
458 SyncMessageLikeEvent::Original(_),
459 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
460 {
461 if let Ok(event) = self.decrypt_event(event.cast_ref()).await {
462 return Ok(event);
463 }
464 }
465
466 let mut event = TimelineEvent::new(event.cast());
467 event.push_actions = self.event_push_actions(event.raw()).await?;
468
469 Ok(event)
470 }
471
472 pub async fn event(
477 &self,
478 event_id: &EventId,
479 request_config: Option<RequestConfig>,
480 ) -> Result<TimelineEvent> {
481 let request =
482 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
483
484 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
485 let event = self.try_decrypt_event(raw_event).await?;
486
487 if let Ok((cache, _handles)) = self.event_cache().await {
489 cache.save_event(event.clone()).await;
490 }
491
492 Ok(event)
493 }
494
495 pub async fn event_with_context(
498 &self,
499 event_id: &EventId,
500 lazy_load_members: bool,
501 context_size: UInt,
502 request_config: Option<RequestConfig>,
503 ) -> Result<EventWithContextResponse> {
504 let mut request =
505 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
506
507 request.limit = context_size;
508
509 if lazy_load_members {
510 request.filter.lazy_load_options =
511 LazyLoadOptions::Enabled { include_redundant_members: false };
512 }
513
514 let response = self.client.send(request).with_request_config(request_config).await?;
515
516 let target_event = if let Some(event) = response.event {
517 Some(self.try_decrypt_event(event).await?)
518 } else {
519 None
520 };
521
522 let (events_before, events_after) = try_join(
526 try_join_all(response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev))),
527 try_join_all(response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev))),
528 )
529 .await?;
530
531 if let Ok((cache, _handles)) = self.event_cache().await {
533 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
534 if let Some(event) = &target_event {
535 events_to_save.push(event.clone());
536 }
537
538 for event in &events_before {
539 events_to_save.push(event.clone());
540 }
541
542 for event in &events_after {
543 events_to_save.push(event.clone());
544 }
545
546 cache.save_events(events_to_save).await;
547 }
548
549 Ok(EventWithContextResponse {
550 event: target_event,
551 events_before,
552 events_after,
553 state: response.state,
554 prev_batch_token: response.start,
555 next_batch_token: response.end,
556 })
557 }
558
    /// Requests this room's member events from the server and passes the
    /// response to the base client via `receive_all_members`.
    ///
    /// The work runs through `members_request_deduplicated_handler`, keyed by
    /// the room ID.
    /// NOTE(review): presumably this lets concurrent calls for the same room
    /// share one request — confirm against the handler's implementation.
    ///
    /// # Errors
    ///
    /// Propagates failures of the request and of `receive_all_members`.
    pub(crate) async fn request_members(&self) -> Result<()> {
        self.client
            .locks()
            .members_request_deduplicated_handler
            .run(self.room_id().to_owned(), async move {
                let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
                let response = self
                    .client
                    // The request is cloned because it is passed to
                    // `receive_all_members` below as well.
                    .send(request.clone())
                    .with_request_config(
                        // Dedicated 60 s timeout and up to 3 retries for this request.
                        RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
                    )
                    .await?;

                // The future is boxed and pinned before being awaited.
                Box::pin(self.client.base_client().receive_all_members(
                    self.room_id(),
                    &request,
                    &response,
                ))
                .await?;

                Ok(())
            })
            .await
    }
587
    /// Requests the room's encryption state event from the server and
    /// records the result in the room info.
    ///
    /// A `NotFound` error from the server is treated as "no encryption
    /// event". The room info is then marked as having its encryption state
    /// synced, saved to the store and applied to the room.
    ///
    /// The work runs through `encryption_state_deduplicated_handler`, keyed
    /// by the room ID.
    /// NOTE(review): presumably this deduplicates concurrent requests for
    /// the same room — confirm against the handler's implementation.
    ///
    /// # Errors
    ///
    /// Propagates request errors other than `NotFound`, failures to
    /// deserialize the event content, and store errors.
    async fn request_encryption_state(&self) -> Result<()> {
        self.client
            .locks()
            .encryption_state_deduplicated_handler
            .run(self.room_id().to_owned(), async move {
                // The empty string is the state key of the encryption event.
                let request = get_state_events_for_key::v3::Request::new(
                    self.room_id().to_owned(),
                    StateEventType::RoomEncryption,
                    "".to_owned(),
                );
                let response = match self.client.send(request).await {
                    Ok(response) => {
                        Some(response.content.deserialize_as::<RoomEncryptionEventContent>()?)
                    }
                    // No encryption event on the server: record `None`.
                    Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
                    Err(err) => return Err(err.into()),
                };

                // The sync lock is held from here until the end of this block.
                // NOTE(review): presumably this prevents a concurrent sync
                // from updating the same room info — confirm.
                let _sync_lock = self.client.base_client().sync_lock().lock().await;

                // Store both the event (if any) and the fact that it was requested.
                let mut room_info = self.clone_info();
                room_info.mark_encryption_state_synced();
                room_info.set_encryption_event(response.clone());
                let mut changes = StateChanges::default();
                changes.add_room(room_info.clone());

                self.client.store().save_changes(&changes).await?;
                self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());

                Ok(())
            })
            .await
    }
624
625 pub async fn is_encrypted(&self) -> Result<bool> {
630 if !self.is_encryption_state_synced() {
631 self.request_encryption_state().await?;
632 }
633
634 Ok(self.inner.is_encrypted())
635 }
636
637 #[cfg(feature = "e2e-encryption")]
639 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
640 let encryption = self.client.encryption();
641
642 let this_device_is_verified = match encryption.get_own_device().await {
643 Ok(Some(device)) => device.is_verified_with_cross_signing(),
644
645 _ => true,
647 };
648
649 let backup_exists_on_server =
650 encryption.backups().exists_on_server().await.unwrap_or(false);
651
652 CryptoContextInfo {
653 device_creation_ts: encryption.device_creation_timestamp().await,
654 this_device_is_verified,
655 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
656 backup_exists_on_server,
657 }
658 }
659
660 fn are_events_visible(&self) -> bool {
661 if let RoomState::Invited = self.inner.state() {
662 return matches!(
663 self.inner.history_visibility_or_default(),
664 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
665 );
666 }
667
668 true
669 }
670
671 pub async fn sync_members(&self) -> Result<()> {
677 if !self.are_events_visible() {
678 return Ok(());
679 }
680
681 if !self.are_members_synced() {
682 self.request_members().await
683 } else {
684 Ok(())
685 }
686 }
687
688 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
702 self.sync_members().await?;
703 self.get_member_no_sync(user_id).await
704 }
705
706 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
720 Ok(self
721 .inner
722 .get_member(user_id)
723 .await?
724 .map(|member| RoomMember::new(self.client.clone(), member)))
725 }
726
727 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
736 self.sync_members().await?;
737 self.members_no_sync(memberships).await
738 }
739
740 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
749 Ok(self
750 .inner
751 .members(memberships)
752 .await?
753 .into_iter()
754 .map(|member| RoomMember::new(self.client.clone(), member))
755 .collect())
756 }
757
758 pub async fn get_state_events(
760 &self,
761 event_type: StateEventType,
762 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
763 self.client.store().get_state_events(self.room_id(), event_type).await.map_err(Into::into)
764 }
765
766 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
783 where
784 C: StaticEventContent + StaticStateEventContent + RedactContent,
785 C::Redacted: RedactedStateEventContent,
786 {
787 Ok(self.client.store().get_state_events_static(self.room_id()).await?)
788 }
789
790 pub async fn get_state_events_for_keys(
793 &self,
794 event_type: StateEventType,
795 state_keys: &[&str],
796 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
797 self.client
798 .store()
799 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
800 .await
801 .map_err(Into::into)
802 }
803
804 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
824 &self,
825 state_keys: I,
826 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
827 where
828 C: StaticEventContent + StaticStateEventContent + RedactContent,
829 C::StateKey: Borrow<K>,
830 C::Redacted: RedactedStateEventContent,
831 K: AsRef<str> + Sized + Sync + 'a,
832 I: IntoIterator<Item = &'a K> + Send,
833 I::IntoIter: Send,
834 {
835 Ok(self.client.store().get_state_events_for_keys_static(self.room_id(), state_keys).await?)
836 }
837
838 pub async fn get_state_event(
840 &self,
841 event_type: StateEventType,
842 state_key: &str,
843 ) -> Result<Option<RawAnySyncOrStrippedState>> {
844 self.client
845 .store()
846 .get_state_event(self.room_id(), event_type, state_key)
847 .await
848 .map_err(Into::into)
849 }
850
851 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
870 where
871 C: StaticEventContent + StaticStateEventContent<StateKey = EmptyStateKey> + RedactContent,
872 C::Redacted: RedactedStateEventContent,
873 {
874 self.get_state_event_static_for_key(&EmptyStateKey).await
875 }
876
877 pub async fn get_state_event_static_for_key<C, K>(
897 &self,
898 state_key: &K,
899 ) -> Result<Option<RawSyncOrStrippedState<C>>>
900 where
901 C: StaticEventContent + StaticStateEventContent + RedactContent,
902 C::StateKey: Borrow<K>,
903 C::Redacted: RedactedStateEventContent,
904 K: AsRef<str> + ?Sized + Sync,
905 {
906 Ok(self.client.store().get_state_event_static_for_key(self.room_id(), state_key).await?)
907 }
908
    /// Returns a stream that classifies each space this room names as a
    /// parent through its stored `SpaceParentEventContent` state events.
    ///
    /// Redacted parent events, and parent events that fail to deserialize,
    /// are skipped. For every remaining parent, identified by the event's
    /// state key, the stream yields:
    ///
    /// - `ParentSpace::Unverifiable` if the client does not know the room,
    /// - `ParentSpace::Reciprocal` if the parent room has an original
    ///   (non-redacted, non-stripped) `SpaceChildEventContent` state event
    ///   keyed by this room's ID,
    /// - `ParentSpace::WithPowerlevel` if the sender of the parent event is
    ///   a member of the parent room and can send `SpaceChild` state events,
    /// - `ParentSpace::Illegitimate` otherwise.
    ///
    /// The per-parent futures are collected into a `FuturesUnordered`, so
    /// callers should not rely on the order of the items.
    ///
    /// # Errors
    ///
    /// The outer `Result` fails if the parent events cannot be read from the
    /// store. Each stream item fails if reading the child event or the
    /// member from the parent room fails.
    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
        Ok(self
            .get_state_events_static::<SpaceParentEventContent>()
            .await?
            .into_iter()
            // Keep only the (parent room ID, sender) of usable parent events.
            .flat_map(|parent_event| match parent_event.deserialize() {
                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
                    Some((e.state_key.to_owned(), e.sender))
                }
                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
                Err(e) => {
                    info!(room_id = ?self.room_id(), "Could not deserialize m.room.parent: {e}");
                    None
                }
            })
            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
                // A parent room the client does not know cannot be checked.
                let Some(parent_room) = self.client.get_room(&state_key) else {
                    return Ok(ParentSpace::Unverifiable(state_key));
                };
                // Does the parent point back at this room?
                if let Some(child_event) = parent_room
                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
                    .await?
                {
                    match child_event.deserialize() {
                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
                            return Ok(ParentSpace::Reciprocal(parent_room));
                        }
                        // Redacted, stripped or broken child events do not
                        // count; fall through to the sender check.
                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
                        Ok(SyncOrStrippedState::Stripped(_)) => {}
                        Err(e) => {
                            info!(
                                room_id = ?self.room_id(), parent_room_id = ?state_key,
                                "Could not deserialize m.room.child: {e}"
                            );
                        }
                    }
                }

                // No reciprocal link: check the sender of the parent event.
                let Some(member) = parent_room.get_member(&sender).await? else {
                    return Ok(ParentSpace::Illegitimate(parent_room));
                };

                if member.can_send_state(StateEventType::SpaceChild) {
                    Ok(ParentSpace::WithPowerlevel(parent_room))
                } else {
                    Ok(ParentSpace::Illegitimate(parent_room))
                }
            })
            .collect::<FuturesUnordered<_>>())
    }
983
984 pub async fn account_data(
986 &self,
987 data_type: RoomAccountDataEventType,
988 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
989 self.client
990 .store()
991 .get_room_account_data_event(self.room_id(), data_type)
992 .await
993 .map_err(Into::into)
994 }
995
996 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1015 where
1016 C: StaticEventContent + RoomAccountDataEventContent,
1017 {
1018 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast))
1019 }
1020
1021 #[cfg(feature = "e2e-encryption")]
1026 pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1027 let user_ids =
1028 self.client.store().get_user_ids(self.room_id(), RoomMemberships::empty()).await?;
1029
1030 for user_id in user_ids {
1031 let devices = self.client.encryption().get_user_devices(&user_id).await?;
1032 let any_unverified = devices.devices().any(|d| !d.is_verified());
1033
1034 if any_unverified {
1035 return Ok(false);
1036 }
1037 }
1038
1039 Ok(true)
1040 }
1041
1042 pub async fn set_account_data<T>(
1057 &self,
1058 content: T,
1059 ) -> Result<set_room_account_data::v3::Response>
1060 where
1061 T: RoomAccountDataEventContent,
1062 {
1063 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1064
1065 let request = set_room_account_data::v3::Request::new(
1066 own_user.to_owned(),
1067 self.room_id().to_owned(),
1068 &content,
1069 )?;
1070
1071 Ok(self.client.send(request).await?)
1072 }
1073
1074 pub async fn set_account_data_raw(
1099 &self,
1100 event_type: RoomAccountDataEventType,
1101 content: Raw<AnyRoomAccountDataEventContent>,
1102 ) -> Result<set_room_account_data::v3::Response> {
1103 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1104
1105 let request = set_room_account_data::v3::Request::new_raw(
1106 own_user.to_owned(),
1107 self.room_id().to_owned(),
1108 event_type,
1109 content,
1110 );
1111
1112 Ok(self.client.send(request).await?)
1113 }
1114
1115 pub async fn set_tag(
1146 &self,
1147 tag: TagName,
1148 tag_info: TagInfo,
1149 ) -> Result<create_tag::v3::Response> {
1150 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1151 let request = create_tag::v3::Request::new(
1152 user_id.to_owned(),
1153 self.inner.room_id().to_owned(),
1154 tag.to_string(),
1155 tag_info,
1156 );
1157 Ok(self.client.send(request).await?)
1158 }
1159
1160 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1167 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1168 let request = delete_tag::v3::Request::new(
1169 user_id.to_owned(),
1170 self.inner.room_id().to_owned(),
1171 tag.to_string(),
1172 );
1173 Ok(self.client.send(request).await?)
1174 }
1175
1176 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1186 if is_favourite {
1187 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1188
1189 self.set_tag(TagName::Favorite, tag_info).await?;
1190
1191 if self.is_low_priority() {
1192 self.remove_tag(TagName::LowPriority).await?;
1193 }
1194 } else {
1195 self.remove_tag(TagName::Favorite).await?;
1196 }
1197 Ok(())
1198 }
1199
1200 pub async fn set_is_low_priority(
1210 &self,
1211 is_low_priority: bool,
1212 tag_order: Option<f64>,
1213 ) -> Result<()> {
1214 if is_low_priority {
1215 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1216
1217 self.set_tag(TagName::LowPriority, tag_info).await?;
1218
1219 if self.is_favourite() {
1220 self.remove_tag(TagName::Favorite).await?;
1221 }
1222 } else {
1223 self.remove_tag(TagName::LowPriority).await?;
1224 }
1225 Ok(())
1226 }
1227
1228 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1237 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1238
1239 let mut content = self
1240 .client
1241 .account()
1242 .account_data::<DirectEventContent>()
1243 .await?
1244 .map(|c| c.deserialize())
1245 .transpose()?
1246 .unwrap_or_default();
1247
1248 let this_room_id = self.inner.room_id();
1249
1250 if is_direct {
1251 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1252 room_members.retain(|member| member.user_id() != self.own_user_id());
1253
1254 for member in room_members {
1255 let entry = content.entry(member.user_id().into()).or_default();
1256 if !entry.iter().any(|room_id| room_id == this_room_id) {
1257 entry.push(this_room_id.to_owned());
1258 }
1259 }
1260 } else {
1261 for (_, list) in content.iter_mut() {
1262 list.retain(|room_id| *room_id != this_room_id);
1263 }
1264
1265 content.retain(|_, list| !list.is_empty());
1267 }
1268
1269 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1270
1271 self.client.send(request).await?;
1272 Ok(())
1273 }
1274
1275 #[cfg(feature = "e2e-encryption")]
1283 pub async fn decrypt_event(
1284 &self,
1285 event: &Raw<OriginalSyncRoomEncryptedEvent>,
1286 ) -> Result<TimelineEvent> {
1287 let machine = self.client.olm_machine().await;
1288 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1289
1290 let decryption_settings = DecryptionSettings {
1291 sender_device_trust_requirement: self.client.base_client().decryption_trust_requirement,
1292 };
1293 let mut event: TimelineEvent = match machine
1294 .try_decrypt_room_event(event.cast_ref(), self.inner.room_id(), &decryption_settings)
1295 .await?
1296 {
1297 RoomEventDecryptionResult::Decrypted(decrypted) => decrypted.into(),
1298 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1299 self.client
1300 .encryption()
1301 .backups()
1302 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1303 TimelineEvent::new_utd_event(event.clone().cast(), utd_info)
1304 }
1305 };
1306
1307 event.push_actions = self.event_push_actions(event.raw()).await?;
1308 Ok(event)
1309 }
1310
1311 #[cfg(feature = "e2e-encryption")]
1324 pub async fn discard_room_key(&self) -> Result<()> {
1325 let machine = self.client.olm_machine().await;
1326 if let Some(machine) = machine.as_ref() {
1327 machine.discard_room_key(self.inner.room_id()).await?;
1328 Ok(())
1329 } else {
1330 Err(Error::NoOlmMachine)
1331 }
1332 }
1333
1334 #[instrument(skip_all)]
1342 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1343 let request = assign!(
1344 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1345 { reason: reason.map(ToOwned::to_owned) }
1346 );
1347 self.client.send(request).await?;
1348 Ok(())
1349 }
1350
1351 #[instrument(skip_all)]
1359 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1360 let request = assign!(
1361 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1362 { reason: reason.map(ToOwned::to_owned) }
1363 );
1364 self.client.send(request).await?;
1365 Ok(())
1366 }
1367
1368 #[instrument(skip_all)]
1377 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1378 let request = assign!(
1379 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1380 { reason: reason.map(ToOwned::to_owned) }
1381 );
1382 self.client.send(request).await?;
1383 Ok(())
1384 }
1385
1386 #[instrument(skip_all)]
1392 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1393 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1394 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1395 self.client.send(request).await?;
1396
1397 self.mark_members_missing();
1401
1402 Ok(())
1403 }
1404
1405 #[instrument(skip_all)]
1411 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1412 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1413 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1414 self.client.send(request).await?;
1415
1416 self.mark_members_missing();
1420
1421 Ok(())
1422 }
1423
1424 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1459 self.ensure_room_joined()?;
1460
1461 let send = if let Some(typing_time) =
1464 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1465 {
1466 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1467 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1471 } else {
1472 !typing
1474 }
1475 } else {
1476 typing
1479 };
1480
1481 if send {
1482 self.send_typing_notice(typing).await?;
1483 }
1484
1485 Ok(())
1486 }
1487
1488 #[instrument(name = "typing_notice", skip(self))]
1489 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1490 let typing = if typing {
1491 self.client
1492 .inner
1493 .typing_notice_times
1494 .write()
1495 .unwrap()
1496 .insert(self.room_id().to_owned(), Instant::now());
1497 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1498 } else {
1499 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1500 Typing::No
1501 };
1502
1503 let request = create_typing_event::v3::Request::new(
1504 self.own_user_id().to_owned(),
1505 self.room_id().to_owned(),
1506 typing,
1507 );
1508
1509 self.client.send(request).await?;
1510
1511 Ok(())
1512 }
1513
1514 #[instrument(skip_all)]
1528 pub async fn send_single_receipt(
1529 &self,
1530 receipt_type: create_receipt::v3::ReceiptType,
1531 thread: ReceiptThread,
1532 event_id: OwnedEventId,
1533 ) -> Result<()> {
1534 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1537
1538 self.client
1539 .inner
1540 .locks
1541 .read_receipt_deduplicated_handler
1542 .run((request_key, event_id.clone()), async {
1543 let mut request = create_receipt::v3::Request::new(
1544 self.room_id().to_owned(),
1545 receipt_type,
1546 event_id,
1547 );
1548 request.thread = thread;
1549
1550 self.client.send(request).await?;
1551 Ok(())
1552 })
1553 .await
1554 }
1555
1556 #[instrument(skip_all)]
1564 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
1565 if receipts.is_empty() {
1566 return Ok(());
1567 }
1568
1569 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
1570 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
1571 fully_read,
1572 read_receipt: public_read_receipt,
1573 private_read_receipt,
1574 });
1575
1576 self.client.send(request).await?;
1577 Ok(())
1578 }
1579
1580 #[instrument(skip_all)]
1612 pub async fn enable_encryption(&self) -> Result<()> {
1613 use ruma::{
1614 events::room::encryption::RoomEncryptionEventContent, EventEncryptionAlgorithm,
1615 };
1616 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
1617
1618 if !self.is_encrypted().await? {
1619 let content =
1620 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
1621 self.send_state_event(content).await?;
1622
1623 _ = timeout(self.client.inner.sync_beat.listen(), SYNC_WAIT_TIME).await;
1627
1628 let _sync_lock = self.client.base_client().sync_lock().lock().await;
1633 if !self.inner.is_encrypted() {
1634 debug!("still not marked as encrypted, marking encryption state as missing");
1635
1636 let mut room_info = self.clone_info();
1637 room_info.mark_encryption_state_missing();
1638 let mut changes = StateChanges::default();
1639 changes.add_room(room_info.clone());
1640
1641 self.client.store().save_changes(&changes).await?;
1642 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
1643 } else {
1644 debug!("room successfully marked as encrypted");
1645 }
1646 }
1647
1648 Ok(())
1649 }
1650
/// Claims one-time keys for the active members of this room and shares the
/// room key with them.
///
/// The work is run through the client's
/// `group_session_deduplicated_handler`, keyed on the room id. If sharing the
/// key fails, the room key is discarded (when an Olm machine is available)
/// and the sharing error is returned.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
async fn preshare_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    // Take the store lock and record its generation in the current span; the
    // guard is consumed by the `map` call.
    let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
    tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));

    self.client
        .locks()
        .group_session_deduplicated_handler
        .run(self.room_id().to_owned(), async move {
            {
                let active_members = self
                    .client
                    .store()
                    .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
                    .await?;
                self.client
                    .claim_one_time_keys(active_members.iter().map(Deref::deref))
                    .await?;
            };

            match self.share_room_key().await {
                Ok(()) => Ok(()),
                Err(share_error) => {
                    // Sharing failed: discard the room key before reporting
                    // the error.
                    let olm_machine = self.client.olm_machine().await;
                    if let Some(machine) = olm_machine.as_ref() {
                        machine.discard_room_key(self.room_id()).await?;
                    }
                    Err(share_error)
                }
            }
        })
        .await
}
1698
/// Shares the room key for this room.
///
/// Asks the base client for the to-device requests needed to share the key,
/// sends each of them in order and marks each one as sent with its response.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
async fn share_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    let to_device_requests = self.client.base_client().share_room_key(self.room_id()).await?;

    for to_device_request in to_device_requests {
        let response = self.client.send_to_device(&to_device_request).await?;
        self.client.mark_request_as_sent(&to_device_request.txn_id, &response).await?;
    }

    Ok(())
}
1718
1719 #[instrument(skip_all)]
1728 pub async fn sync_up(&self) {
1729 while !self.is_synced() && self.state() == RoomState::Joined {
1730 let wait_for_beat = self.client.inner.sync_beat.listen();
1731 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
1733 }
1734 }
1735
1736 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
1806 SendMessageLikeEvent::new(self, content)
1807 }
1808
/// Queries device keys for the active room members that are either not
/// tracked by the Olm machine yet or are tracked but marked dirty.
///
/// No request is sent when the resulting key query is empty.
///
/// # Panics
///
/// Panics if the Olm machine has not been started.
#[cfg(feature = "e2e-encryption")]
async fn query_keys_for_untracked_users(&self) -> Result<()> {
    let olm = self.client.olm_machine().await;
    let olm = olm.as_ref().expect("Olm machine wasn't started");

    let members =
        self.client.store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;

    // Map every tracked user to its `dirty` flag.
    let tracked: HashMap<_, _> = olm
        .store()
        .load_tracked_users()
        .await?
        .into_iter()
        .map(|tracked_user| (tracked_user.user_id, tracked_user.dirty))
        .collect();

    // Keep a member unless it is tracked and known to be clean.
    let members_to_query = members
        .iter()
        .filter(|member| !matches!(tracked.get(*member), Some(false)))
        .map(Deref::deref);

    let (request_id, request) = olm.query_keys_for_users(members_to_query);

    if !request.device_keys.is_empty() {
        self.client.keys_query(&request_id, request.device_keys).await?;
    }

    Ok(())
}
1840
1841 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
1885 pub fn send_raw<'a>(
1886 &'a self,
1887 event_type: &'a str,
1888 content: impl IntoRawMessageLikeEventContent,
1889 ) -> SendRawMessageLikeEvent<'a> {
1890 SendRawMessageLikeEvent::new(self, event_type, content)
1893 }
1894
1895 #[instrument(skip_all)]
1943 pub fn send_attachment<'a>(
1944 &'a self,
1945 filename: impl Into<String>,
1946 content_type: &'a Mime,
1947 data: Vec<u8>,
1948 config: AttachmentConfig,
1949 ) -> SendAttachment<'a> {
1950 SendAttachment::new(self, filename.into(), content_type, data, config)
1951 }
1952
1953 #[instrument(skip_all)]
1981 pub(super) async fn prepare_and_send_attachment<'a>(
1982 &'a self,
1983 filename: String,
1984 content_type: &'a Mime,
1985 data: Vec<u8>,
1986 mut config: AttachmentConfig,
1987 send_progress: SharedObservable<TransmissionProgress>,
1988 store_in_cache: bool,
1989 ) -> Result<send_message_event::v3::Response> {
1990 self.ensure_room_joined()?;
1991
1992 let txn_id = config.txn_id.take();
1993 let mentions = config.mentions.take();
1994
1995 let thumbnail = config.thumbnail.take();
1996
1997 let thumbnail_cache_info = if store_in_cache {
1999 thumbnail
2000 .as_ref()
2001 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2002 } else {
2003 None
2004 };
2005
2006 #[cfg(feature = "e2e-encryption")]
2007 let (media_source, thumbnail) = if self.is_encrypted().await? {
2008 self.client
2009 .upload_encrypted_media_and_thumbnail(content_type, &data, thumbnail, send_progress)
2010 .await?
2011 } else {
2012 self.client
2013 .media()
2014 .upload_plain_media_and_thumbnail(
2015 content_type,
2016 data.clone(),
2019 thumbnail,
2020 send_progress,
2021 )
2022 .await?
2023 };
2024
2025 #[cfg(not(feature = "e2e-encryption"))]
2026 let (media_source, thumbnail) = self
2027 .client
2028 .media()
2029 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2030 .await?;
2031
2032 if store_in_cache {
2033 let cache_store_lock_guard = self.client.event_cache_store().lock().await?;
2034
2035 debug!("caching the media");
2039 let request =
2040 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2041
2042 if let Err(err) = cache_store_lock_guard
2043 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2044 .await
2045 {
2046 warn!("unable to cache the media after uploading it: {err}");
2047 }
2048
2049 if let Some(((data, height, width), source)) =
2050 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2051 {
2052 debug!("caching the thumbnail");
2053
2054 let request = MediaRequestParameters {
2055 source: source.clone(),
2056 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2057 };
2058
2059 if let Err(err) = cache_store_lock_guard
2060 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2061 .await
2062 {
2063 warn!("unable to cache the media after uploading it: {err}");
2064 }
2065 }
2066 }
2067
2068 let content = Self::make_attachment_event(
2069 self.make_attachment_type(
2070 content_type,
2071 filename,
2072 media_source,
2073 config.caption,
2074 config.formatted_caption,
2075 config.info,
2076 thumbnail,
2077 ),
2078 mentions,
2079 );
2080
2081 let mut fut = self.send(content);
2082 if let Some(txn_id) = txn_id {
2083 fut = fut.with_transaction_id(txn_id);
2084 }
2085 fut.await
2086 }
2087
2088 #[allow(clippy::too_many_arguments)]
2091 pub(crate) fn make_attachment_type(
2092 &self,
2093 content_type: &Mime,
2094 filename: String,
2095 source: MediaSource,
2096 caption: Option<String>,
2097 formatted_caption: Option<FormattedBody>,
2098 info: Option<AttachmentInfo>,
2099 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2100 ) -> MessageType {
2101 let (body, filename) = match caption {
2105 Some(caption) => (caption, Some(filename)),
2106 None => (filename, None),
2107 };
2108
2109 let (thumbnail_source, thumbnail_info) = thumbnail.unzip();
2110
2111 match content_type.type_() {
2112 mime::IMAGE => {
2113 let info = assign!(info.map(ImageInfo::from).unwrap_or_default(), {
2114 mimetype: Some(content_type.as_ref().to_owned()),
2115 thumbnail_source,
2116 thumbnail_info
2117 });
2118 let content = assign!(ImageMessageEventContent::new(body, source), {
2119 info: Some(Box::new(info)),
2120 formatted: formatted_caption,
2121 filename
2122 });
2123 MessageType::Image(content)
2124 }
2125
2126 mime::AUDIO => {
2127 let mut content = assign!(AudioMessageEventContent::new(body, source), {
2128 formatted: formatted_caption,
2129 filename
2130 });
2131
2132 if let Some(AttachmentInfo::Voice { audio_info, waveform: Some(waveform_vec) }) =
2133 &info
2134 {
2135 if let Some(duration) = audio_info.duration {
2136 let waveform = waveform_vec.iter().map(|v| (*v).into()).collect();
2137 content.audio =
2138 Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
2139 }
2140 content.voice = Some(UnstableVoiceContentBlock::new());
2141 }
2142
2143 let mut audio_info = info.map(AudioInfo::from).unwrap_or_default();
2144 audio_info.mimetype = Some(content_type.as_ref().to_owned());
2145 let content = content.info(Box::new(audio_info));
2146
2147 MessageType::Audio(content)
2148 }
2149
2150 mime::VIDEO => {
2151 let info = assign!(info.map(VideoInfo::from).unwrap_or_default(), {
2152 mimetype: Some(content_type.as_ref().to_owned()),
2153 thumbnail_source,
2154 thumbnail_info
2155 });
2156 let content = assign!(VideoMessageEventContent::new(body, source), {
2157 info: Some(Box::new(info)),
2158 formatted: formatted_caption,
2159 filename
2160 });
2161 MessageType::Video(content)
2162 }
2163
2164 _ => {
2165 let info = assign!(info.map(FileInfo::from).unwrap_or_default(), {
2166 mimetype: Some(content_type.as_ref().to_owned()),
2167 thumbnail_source,
2168 thumbnail_info
2169 });
2170 let content = assign!(FileMessageEventContent::new(body, source), {
2171 info: Some(Box::new(info)),
2172 formatted: formatted_caption,
2173 filename,
2174 });
2175 MessageType::File(content)
2176 }
2177 }
2178 }
2179
2180 pub(crate) fn make_attachment_event(
2183 msg_type: MessageType,
2184 mentions: Option<Mentions>,
2185 ) -> RoomMessageEventContent {
2186 let mut content = RoomMessageEventContent::new(msg_type);
2187 if let Some(mentions) = mentions {
2188 content = content.add_mentions(mentions);
2189 }
2190 content
2191 }
2192
2193 pub async fn update_power_levels(
2202 &self,
2203 updates: Vec<(&UserId, Int)>,
2204 ) -> Result<send_state_event::v3::Response> {
2205 let mut power_levels = self.power_levels().await?;
2206
2207 for (user_id, new_level) in updates {
2208 if new_level == power_levels.users_default {
2209 power_levels.users.remove(user_id);
2210 } else {
2211 power_levels.users.insert(user_id.to_owned(), new_level);
2212 }
2213 }
2214
2215 self.send_state_event(RoomPowerLevelsEventContent::from(power_levels)).await
2216 }
2217
2218 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2223 let mut power_levels = self.power_levels().await?;
2224 power_levels.apply(changes)?;
2225 self.send_state_event(RoomPowerLevelsEventContent::from(power_levels)).await?;
2226 Ok(())
2227 }
2228
2229 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2233 let default_power_levels = RoomPowerLevels::from(RoomPowerLevelsEventContent::new());
2234 let changes = RoomPowerLevelChanges::from(default_power_levels);
2235 self.apply_power_level_changes(changes).await?;
2236 Ok(self.power_levels().await?)
2237 }
2238
2239 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2244 let power_level = self.get_user_power_level(user_id).await?;
2245 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2246 }
2247
2248 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<i64> {
2253 let event = self.power_levels().await?;
2254 Ok(event.for_user(user_id).into())
2255 }
2256
2257 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2260 let power_levels = self.power_levels().await.ok();
2261 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2262 if let Some(power_levels) = power_levels {
2263 for (id, level) in power_levels.users.into_iter() {
2264 user_power_levels.insert(id, level.into());
2265 }
2266 }
2267 user_power_levels
2268 }
2269
2270 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2272 self.send_state_event(RoomNameEventContent::new(name)).await
2273 }
2274
2275 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2277 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2278 }
2279
2280 pub async fn set_avatar_url(
2286 &self,
2287 url: &MxcUri,
2288 info: Option<avatar::ImageInfo>,
2289 ) -> Result<send_state_event::v3::Response> {
2290 self.ensure_room_joined()?;
2291
2292 let mut room_avatar_event = RoomAvatarEventContent::new();
2293 room_avatar_event.url = Some(url.to_owned());
2294 room_avatar_event.info = info.map(Box::new);
2295
2296 self.send_state_event(room_avatar_event).await
2297 }
2298
2299 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2301 self.send_state_event(RoomAvatarEventContent::new()).await
2302 }
2303
2304 pub async fn upload_avatar(
2312 &self,
2313 mime: &Mime,
2314 data: Vec<u8>,
2315 info: Option<avatar::ImageInfo>,
2316 ) -> Result<send_state_event::v3::Response> {
2317 self.ensure_room_joined()?;
2318
2319 let upload_response = self.client.media().upload(mime, data, None).await?;
2320 let mut info = info.unwrap_or_default();
2321 info.blurhash = upload_response.blurhash;
2322 info.mimetype = Some(mime.to_string());
2323
2324 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2325 }
2326
2327 #[instrument(skip_all)]
2371 pub async fn send_state_event(
2372 &self,
2373 content: impl StateEventContent<StateKey = EmptyStateKey>,
2374 ) -> Result<send_state_event::v3::Response> {
2375 self.send_state_event_for_key(&EmptyStateKey, content).await
2376 }
2377
2378 pub async fn send_state_event_for_key<C, K>(
2419 &self,
2420 state_key: &K,
2421 content: C,
2422 ) -> Result<send_state_event::v3::Response>
2423 where
2424 C: StateEventContent,
2425 C::StateKey: Borrow<K>,
2426 K: AsRef<str> + ?Sized,
2427 {
2428 self.ensure_room_joined()?;
2429 let request =
2430 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
2431 let response = self.client.send(request).await?;
2432 Ok(response)
2433 }
2434
2435 #[instrument(skip_all)]
2470 pub async fn send_state_event_raw(
2471 &self,
2472 event_type: &str,
2473 state_key: &str,
2474 content: impl IntoRawStateEventContent,
2475 ) -> Result<send_state_event::v3::Response> {
2476 self.ensure_room_joined()?;
2477
2478 let request = send_state_event::v3::Request::new_raw(
2479 self.room_id().to_owned(),
2480 event_type.into(),
2481 state_key.to_owned(),
2482 content.into_raw_state_event_content(),
2483 );
2484
2485 Ok(self.client.send(request).await?)
2486 }
2487
2488 #[instrument(skip_all)]
2523 pub async fn redact(
2524 &self,
2525 event_id: &EventId,
2526 reason: Option<&str>,
2527 txn_id: Option<OwnedTransactionId>,
2528 ) -> HttpResult<redact_event::v3::Response> {
2529 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
2530 let request = assign!(
2531 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
2532 { reason: reason.map(ToOwned::to_owned) }
2533 );
2534
2535 self.client.send(request).await
2536 }
2537
2538 pub async fn can_user_redact_own(&self, user_id: &UserId) -> Result<bool> {
2543 Ok(self.power_levels().await?.user_can_redact_own_event(user_id))
2544 }
2545
2546 pub async fn can_user_redact_other(&self, user_id: &UserId) -> Result<bool> {
2551 Ok(self.power_levels().await?.user_can_redact_event_of_other(user_id))
2552 }
2553
2554 pub async fn can_user_ban(&self, user_id: &UserId) -> Result<bool> {
2559 Ok(self.power_levels().await?.user_can_ban(user_id))
2560 }
2561
2562 pub async fn can_user_invite(&self, user_id: &UserId) -> Result<bool> {
2567 Ok(self.power_levels().await?.user_can_invite(user_id))
2568 }
2569
2570 pub async fn can_user_kick(&self, user_id: &UserId) -> Result<bool> {
2575 Ok(self.power_levels().await?.user_can_kick(user_id))
2576 }
2577
2578 pub async fn can_user_send_state(
2583 &self,
2584 user_id: &UserId,
2585 state_event: StateEventType,
2586 ) -> Result<bool> {
2587 Ok(self.power_levels().await?.user_can_send_state(user_id, state_event))
2588 }
2589
2590 pub async fn can_user_send_message(
2595 &self,
2596 user_id: &UserId,
2597 message: MessageLikeEventType,
2598 ) -> Result<bool> {
2599 Ok(self.power_levels().await?.user_can_send_message(user_id, message))
2600 }
2601
2602 pub async fn can_user_pin_unpin(&self, user_id: &UserId) -> Result<bool> {
2607 Ok(self
2608 .power_levels()
2609 .await?
2610 .user_can_send_state(user_id, StateEventType::RoomPinnedEvents))
2611 }
2612
2613 pub async fn can_user_trigger_room_notification(&self, user_id: &UserId) -> Result<bool> {
2618 Ok(self.power_levels().await?.user_can_trigger_room_notification(user_id))
2619 }
2620
2621 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
2630 let acl_ev = self
2631 .get_state_event_static::<RoomServerAclEventContent>()
2632 .await?
2633 .and_then(|ev| ev.deserialize().ok());
2634 let acl = acl_ev.as_ref().and_then(|ev| match ev {
2635 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
2636 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
2637 });
2638
2639 let members: Vec<_> = self
2643 .members_no_sync(RoomMemberships::JOIN)
2644 .await?
2645 .into_iter()
2646 .filter(|member| {
2647 let server = member.user_id().server_name();
2648 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
2649 })
2650 .collect();
2651
2652 let max = members
2655 .iter()
2656 .max_by_key(|member| member.power_level())
2657 .filter(|max| max.power_level() >= 50)
2658 .map(|member| member.user_id().server_name());
2659
2660 let servers = members
2662 .iter()
2663 .map(|member| member.user_id().server_name())
2664 .filter(|server| max.filter(|max| max == server).is_none())
2665 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
2666 *servers.entry(server).or_default() += 1;
2667 servers
2668 });
2669 let mut servers: Vec<_> = servers.into_iter().collect();
2670 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
2671
2672 Ok(max
2673 .into_iter()
2674 .chain(servers.into_iter().map(|(name, _)| name))
2675 .take(3)
2676 .map(ToOwned::to_owned)
2677 .collect())
2678 }
2679
2680 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
2687 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
2688 return Ok(alias.matrix_to_uri());
2689 }
2690
2691 let via = self.route().await?;
2692 Ok(self.room_id().matrix_to_uri_via(via))
2693 }
2694
2695 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
2706 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
2707 return Ok(alias.matrix_uri(join));
2708 }
2709
2710 let via = self.route().await?;
2711 Ok(self.room_id().matrix_uri_via(via, join))
2712 }
2713
2714 pub async fn matrix_to_event_permalink(
2728 &self,
2729 event_id: impl Into<OwnedEventId>,
2730 ) -> Result<MatrixToUri> {
2731 let via = self.route().await?;
2734 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
2735 }
2736
2737 pub async fn matrix_event_permalink(
2751 &self,
2752 event_id: impl Into<OwnedEventId>,
2753 ) -> Result<MatrixUri> {
2754 let via = self.route().await?;
2757 Ok(self.room_id().matrix_event_uri_via(event_id, via))
2758 }
2759
2760 pub async fn load_user_receipt(
2773 &self,
2774 receipt_type: ReceiptType,
2775 thread: ReceiptThread,
2776 user_id: &UserId,
2777 ) -> Result<Option<(OwnedEventId, Receipt)>> {
2778 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
2779 }
2780
2781 pub async fn load_event_receipts(
2794 &self,
2795 receipt_type: ReceiptType,
2796 thread: ReceiptThread,
2797 event_id: &EventId,
2798 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
2799 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
2800 }
2801
2802 pub async fn push_context(&self) -> Result<Option<PushConditionRoomCtx>> {
2807 let room_id = self.room_id();
2808 let user_id = self.own_user_id();
2809 let room_info = self.clone_info();
2810 let member_count = room_info.active_members_count();
2811
2812 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
2813 member.name().to_owned()
2814 } else {
2815 return Ok(None);
2816 };
2817
2818 let power_levels = self
2819 .get_state_event_static::<RoomPowerLevelsEventContent>()
2820 .await?
2821 .and_then(|e| e.deserialize().ok())
2822 .map(|e| e.power_levels().into());
2823
2824 Ok(Some(PushConditionRoomCtx {
2825 user_id: user_id.to_owned(),
2826 room_id: room_id.to_owned(),
2827 member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
2828 user_display_name,
2829 power_levels,
2830 }))
2831 }
2832
2833 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
2838 let Some(push_context) = self.push_context().await? else {
2839 debug!("Could not aggregate push context");
2840 return Ok(None);
2841 };
2842
2843 let push_rules = self.client().account().push_rules().await?;
2844
2845 Ok(Some(push_rules.get_actions(event, &push_context).to_owned()))
2846 }
2847
2848 pub async fn invite_details(&self) -> Result<Invite> {
2851 let state = self.state();
2852 if state != RoomState::Invited {
2853 return Err(Error::WrongRoomState(WrongRoomState::new("Invited", state)));
2854 }
2855
2856 let invitee = self
2857 .get_member_no_sync(self.own_user_id())
2858 .await?
2859 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
2860 let event = invitee.event();
2861 let inviter_id = event.sender();
2862 let inviter = self.get_member_no_sync(inviter_id).await?;
2863 Ok(Invite { invitee, inviter })
2864 }
2865
2866 pub async fn own_membership_details(&self) -> Result<(RoomMember, Option<RoomMember>)> {
2874 let Some(own_member) = self.get_member_no_sync(self.own_user_id()).await? else {
2875 return Err(Error::InsufficientData);
2876 };
2877
2878 let sender_member =
2879 if let Some(member) = self.get_member_no_sync(own_member.event().sender()).await? {
2880 Some(member)
2882 } else if self.are_members_synced() {
2883 None
2885 } else if self.sync_members().await.is_ok() {
2886 self.get_member_no_sync(own_member.event().sender()).await?
2888 } else {
2889 None
2890 };
2891
2892 Ok((own_member, sender_member))
2893 }
2894
2895 pub async fn forget(&self) -> Result<()> {
2901 let state = self.state();
2902 match state {
2903 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
2904 return Err(Error::WrongRoomState(WrongRoomState::new("Left / Banned", state)));
2905 }
2906 RoomState::Left | RoomState::Banned => {}
2907 }
2908
2909 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
2910 let _response = self.client.send(request).await?;
2911
2912 if self.inner.direct_targets_length() != 0 {
2914 if let Err(e) = self.set_is_direct(false).await {
2915 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
2918 }
2919 }
2920
2921 self.client.base_client().forget_room(self.inner.room_id()).await?;
2922
2923 Ok(())
2924 }
2925
2926 fn ensure_room_joined(&self) -> Result<()> {
2927 let state = self.state();
2928 if state == RoomState::Joined {
2929 Ok(())
2930 } else {
2931 Err(Error::WrongRoomState(WrongRoomState::new("Joined", state)))
2932 }
2933 }
2934
2935 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
2937 if !matches!(self.state(), RoomState::Joined) {
2938 return None;
2939 }
2940
2941 let notification_settings = self.client().notification_settings().await;
2942
2943 let notification_mode =
2945 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
2946
2947 if notification_mode.is_some() {
2948 notification_mode
2949 } else if let Ok(is_encrypted) = self.is_encrypted().await {
2950 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
2955 let default_mode = notification_settings
2956 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
2957 .await;
2958 Some(default_mode)
2959 } else {
2960 None
2961 }
2962 }
2963
2964 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
2975 if !matches!(self.state(), RoomState::Joined) {
2976 return None;
2977 }
2978
2979 let notification_settings = self.client().notification_settings().await;
2980
2981 let mode =
2983 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
2984
2985 if let Some(mode) = mode {
2986 self.update_cached_user_defined_notification_mode(mode);
2987 }
2988
2989 mode
2990 }
2991
2992 pub async fn report_content(
3005 &self,
3006 event_id: OwnedEventId,
3007 score: Option<ReportedContentScore>,
3008 reason: Option<String>,
3009 ) -> Result<report_content::v3::Response> {
3010 let state = self.state();
3011 if state != RoomState::Joined {
3012 return Err(Error::WrongRoomState(WrongRoomState::new("Joined", state)));
3013 }
3014
3015 let request = report_content::v3::Request::new(
3016 self.inner.room_id().to_owned(),
3017 event_id,
3018 score.map(Into::into),
3019 reason,
3020 );
3021 Ok(self.client.send(request).await?)
3022 }
3023
3024 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3027 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3028
3029 let content = UnstableMarkedUnreadEventContent::from(MarkedUnreadEventContent::new(unread));
3030
3031 let request = set_room_account_data::v3::Request::new(
3032 user_id.to_owned(),
3033 self.inner.room_id().to_owned(),
3034 &content,
3035 )?;
3036
3037 self.client.send(request).await?;
3038 Ok(())
3039 }
3040
3041 pub async fn event_cache(
3044 &self,
3045 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3046 self.client.event_cache().for_room(self.room_id()).await
3047 }
3048
3049 pub async fn send_call_notification_if_needed(&self) -> Result<()> {
3059 if self.has_active_room_call() {
3060 return Ok(());
3061 }
3062
3063 if !self.can_user_trigger_room_notification(self.own_user_id()).await? {
3064 return Ok(());
3065 }
3066
3067 self.send_call_notification(
3068 self.room_id().to_string().to_owned(),
3069 ApplicationType::Call,
3070 if self.is_direct().await.unwrap_or(false) {
3071 NotifyType::Ring
3072 } else {
3073 NotifyType::Notify
3074 },
3075 Mentions::with_room_mention(),
3076 )
3077 .await?;
3078
3079 Ok(())
3080 }
3081
3082 pub(crate) async fn get_user_beacon_info(
3089 &self,
3090 user_id: &UserId,
3091 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3092 let raw_event = self
3093 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3094 .await?
3095 .ok_or(BeaconError::NotFound)?;
3096
3097 match raw_event.deserialize()? {
3098 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3099 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3100 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3101 }
3102 }
3103
3104 pub async fn start_live_location_share(
3117 &self,
3118 duration_millis: u64,
3119 description: Option<String>,
3120 ) -> Result<send_state_event::v3::Response> {
3121 self.ensure_room_joined()?;
3122
3123 self.send_state_event_for_key(
3124 self.own_user_id(),
3125 BeaconInfoEventContent::new(
3126 description,
3127 Duration::from_millis(duration_millis),
3128 true,
3129 None,
3130 ),
3131 )
3132 .await
3133 }
3134
3135 pub async fn stop_live_location_share(
3142 &self,
3143 ) -> Result<send_state_event::v3::Response, BeaconError> {
3144 self.ensure_room_joined()?;
3145
3146 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3147 beacon_info_event.content.stop();
3148 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3149 }
3150
3151 pub async fn send_location_beacon(
3163 &self,
3164 geo_uri: String,
3165 ) -> Result<send_message_event::v3::Response, BeaconError> {
3166 self.ensure_room_joined()?;
3167
3168 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3169
3170 if beacon_info_event.content.is_live() {
3171 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3172 Ok(self.send(content).await?)
3173 } else {
3174 Err(BeaconError::NotLive)
3175 }
3176 }
3177
3178 pub async fn send_call_notification(
3190 &self,
3191 call_id: String,
3192 application: ApplicationType,
3193 notify_type: NotifyType,
3194 mentions: Mentions,
3195 ) -> Result<()> {
3196 let call_notify_event_content =
3197 CallNotifyEventContent::new(call_id, application, notify_type, mentions);
3198 self.send(call_notify_event_content).await?;
3199 Ok(())
3200 }
3201
3202 pub async fn save_composer_draft(&self, draft: ComposerDraft) -> Result<()> {
3205 self.client
3206 .store()
3207 .set_kv_data(
3208 StateStoreDataKey::ComposerDraft(self.room_id()),
3209 StateStoreDataValue::ComposerDraft(draft),
3210 )
3211 .await?;
3212 Ok(())
3213 }
3214
3215 pub async fn load_composer_draft(&self) -> Result<Option<ComposerDraft>> {
3217 let data = self
3218 .client
3219 .store()
3220 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id()))
3221 .await?;
3222 Ok(data.and_then(|d| d.into_composer_draft()))
3223 }
3224
3225 pub async fn clear_composer_draft(&self) -> Result<()> {
3227 self.client
3228 .store()
3229 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id()))
3230 .await?;
3231 Ok(())
3232 }
3233
3234 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3237 let response = self
3238 .client
3239 .send(get_state_events_for_key::v3::Request::new(
3240 self.room_id().to_owned(),
3241 StateEventType::RoomPinnedEvents,
3242 "".to_owned(),
3243 ))
3244 .await;
3245
3246 match response {
3247 Ok(response) => {
3248 Ok(Some(response.content.deserialize_as::<RoomPinnedEventsEventContent>()?.pinned))
3249 }
3250 Err(http_error) => match http_error.as_client_api_error() {
3251 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3252 _ => Err(http_error.into()),
3253 },
3254 }
3255 }
3256
3257 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3265 ObservableLiveLocation::new(&self.client, self.room_id())
3266 }
3267
3268 pub async fn subscribe_to_knock_requests(
3282 &self,
3283 ) -> Result<(impl Stream<Item = Vec<KnockRequest>>, JoinHandle<()>)> {
3284 let this = Arc::new(self.clone());
3285
3286 let room_member_events_observer =
3287 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3288
3289 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3290 let mut seen_request_ids_stream = self
3291 .seen_knock_request_ids_map
3292 .subscribe()
3293 .await
3294 .map(|values| values.unwrap_or_default());
3295
3296 let mut room_info_stream = self.subscribe_info();
3297
3298 let clear_seen_ids_handle = spawn({
3301 let this = self.clone();
3302 async move {
3303 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3304 while member_updates_stream.recv().await.is_ok() {
3305 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3307 warn!("Failed to remove seen knock requests: {err}")
3308 }
3309 }
3310 }
3311 });
3312
3313 let combined_stream = stream! {
3314 match this.get_current_join_requests(¤t_seen_ids).await {
3316 Ok(initial_requests) => yield initial_requests,
3317 Err(err) => warn!("Failed to get initial requests to join: {err}")
3318 }
3319
3320 let mut requests_stream = room_member_events_observer.subscribe();
3321 let mut seen_ids = current_seen_ids.clone();
3322
3323 loop {
3324 tokio::select! {
3327 Some((event, _)) = requests_stream.next() => {
3328 if let Some(event) = event.as_original() {
3329 let emit = if event.prev_content().is_some() {
3331 matches!(event.membership_change(),
3332 MembershipChange::Banned |
3333 MembershipChange::Knocked |
3334 MembershipChange::KnockAccepted |
3335 MembershipChange::KnockDenied |
3336 MembershipChange::KnockRetracted
3337 )
3338 } else {
3339 true
3342 };
3343
3344 if emit {
3345 match this.get_current_join_requests(&seen_ids).await {
3346 Ok(requests) => yield requests,
3347 Err(err) => {
3348 warn!("Failed to get updated knock requests on new member event: {err}")
3349 }
3350 }
3351 }
3352 }
3353 }
3354
3355 Some(new_seen_ids) = seen_request_ids_stream.next() => {
3356 seen_ids = new_seen_ids;
3358
3359 match this.get_current_join_requests(&seen_ids).await {
3362 Ok(requests) => yield requests,
3363 Err(err) => {
3364 warn!("Failed to get updated knock requests on seen ids changed: {err}")
3365 }
3366 }
3367 }
3368
3369 Some(room_info) = room_info_stream.next() => {
3370 if !room_info.are_members_synced() {
3373 match this.get_current_join_requests(&seen_ids).await {
3374 Ok(requests) => yield requests,
3375 Err(err) => {
3376 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
3377 }
3378 }
3379 }
3380 }
3381 else => break,
3383 }
3384 }
3385 };
3386
3387 Ok((combined_stream, clear_seen_ids_handle))
3388 }
3389
3390 async fn get_current_join_requests(
3391 &self,
3392 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
3393 ) -> Result<Vec<KnockRequest>> {
3394 Ok(self
3395 .members(RoomMemberships::KNOCK)
3396 .await?
3397 .into_iter()
3398 .filter_map(|member| {
3399 let event_id = member.event().event_id()?;
3400 Some(KnockRequest::new(
3401 self,
3402 event_id,
3403 member.event().timestamp(),
3404 KnockRequestMemberInfo::from_member(&member),
3405 seen_request_ids.contains_key(event_id),
3406 ))
3407 })
3408 .collect())
3409 }
3410
3411 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
3413 RoomPrivacySettings::new(&self.inner, &self.client)
3414 }
3415}
3416
/// [`RoomIdentityProvider`] implementation backed by the room's member list
/// and the client's encryption API.
///
/// All three methods are infallible by signature: underlying errors are
/// swallowed and mapped to "absent" results (`false`, an empty list or
/// `None`).
#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
impl RoomIdentityProvider for Room {
    /// Resolves to `true` only when looking up `user_id` succeeds and
    /// returns a member; lookup errors resolve to `false`.
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// Collects the identities of the members whose membership is `JOIN` or
    /// `INVITE`.
    ///
    /// If the members cannot be loaded the list is empty; members for which
    /// no identity is found are skipped.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async {
            let room_members = self
                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
                .await
                .unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for room_member in room_members {
                if let Some(identity) = self.user_identity(room_member.user_id()).await {
                    identities.push(identity);
                }
            }
            identities
        })
    }

    /// Looks up the identity of `user_id` through the client's encryption
    /// API; lookup errors resolve to `None`.
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async {
            let lookup = self.client.encryption().get_user_identity(user_id).await;
            lookup.ok().flatten().map(|identity| identity.underlying_identity())
        })
    }
}
3451
3452#[derive(Clone)]
3455pub(crate) struct WeakRoom {
3456 client: WeakClient,
3457 room_id: OwnedRoomId,
3458}
3459
3460impl WeakRoom {
3461 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
3463 Self { client, room_id }
3464 }
3465
3466 pub fn get(&self) -> Option<Room> {
3468 self.client.get().and_then(|client| client.get_room(&self.room_id))
3469 }
3470
3471 pub fn room_id(&self) -> &RoomId {
3473 &self.room_id
3474 }
3475}
3476
3477#[derive(Debug, Clone)]
3479pub struct Invite {
3480 pub invitee: RoomMember,
3482 pub inviter: Option<RoomMember>,
3484}
3485
3486#[derive(Error, Debug)]
3487enum InvitationError {
3488 #[error("No membership event found")]
3489 EventMissing,
3490}
3491
3492#[derive(Debug, Clone, Default)]
3494#[non_exhaustive]
3495pub struct Receipts {
3496 pub fully_read: Option<OwnedEventId>,
3498 pub public_read_receipt: Option<OwnedEventId>,
3500 pub private_read_receipt: Option<OwnedEventId>,
3502}
3503
3504impl Receipts {
3505 pub fn new() -> Self {
3507 Self::default()
3508 }
3509
3510 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3519 self.fully_read = event_id.into();
3520 self
3521 }
3522
3523 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3529 self.public_read_receipt = event_id.into();
3530 self
3531 }
3532
3533 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3537 self.private_read_receipt = event_id.into();
3538 self
3539 }
3540
3541 pub fn is_empty(&self) -> bool {
3543 self.fully_read.is_none()
3544 && self.public_read_receipt.is_none()
3545 && self.private_read_receipt.is_none()
3546 }
3547}
3548
3549#[derive(Debug)]
3552pub enum ParentSpace {
3553 Reciprocal(Room),
3556 WithPowerlevel(Room),
3561 Illegitimate(Room),
3564 Unverifiable(OwnedRoomId),
3567}
3568
/// A score attached to reported content, stored as an `i8`.
///
/// Values built through the provided constructors and `TryFrom`
/// conversions lie between `ReportedContentScore::MIN` and
/// `ReportedContentScore::MAX`, inclusive.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReportedContentScore(i8);
3574
3575impl ReportedContentScore {
3576 pub const MIN: Self = Self(-100);
3580
3581 pub const MAX: Self = Self(0);
3585
3586 pub fn new(value: i8) -> Option<Self> {
3595 value.try_into().ok()
3596 }
3597
3598 pub fn new_saturating(value: i8) -> Self {
3604 if value > Self::MAX {
3605 Self::MAX
3606 } else if value < Self::MIN {
3607 Self::MIN
3608 } else {
3609 Self(value)
3610 }
3611 }
3612
3613 pub fn value(&self) -> i8 {
3615 self.0
3616 }
3617}
3618
3619impl PartialEq<i8> for ReportedContentScore {
3620 fn eq(&self, other: &i8) -> bool {
3621 self.0.eq(other)
3622 }
3623}
3624
3625impl PartialEq<ReportedContentScore> for i8 {
3626 fn eq(&self, other: &ReportedContentScore) -> bool {
3627 self.eq(&other.0)
3628 }
3629}
3630
3631impl PartialOrd<i8> for ReportedContentScore {
3632 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
3633 self.0.partial_cmp(other)
3634 }
3635}
3636
3637impl PartialOrd<ReportedContentScore> for i8 {
3638 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
3639 self.partial_cmp(&other.0)
3640 }
3641}
3642
3643impl From<ReportedContentScore> for Int {
3644 fn from(value: ReportedContentScore) -> Self {
3645 value.0.into()
3646 }
3647}
3648
3649impl TryFrom<i8> for ReportedContentScore {
3650 type Error = TryFromReportedContentScoreError;
3651
3652 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
3653 if value > Self::MAX || value < Self::MIN {
3654 Err(TryFromReportedContentScoreError(()))
3655 } else {
3656 Ok(Self(value))
3657 }
3658 }
3659}
3660
3661impl TryFrom<i16> for ReportedContentScore {
3662 type Error = TryFromReportedContentScoreError;
3663
3664 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
3665 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3666 value.try_into()
3667 }
3668}
3669
3670impl TryFrom<i32> for ReportedContentScore {
3671 type Error = TryFromReportedContentScoreError;
3672
3673 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
3674 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3675 value.try_into()
3676 }
3677}
3678
3679impl TryFrom<i64> for ReportedContentScore {
3680 type Error = TryFromReportedContentScoreError;
3681
3682 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
3683 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3684 value.try_into()
3685 }
3686}
3687
3688impl TryFrom<Int> for ReportedContentScore {
3689 type Error = TryFromReportedContentScoreError;
3690
3691 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
3692 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3693 value.try_into()
3694 }
3695}
3696
3697#[derive(Debug, Clone, Error)]
3700#[error("out of range conversion attempted")]
3701pub struct TryFromReportedContentScoreError(());
3702
3703#[cfg(all(test, not(target_arch = "wasm32")))]
3704mod tests {
3705 use assert_matches2::assert_matches;
3706 use matrix_sdk_base::{store::ComposerDraftType, ComposerDraft, SessionMeta};
3707 use matrix_sdk_test::{
3708 async_test, event_factory::EventFactory, test_json, JoinedRoomBuilder, StateTestEvent,
3709 SyncResponseBuilder,
3710 };
3711 use ruma::{device_id, event_id, events::room::member::MembershipState, int, room_id, user_id};
3712 use wiremock::{
3713 matchers::{header, method, path_regex},
3714 Mock, MockServer, ResponseTemplate,
3715 };
3716
3717 use super::ReportedContentScore;
3718 use crate::{
3719 authentication::matrix::{MatrixSession, MatrixSessionTokens},
3720 config::RequestConfig,
3721 test_utils::{logged_in_client, mocks::MatrixMockServer},
3722 Client,
3723 };
3724
// Two clients share one sqlite store, each with its own cross-process
// store lock holder name. After the second client has taken and released
// the lock, the first client must still be able to take it and encrypt a
// room event.
#[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
#[async_test]
async fn test_cache_invalidation_while_encrypt() {
    use matrix_sdk_test::{message_like_event_content, DEFAULT_TEST_ROOM_ID};

    let db_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
    let restored_session = MatrixSession {
        meta: SessionMeta {
            user_id: user_id!("@example:localhost").to_owned(),
            device_id: device_id!("DEVICEID").to_owned(),
        },
        tokens: MatrixSessionTokens { access_token: "1234".to_owned(), refresh_token: None },
    };

    // First client, holding the lock as "client1".
    let first_client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&db_path, None)
        .build()
        .await
        .unwrap();
    first_client.matrix_auth().restore_session(restored_session.clone()).await.unwrap();

    first_client
        .encryption()
        .enable_cross_process_store_lock("client1".to_owned())
        .await
        .unwrap();

    let server = MockServer::start().await;
    {
        // Mock the room encryption state endpoint, then feed the client a
        // sync response containing an encrypted joined room.
        Mock::given(method("GET"))
            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
            .and(header("authorization", "Bearer 1234"))
            .respond_with(
                ResponseTemplate::new(200)
                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
            )
            .mount(&server)
            .await;
        let sync_response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::default()
                    .add_state_event(StateTestEvent::Member)
                    .add_state_event(StateTestEvent::PowerLevels)
                    .add_state_event(StateTestEvent::Encryption),
            )
            .build_sync_response();
        first_client.base_client().receive_sync_response(sync_response).await.unwrap();
    }

    let room = first_client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");

    room.preshare_room_key().await.unwrap();

    {
        // Second client on the same store, holding the lock as "client2".
        // The scope ends here so its lock guard is dropped before the
        // first client tries to take the lock again.
        let second_client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&db_path, None)
            .build()
            .await
            .unwrap();
        second_client.matrix_auth().restore_session(restored_session.clone()).await.unwrap();
        second_client
            .encryption()
            .enable_cross_process_store_lock("client2".to_owned())
            .await
            .unwrap();

        let second_guard = second_client.encryption().spin_lock_store(None).await.unwrap();
        assert!(second_guard.is_some());
    }

    // Back to the first client: it must be able to take the lock again.
    let first_guard = first_client.encryption().spin_lock_store(None).await.unwrap();
    assert!(first_guard.is_some());

    let olm_guard = first_client.olm_machine().await;
    let olm_machine = olm_guard.as_ref().expect("Olm machine wasn't started");

    // Encrypting must succeed after the other client used the store.
    let _encrypted_content = olm_machine
        .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
        .await
        .unwrap();
}
3814
// Exercises every way of building a `ReportedContentScore`: `new`,
// `new_saturating` and the `TryFrom` conversions, with in-range and
// out-of-range inputs.
#[test]
fn reported_content_score() {
    // `new`: in-range values are kept, out-of-range values are rejected.
    for valid in [0, -50, -100] {
        let score = ReportedContentScore::new(valid).unwrap();
        assert_eq!(score.value(), valid);
    }
    assert_eq!(ReportedContentScore::new(10), None);
    assert_eq!(ReportedContentScore::new(-110), None);

    // `new_saturating`: in-range values are kept, others hit the bounds.
    for valid in [0, -50, -100] {
        let score = ReportedContentScore::new_saturating(valid);
        assert_eq!(score.value(), valid);
    }
    assert_eq!(ReportedContentScore::new_saturating(10), ReportedContentScore::MAX);
    assert_eq!(ReportedContentScore::new_saturating(-110), ReportedContentScore::MIN);

    // `TryFrom<i16>`.
    for (input, expected) in [(0i16, 0i8), (-100i16, -100i8)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [10i16, -110i16] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }

    // `TryFrom<i32>`.
    for (input, expected) in [(0i32, 0i8), (-100i32, -100i8)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [10i32, -110i32] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }

    // `TryFrom<i64>`.
    for (input, expected) in [(0i64, 0i8), (-100i64, -100i8)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [10i64, -110i64] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }

    // `TryFrom<Int>`.
    for (input, expected) in [(int!(0), 0i8), (int!(-100), -100i8)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [int!(10), int!(-110)] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }
}
3870
3871 #[async_test]
3872 async fn test_composer_draft() {
3873 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
3874
3875 let client = logged_in_client(None).await;
3876
3877 let response = SyncResponseBuilder::default()
3878 .add_joined_room(JoinedRoomBuilder::default())
3879 .build_sync_response();
3880 client.base_client().receive_sync_response(response).await.unwrap();
3881 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
3882
3883 assert_eq!(room.load_composer_draft().await.unwrap(), None);
3884
3885 let draft = ComposerDraft {
3886 plain_text: "Hello, world!".to_owned(),
3887 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
3888 draft_type: ComposerDraftType::NewMessage,
3889 };
3890 room.save_composer_draft(draft.clone()).await.unwrap();
3891 assert_eq!(room.load_composer_draft().await.unwrap(), Some(draft));
3892
3893 room.clear_composer_draft().await.unwrap();
3894 assert_eq!(room.load_composer_draft().await.unwrap(), None);
3895 }
3896
3897 #[async_test]
3898 async fn test_mark_join_requests_as_seen() {
3899 let server = MatrixMockServer::new().await;
3900 let client = server.client_builder().build().await;
3901 let event_id = event_id!("$a:b.c");
3902 let room_id = room_id!("!a:b.c");
3903 let user_id = user_id!("@alice:b.c");
3904
3905 let f = EventFactory::new().room(room_id);
3906 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f
3907 .member(user_id)
3908 .membership(MembershipState::Knock)
3909 .event_id(event_id)
3910 .into_raw_timeline()
3911 .cast()]);
3912 let room = server.sync_room(&client, joined_room_builder).await;
3913
3914 let seen_ids =
3916 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
3917 assert!(seen_ids.is_empty());
3918
3919 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
3921 .await
3922 .expect("Couldn't mark join request as seen");
3923
3924 let seen_ids =
3926 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
3927 assert_eq!(seen_ids.len(), 1);
3928 assert_eq!(
3929 seen_ids.into_iter().next().expect("No next value"),
3930 (event_id.to_owned(), user_id.to_owned())
3931 )
3932 }
3933
3934 #[async_test]
3935 async fn test_own_room_membership_with_no_own_member_event() {
3936 let server = MatrixMockServer::new().await;
3937 let client = server.client_builder().build().await;
3938 let room_id = room_id!("!a:b.c");
3939
3940 let room = server.sync_joined_room(&client, room_id).await;
3941
3942 let error = room.own_membership_details().await.err();
3945 assert!(error.is_some());
3946 }
3947
3948 #[async_test]
3949 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
3950 let server = MatrixMockServer::new().await;
3951 let client = server.client_builder().build().await;
3952 let room_id = room_id!("!a:b.c");
3953 let user_id = user_id!("@example:localhost");
3954
3955 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
3956 let joined_room_builder = JoinedRoomBuilder::new(room_id)
3957 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
3958 let room = server.sync_room(&client, joined_room_builder).await;
3959
3960 let ret = room.own_membership_details().await;
3962 assert_matches!(ret, Ok((member, sender)));
3963
3964 assert_eq!(member.event().user_id(), user_id);
3966
3967 assert!(sender.is_none());
3969 }
3970
3971 #[async_test]
3972 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
3973 let server = MatrixMockServer::new().await;
3974 let client = server.client_builder().build().await;
3975 let room_id = room_id!("!a:b.c");
3976 let user_id = user_id!("@example:localhost");
3977
3978 let f = EventFactory::new().room(room_id).sender(user_id);
3979 let joined_room_builder = JoinedRoomBuilder::new(room_id)
3980 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
3981 let room = server.sync_room(&client, joined_room_builder).await;
3982
3983 let ret = room.own_membership_details().await;
3985 assert_matches!(ret, Ok((member, sender)));
3986
3987 assert_eq!(member.event().user_id(), user_id);
3989
3990 assert!(sender.is_some());
3992 assert_eq!(sender.unwrap().event().user_id(), user_id);
3993 }
3994
3995 #[async_test]
3996 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
3997 let server = MatrixMockServer::new().await;
3998 let client = server.client_builder().build().await;
3999 let room_id = room_id!("!a:b.c");
4000 let user_id = user_id!("@example:localhost");
4001 let sender_id = user_id!("@alice:b.c");
4002
4003 let f = EventFactory::new().room(room_id).sender(sender_id);
4004 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4005 f.member(user_id).into_raw_sync().cast(),
4006 f.member(sender_id).into_raw_sync().cast(),
4008 ]);
4009 let room = server.sync_room(&client, joined_room_builder).await;
4010
4011 let ret = room.own_membership_details().await;
4013 assert_matches!(ret, Ok((member, sender)));
4014
4015 assert_eq!(member.event().user_id(), user_id);
4017
4018 assert!(sender.is_some());
4020 assert_eq!(sender.unwrap().event().user_id(), sender_id);
4021 }
4022
4023 #[async_test]
4024 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
4025 let server = MatrixMockServer::new().await;
4026 let client = server.client_builder().build().await;
4027 let room_id = room_id!("!a:b.c");
4028 let user_id = user_id!("@example:localhost");
4029 let sender_id = user_id!("@alice:b.c");
4030
4031 let f = EventFactory::new().room(room_id).sender(sender_id);
4032 let joined_room_builder = JoinedRoomBuilder::new(room_id)
4033 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
4034 let room = server.sync_room(&client, joined_room_builder).await;
4035
4036 server
4038 .mock_get_members()
4039 .ok(vec![f.member(sender_id).into_raw_timeline().cast()])
4040 .mock_once()
4041 .mount()
4042 .await;
4043
4044 let ret = room.own_membership_details().await;
4046 assert_matches!(ret, Ok((member, sender)));
4047
4048 assert_eq!(member.event().user_id(), user_id);
4050
4051 assert!(sender.is_some());
4053 assert_eq!(sender.unwrap().event().user_id(), sender_id);
4054 }
4055}