1use std::{future::Future, sync::Arc};
16
17use eyeball::Subscriber;
18use indexmap::IndexMap;
19#[cfg(test)]
20use matrix_sdk::crypto::{DecryptionSettings, RoomEventDecryptionResult, TrustRequirement};
21use matrix_sdk::{
22 crypto::types::events::CryptoContextInfo,
23 deserialized_responses::{EncryptionInfo, TimelineEvent},
24 event_cache::paginator::PaginableRoom,
25 room::{PushContext, Relations, RelationsOptions},
26 AsyncTraitDeps, Result, Room, SendOutsideWasm,
27};
28use matrix_sdk_base::{latest_event::LatestEvent, RoomInfo};
29use ruma::{
30 events::{
31 fully_read::FullyReadEventContent,
32 receipt::{Receipt, ReceiptThread, ReceiptType},
33 AnyMessageLikeEventContent, AnySyncTimelineEvent,
34 },
35 serde::Raw,
36 EventId, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId, UserId,
37};
38use tracing::error;
39
40use super::{EventTimelineItem, Profile, RedactError, TimelineBuilder};
41use crate::timeline::{self, pinned_events_loader::PinnedEventsRoom, Timeline};
42
43pub trait RoomExt {
44 fn timeline(&self)
52 -> impl Future<Output = Result<Timeline, timeline::Error>> + SendOutsideWasm;
53
54 fn timeline_builder(&self) -> TimelineBuilder;
63
64 fn latest_event_item(
67 &self,
68 ) -> impl Future<Output = Option<EventTimelineItem>> + SendOutsideWasm;
69}
70
71impl RoomExt for Room {
72 async fn timeline(&self) -> Result<Timeline, timeline::Error> {
73 self.timeline_builder().build().await
74 }
75
76 fn timeline_builder(&self) -> TimelineBuilder {
77 TimelineBuilder::new(self).track_read_marker_and_receipts()
78 }
79
80 async fn latest_event_item(&self) -> Option<EventTimelineItem> {
81 if let Some(latest_event) = (**self).latest_event() {
82 EventTimelineItem::from_latest_event(self.client(), self.room_id(), latest_event).await
83 } else {
84 None
85 }
86 }
87}
88
89pub(super) trait RoomDataProvider:
90 Clone + PaginableRoom + PinnedEventsRoom + 'static
91{
92 fn own_user_id(&self) -> &UserId;
93 fn room_version(&self) -> RoomVersionId;
94
95 fn crypto_context_info(&self)
96 -> impl Future<Output = CryptoContextInfo> + SendOutsideWasm + '_;
97
98 fn profile_from_user_id<'a>(
99 &'a self,
100 user_id: &'a UserId,
101 ) -> impl Future<Output = Option<Profile>> + SendOutsideWasm + 'a;
102 fn profile_from_latest_event(&self, latest_event: &LatestEvent) -> Option<Profile>;
103
104 fn load_user_receipt<'a>(
106 &'a self,
107 receipt_type: ReceiptType,
108 thread: ReceiptThread,
109 user_id: &'a UserId,
110 ) -> impl Future<Output = Option<(OwnedEventId, Receipt)>> + SendOutsideWasm + 'a;
111
112 fn load_event_receipts<'a>(
114 &'a self,
115 event_id: &'a EventId,
116 ) -> impl Future<Output = IndexMap<OwnedUserId, Receipt>> + SendOutsideWasm + 'a;
117
118 fn load_fully_read_marker(&self) -> impl Future<Output = Option<OwnedEventId>> + '_;
120
121 fn push_context(&self) -> impl Future<Output = Option<PushContext>> + SendOutsideWasm + '_;
122
123 fn send(
125 &self,
126 content: AnyMessageLikeEventContent,
127 ) -> impl Future<Output = Result<(), super::Error>> + SendOutsideWasm + '_;
128
129 fn redact<'a>(
131 &'a self,
132 event_id: &'a EventId,
133 reason: Option<&'a str>,
134 transaction_id: Option<OwnedTransactionId>,
135 ) -> impl Future<Output = Result<(), super::Error>> + SendOutsideWasm + 'a;
136
137 fn room_info(&self) -> Subscriber<RoomInfo>;
138
139 fn get_encryption_info(
142 &self,
143 session_id: &str,
144 sender: &UserId,
145 ) -> impl Future<Output = Option<Arc<EncryptionInfo>>> + SendOutsideWasm;
146
147 async fn relations(&self, event_id: OwnedEventId, opts: RelationsOptions) -> Result<Relations>;
148
149 fn load_event<'a>(
151 &'a self,
152 event_id: &'a EventId,
153 ) -> impl Future<Output = Result<TimelineEvent>> + SendOutsideWasm + 'a;
154}
155
156impl RoomDataProvider for Room {
157 fn own_user_id(&self) -> &UserId {
158 (**self).own_user_id()
159 }
160
161 fn room_version(&self) -> RoomVersionId {
162 (**self).clone_info().room_version_or_default()
163 }
164
165 async fn crypto_context_info(&self) -> CryptoContextInfo {
166 self.crypto_context_info().await
167 }
168
169 async fn profile_from_user_id<'a>(&'a self, user_id: &'a UserId) -> Option<Profile> {
170 match self.get_member_no_sync(user_id).await {
171 Ok(Some(member)) => Some(Profile {
172 display_name: member.display_name().map(ToOwned::to_owned),
173 display_name_ambiguous: member.name_ambiguous(),
174 avatar_url: member.avatar_url().map(ToOwned::to_owned),
175 }),
176 Ok(None) if self.are_members_synced() => Some(Profile::default()),
177 Ok(None) => None,
178 Err(e) => {
179 error!(%user_id, "Failed to fetch room member information: {e}");
180 None
181 }
182 }
183 }
184
185 fn profile_from_latest_event(&self, latest_event: &LatestEvent) -> Option<Profile> {
186 if !latest_event.has_sender_profile() {
187 return None;
188 }
189
190 Some(Profile {
191 display_name: latest_event.sender_display_name().map(ToOwned::to_owned),
192 display_name_ambiguous: latest_event.sender_name_ambiguous().unwrap_or(false),
193 avatar_url: latest_event.sender_avatar_url().map(ToOwned::to_owned),
194 })
195 }
196
197 async fn load_user_receipt<'a>(
198 &'a self,
199 receipt_type: ReceiptType,
200 thread: ReceiptThread,
201 user_id: &'a UserId,
202 ) -> Option<(OwnedEventId, Receipt)> {
203 match self.load_user_receipt(receipt_type.clone(), thread.clone(), user_id).await {
204 Ok(receipt) => receipt,
205 Err(e) => {
206 error!(
207 ?receipt_type,
208 ?thread,
209 ?user_id,
210 "Failed to get read receipt for user: {e}"
211 );
212 None
213 }
214 }
215 }
216
217 async fn load_event_receipts<'a>(
218 &'a self,
219 event_id: &'a EventId,
220 ) -> IndexMap<OwnedUserId, Receipt> {
221 let mut unthreaded_receipts = match self
222 .load_event_receipts(ReceiptType::Read, ReceiptThread::Unthreaded, event_id)
223 .await
224 {
225 Ok(receipts) => receipts.into_iter().collect(),
226 Err(e) => {
227 error!(?event_id, "Failed to get unthreaded read receipts for event: {e}");
228 IndexMap::new()
229 }
230 };
231
232 let main_thread_receipts = match self
233 .load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id)
234 .await
235 {
236 Ok(receipts) => receipts,
237 Err(e) => {
238 error!(?event_id, "Failed to get main thread read receipts for event: {e}");
239 Vec::new()
240 }
241 };
242
243 unthreaded_receipts.extend(main_thread_receipts);
244 unthreaded_receipts
245 }
246
247 async fn push_context(&self) -> Option<PushContext> {
248 self.push_context().await.ok().flatten()
249 }
250
251 async fn load_fully_read_marker(&self) -> Option<OwnedEventId> {
252 match self.account_data_static::<FullyReadEventContent>().await {
253 Ok(Some(fully_read)) => match fully_read.deserialize() {
254 Ok(fully_read) => Some(fully_read.content.event_id),
255 Err(e) => {
256 error!("Failed to deserialize fully-read account data: {e}");
257 None
258 }
259 },
260 Err(e) => {
261 error!("Failed to get fully-read account data from the store: {e}");
262 None
263 }
264 _ => None,
265 }
266 }
267
268 async fn send(&self, content: AnyMessageLikeEventContent) -> Result<(), super::Error> {
269 let _ = self.send_queue().send(content).await?;
270 Ok(())
271 }
272
273 async fn redact<'a>(
274 &'a self,
275 event_id: &'a EventId,
276 reason: Option<&'a str>,
277 transaction_id: Option<OwnedTransactionId>,
278 ) -> Result<(), super::Error> {
279 let _ = self
280 .redact(event_id, reason, transaction_id)
281 .await
282 .map_err(RedactError::HttpError)
283 .map_err(super::Error::RedactError)?;
284 Ok(())
285 }
286
287 fn room_info(&self) -> Subscriber<RoomInfo> {
288 self.subscribe_info()
289 }
290
291 async fn get_encryption_info(
292 &self,
293 session_id: &str,
294 sender: &UserId,
295 ) -> Option<Arc<EncryptionInfo>> {
296 self.get_encryption_info(session_id, sender).await
298 }
299
300 async fn relations(&self, event_id: OwnedEventId, opts: RelationsOptions) -> Result<Relations> {
301 self.relations(event_id, opts).await
302 }
303
304 async fn load_event<'a>(&'a self, event_id: &'a EventId) -> Result<TimelineEvent> {
305 self.load_or_fetch_event(event_id, None).await
306 }
307}
308
309pub(super) trait Decryptor: AsyncTraitDeps + Clone + 'static {
312 fn decrypt_event_impl(
313 &self,
314 raw: &Raw<AnySyncTimelineEvent>,
315 push_ctx: Option<&PushContext>,
316 ) -> impl Future<Output = Result<TimelineEvent>> + SendOutsideWasm;
317}
318
319impl Decryptor for Room {
320 async fn decrypt_event_impl(
321 &self,
322 raw: &Raw<AnySyncTimelineEvent>,
323 push_ctx: Option<&PushContext>,
324 ) -> Result<TimelineEvent> {
325 self.decrypt_event(raw.cast_ref(), push_ctx).await
326 }
327}
328
329#[cfg(test)]
330impl Decryptor for (matrix_sdk_base::crypto::OlmMachine, ruma::OwnedRoomId) {
331 async fn decrypt_event_impl(
332 &self,
333 raw: &Raw<AnySyncTimelineEvent>,
334 push_ctx: Option<&PushContext>,
335 ) -> Result<TimelineEvent> {
336 let (olm_machine, room_id) = self;
337 let decryption_settings =
338 DecryptionSettings { sender_device_trust_requirement: TrustRequirement::Untrusted };
339
340 match olm_machine
341 .try_decrypt_room_event(raw.cast_ref(), room_id, &decryption_settings)
342 .await?
343 {
344 RoomEventDecryptionResult::Decrypted(decrypted) => {
345 let push_actions = push_ctx.map(|push_ctx| push_ctx.for_event(&decrypted.event));
346 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
347 }
348 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
349 Ok(TimelineEvent::from_utd(raw.clone(), utd_info))
350 }
351 }
352 }
353}