matrix_sdk_ui/timeline/builder.rs
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::BTreeSet, sync::Arc};

use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{
    encryption::backups::BackupState,
    event_cache::{EventsOrigin, RoomEventCacheUpdate},
    executor::spawn,
    Room,
};
use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
use tokio::sync::broadcast::error::RecvError;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{info, info_span, trace, warn, Instrument, Span};

use super::{
    controller::{TimelineController, TimelineSettings},
    to_device::{handle_forwarded_room_key_event, handle_room_key_event},
    DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
};
use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};

/// Builder that allows creating and configuring various parts of a
/// [`Timeline`].
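///
/// A minimal usage sketch, assuming `builder` is a `TimelineBuilder` that was
/// already obtained for a given room:
///
/// ```ignore
/// let timeline = builder
///     .with_focus(TimelineFocus::Live)
///     .track_read_marker_and_receipts()
///     .build()
///     .await?;
/// ```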
#[must_use]
#[derive(Debug)]
pub struct TimelineBuilder {
    room: Room,
    settings: TimelineSettings,
    focus: TimelineFocus,

    /// An optional hook to call whenever we run into an unable-to-decrypt or a
    /// late-decryption event.
    unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,

    /// An optional prefix for internal IDs.
    internal_id_prefix: Option<String>,
}

impl TimelineBuilder {
    pub(super) fn new(room: &Room) -> Self {
        Self {
            room: room.clone(),
            settings: TimelineSettings::default(),
            unable_to_decrypt_hook: None,
            focus: TimelineFocus::Live,
            internal_id_prefix: None,
        }
    }

    /// Sets up the initial focus for this timeline.
    ///
    /// This can be changed later on while the timeline is alive.
    pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
        self.focus = focus;
        self
    }

    /// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
    /// we're building.
    ///
    /// If a hook was previously set, it will be overwritten.
    pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
        self.unable_to_decrypt_hook = Some(hook);
        self
    }

    /// Sets the internal id prefix for this timeline.
    ///
    /// The prefix will be prepended to any internal ID used when generating
    /// timeline IDs for this timeline.
    pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
        self.internal_id_prefix = Some(prefix);
        self
    }

    /// Choose when to insert the date separators, either between each day
    /// or each month.
    pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
        self.settings.date_divider_mode = mode;
        self
    }

    /// Enable tracking of the fully-read marker and the read receipts on the
    /// timeline.
    pub fn track_read_marker_and_receipts(mut self) -> Self {
        self.settings.track_read_receipts = true;
        self
    }

    /// Use the given filter to choose whether to add events to the timeline.
    ///
    /// # Arguments
    ///
    /// * `filter` - A function that takes a deserialized event, and should
    ///   return `true` if the event should be added to the `Timeline`.
    ///
    /// If this is not overridden, the timeline uses the default filter that
    /// only allows events that are materialized into a `Timeline` item. For
    /// instance, reactions and edits don't get their own timeline item (as
    /// they affect another existing one), so they're "filtered out" to
    /// reflect that.
    ///
    /// You can use the default event filter with
    /// [`crate::timeline::default_event_filter`] so as to chain it with
    /// your own event filter, if you want to avoid situations where a read
    /// receipt would be attached to an event that doesn't get its own
    /// timeline item.
    ///
    /// Note that currently:
    ///
    /// - Not all event types have a representation as a `TimelineItem`, so
    ///   these are not added, no matter what the filter returns.
    /// - It is not possible to filter out `m.room.encrypted` events (otherwise
    ///   they couldn't be decrypted when the appropriate room key arrives).
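    ///
    /// A sketch of such a chained filter; the choice to hide sticker events is
    /// purely illustrative:
    ///
    /// ```ignore
    /// use matrix_sdk_ui::timeline::default_event_filter;
    /// use ruma::events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent};
    ///
    /// let builder = builder.event_filter(|event, room_version| {
    ///     // Keep the default behavior, and additionally drop sticker events.
    ///     default_event_filter(event, room_version)
    ///         && !matches!(
    ///             event,
    ///             AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::Sticker(_))
    ///         )
    /// });
    /// ```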
    pub fn event_filter<F>(mut self, filter: F) -> Self
    where
        F: Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool + Send + Sync + 'static,
    {
        self.settings.event_filter = Arc::new(filter);
        self
    }

    /// Whether to add events that failed to deserialize to the timeline.
    ///
    /// Defaults to `true`.
    pub fn add_failed_to_parse(mut self, add: bool) -> Self {
        self.settings.add_failed_to_parse = add;
        self
    }

    /// Create a [`Timeline`] with the options set on this builder.
    #[tracing::instrument(
        skip(self),
        fields(
            room_id = ?self.room.room_id(),
            track_read_receipts = self.settings.track_read_receipts,
        )
    )]
    pub async fn build(self) -> Result<Timeline, Error> {
        let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;

        let client = room.client();
        let event_cache = client.event_cache();

        // Subscribe the event cache to sync responses, in case we hadn't done it yet.
        event_cache.subscribe()?;

        let (room_event_cache, event_cache_drop) = room.event_cache().await?;
        let (_, mut event_subscriber) = room_event_cache.subscribe().await?;

        let is_pinned_events = matches!(focus, TimelineFocus::PinnedEvents { .. });
        let is_room_encrypted = room.is_encrypted().await.ok();

        let controller = TimelineController::new(
            room,
            focus.clone(),
            internal_id_prefix.clone(),
            unable_to_decrypt_hook,
            is_room_encrypted,
        )
        .with_settings(settings);

        let has_events = controller.init_focus(&room_event_cache).await?;

        let room = controller.room();
        let client = room.client();

        let pinned_events_join_handle = if is_pinned_events {
            let mut pinned_event_ids_stream = room.pinned_event_ids_stream();
            Some(spawn({
                let inner = controller.clone();
                async move {
                    while pinned_event_ids_stream.next().await.is_some() {
                        if let Ok(events) = inner.reload_pinned_events().await {
                            inner
                                .replace_with_initial_remote_events(
                                    events.into_iter(),
                                    RemoteEventOrigin::Pagination,
                                )
                                .await;
                        }
                    }
                }
            }))
        } else {
            None
        };

        let encryption_changes_handle = spawn({
            let inner = controller.clone();
            async move {
                inner.handle_encryption_state_changes().await;
            }
        });

        let room_update_join_handle = spawn({
            let room_event_cache = room_event_cache.clone();
            let inner = controller.clone();

            let span = info_span!(
                parent: Span::none(),
                "live_update_handler",
                room_id = ?room.room_id(),
                focus = focus.debug_string(),
                prefix = internal_id_prefix
            );
            span.follows_from(Span::current());

            async move {
                trace!("Spawned the event subscriber task.");

                loop {
                    trace!("Waiting for an event.");

                    let update = match event_subscriber.recv().await {
                        Ok(up) => up,
                        Err(RecvError::Closed) => break,
                        Err(RecvError::Lagged(num_skipped)) => {
                            warn!(
                                num_skipped,
                                "Lagged behind event cache updates, resetting timeline"
                            );

                            // The updates might have lagged, but the room event cache might have
                            // events, so retrieve them and add them back again to the timeline,
                            // after clearing it.
                            //
                            // If we can't get a handle on the room cache's events, just clear the
                            // current timeline.
                            match room_event_cache.subscribe().await {
                                Ok((events, _)) => {
                                    inner
                                        .replace_with_initial_remote_events(
                                            events.into_iter(),
                                            RemoteEventOrigin::Sync,
                                        )
                                        .await;
                                }
                                Err(err) => {
                                    warn!("Error when re-inserting initial events into the timeline: {err}");
                                    inner.clear().await;
                                }
                            }

                            continue;
                        }
                    };

                    match update {
                        RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
                            trace!(target = %event_id, "Handling fully read marker.");
                            inner.handle_fully_read_marker(event_id).await;
                        }

                        RoomEventCacheUpdate::Clear => {
                            if !inner.is_live().await {
                                // Ignore a clear for a timeline not in the live mode; the
                                // focused-on-event mode doesn't add any new items to the timeline
                                // anyways.
                                continue;
                            }

                            trace!("Clearing the timeline.");
                            inner.clear().await;
                        }

                        RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
                            trace!("Received new timeline events diffs");

                            inner
                                .handle_remote_events_with_diffs(
                                    diffs,
                                    match origin {
                                        EventsOrigin::Sync => RemoteEventOrigin::Sync,
                                        EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
                                    },
                                )
                                .await;
                        }

                        RoomEventCacheUpdate::AddEphemeralEvents { events } => {
                            trace!("Received new ephemeral events from sync.");

                            // TODO: (bnjbvr) ephemeral should be handled by the event cache.
                            inner.handle_ephemeral_events(events).await;
                        }

                        RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
                            if !ambiguity_changes.is_empty() {
                                let member_ambiguity_changes = ambiguity_changes
                                    .values()
                                    .flat_map(|change| change.user_ids())
                                    .collect::<BTreeSet<_>>();
                                inner.force_update_sender_profiles(&member_ambiguity_changes).await;
                            }
                        }
                    }
                }
            }
            .instrument(span)
        });

        let local_echo_listener_handle = {
            let timeline = controller.clone();
            let (local_echoes, mut listener) = room.send_queue().subscribe().await?;

            spawn({
                let span = info_span!(
                    parent: Span::none(),
                    "local_echo_handler",
                    room_id = ?room.room_id(),
                    focus = focus.debug_string(),
                    prefix = internal_id_prefix
                );
                span.follows_from(Span::current());

                async move {
                    info!("spawned the local echo handler!");

                    // Handle the existing local echoes first.
                    for echo in local_echoes {
                        timeline.handle_local_echo(echo).await;
                    }

                    // Then react to future local echoes too.
                    loop {
                        match listener.recv().await {
                            Ok(update) => timeline.handle_room_send_queue_update(update).await,

                            Err(RecvError::Lagged(num_missed)) => {
                                warn!("missed {num_missed} local echoes, ignoring those missed");
                            }

                            Err(RecvError::Closed) => {
                                info!("channel closed, exiting the local echo handler");
                                break;
                            }
                        }
                    }
                }
                .instrument(span)
            })
        };

        // Not using room.add_event_handler here because RoomKey events are
        // to-device events that are not received in the context of a room.

        let room_key_handle = client.add_event_handler(handle_room_key_event(
            controller.clone(),
            room.room_id().to_owned(),
        ));

        let forwarded_room_key_handle = client.add_event_handler(handle_forwarded_room_key_event(
            controller.clone(),
            room.room_id().to_owned(),
        ));

        let handles = vec![room_key_handle, forwarded_room_key_handle];

        let room_key_from_backups_join_handle = {
            let inner = controller.clone();
            let room_id = inner.room().room_id();

            let stream = client.encryption().backups().room_keys_for_room_stream(room_id);

            spawn(async move {
                pin_mut!(stream);

                while let Some(update) = stream.next().await {
                    let room = inner.room();

                    match update {
                        Ok(info) => {
                            let mut session_ids = BTreeSet::new();

                            for set in info.into_values() {
                                session_ids.extend(set);
                            }

                            inner.retry_event_decryption(room, Some(session_ids)).await;
                        }
                        // We lagged, so retry every event.
                        Err(_) => inner.retry_event_decryption(room, None).await,
                    }
                }
            })
        };

        let room_key_backup_enabled_join_handle = {
            let inner = controller.clone();
            let stream = client.encryption().backups().state_stream();

            spawn(async move {
                pin_mut!(stream);

                while let Some(update) = stream.next().await {
                    match update {
                        // If the backup got enabled, or we lagged and thus missed that the backup
                        // might be enabled, retry to decrypt all the events. Please note, depending
                        // on the backup download strategy, this might do two things under the
                        // assumption that the backup contains the relevant room keys:
                        //
                        // 1. It will decrypt the events, if `BackupDownloadStrategy` has been set
                        //    to `OneShot`.
                        // 2. It will fail to decrypt the event, but try to download the room key to
                        //    decrypt it if the `BackupDownloadStrategy` has been set to
                        //    `AfterDecryptionFailure`.
                        Ok(BackupState::Enabled) | Err(_) => {
                            let room = inner.room();
                            inner.retry_event_decryption(room, None).await;
                        }
                        // The other states aren't interesting since they are either still enabling
                        // the backup or have the backup in the disabled state.
                        Ok(
                            BackupState::Unknown
                            | BackupState::Creating
                            | BackupState::Resuming
                            | BackupState::Disabling
                            | BackupState::Downloading
                            | BackupState::Enabling,
                        ) => (),
                    }
                }
            })
        };

        // TODO: Technically, this should be the only stream we need to listen to get
        // notified when we should retry to decrypt an event. We sadly can't do that,
        // since the cross-process support kills the `OlmMachine`, which in turn kills
        // this stream. Once this is solved, remove all the other ways we listen for
        // room keys.
        let room_keys_received_join_handle = {
            let inner = controller.clone();
            let stream = client.encryption().room_keys_received_stream().await.expect(
                "We should be logged in by now, so we should have access to an OlmMachine \
                 to be able to listen to this stream",
            );

            spawn(async move {
                pin_mut!(stream);

                while let Some(room_keys) = stream.next().await {
                    let session_ids = match room_keys {
                        Ok(room_keys) => {
                            let session_ids: BTreeSet<String> = room_keys
                                .into_iter()
                                .filter(|info| info.room_id == inner.room().room_id())
                                .map(|info| info.session_id)
                                .collect();

                            Some(session_ids)
                        }
                        Err(BroadcastStreamRecvError::Lagged(missed_updates)) => {
                            // We lagged, let's retry to decrypt anything we have, maybe something
                            // was received.
                            warn!(
                                missed_updates,
                                "The room keys stream has lagged, retrying to decrypt the whole \
                                 timeline"
                            );

                            None
                        }
                    };

                    let room = inner.room();
                    inner.retry_event_decryption(room, session_ids).await;
                }
            })
        };

        let timeline = Timeline {
            controller,
            event_cache: room_event_cache,
            drop_handle: Arc::new(TimelineDropHandle {
                client,
                event_handler_handles: handles,
                room_update_join_handle,
                pinned_events_join_handle,
                room_key_from_backups_join_handle,
                room_key_backup_enabled_join_handle,
                room_keys_received_join_handle,
                local_echo_listener_handle,
                _event_cache_drop_handle: event_cache_drop,
                encryption_changes_handle,
            }),
        };

        if has_events {
            // The events we're injecting might be encrypted events, but we might
            // have received the room keys to decrypt them while nobody was listening
            // to the `m.room_key` events; let's retry now.
            timeline.retry_decryption_for_all_events().await;
        }

        Ok(timeline)
    }
}