matrix_sdk_ui/timeline/
builder.rs

// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
};

use futures_core::Stream;
use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{
    crypto::store::RoomKeyInfo,
    encryption::backups::BackupState,
    event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheListener, RoomEventCacheUpdate},
    executor::spawn,
    send_queue::RoomSendQueueUpdate,
    Room,
};
use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm};
use ruma::{events::AnySyncTimelineEvent, OwnedEventId, RoomVersionId};
use tokio::sync::broadcast::{error::RecvError, Receiver};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{info_span, instrument, trace, warn, Instrument, Span};

use super::{
    controller::{TimelineController, TimelineSettings},
    to_device::{handle_forwarded_room_key_event, handle_room_key_event},
    DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
};
use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};

/// Builder that allows creating and configuring various parts of a
/// [`Timeline`].
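///
/// # Example
///
/// A minimal sketch of typical usage (not taken from real code; `room` is
/// assumed to be a [`Room`] the caller already has):
///
/// ```ignore
/// // Build a live timeline that also tracks the fully-read marker and
/// // read receipts.
/// let timeline = TimelineBuilder::new(&room)
///     .with_focus(TimelineFocus::Live { hide_threaded_events: false })
///     .track_read_marker_and_receipts()
///     .build()
///     .await?;
/// ```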
#[must_use]
#[derive(Debug)]
pub struct TimelineBuilder {
    room: Room,
    settings: TimelineSettings,
    focus: TimelineFocus,

    /// An optional hook to call whenever we run into an unable-to-decrypt or a
    /// late-decryption event.
    unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,

    /// An optional prefix for internal IDs.
    internal_id_prefix: Option<String>,
}

impl TimelineBuilder {
    pub fn new(room: &Room) -> Self {
        Self {
            room: room.clone(),
            settings: TimelineSettings::default(),
            unable_to_decrypt_hook: None,
            focus: TimelineFocus::Live { hide_threaded_events: false },
            internal_id_prefix: None,
        }
    }

    /// Sets up the initial focus for this timeline.
    ///
    /// This can be changed later on while the timeline is alive.
    pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
        self.focus = focus;
        self
    }

    /// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
    /// we're building.
    ///
    /// If a hook was previously set, it will be overwritten by this one.
    pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
        self.unable_to_decrypt_hook = Some(hook);
        self
    }

    /// Sets the internal id prefix for this timeline.
    ///
    /// The prefix will be prepended to any internal ID used when generating
    /// timeline IDs for this timeline.
    pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
        self.internal_id_prefix = Some(prefix);
        self
    }

    /// Choose when to insert the date dividers, either between each day or
    /// between each month.
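    ///
    /// # Example
    ///
    /// A sketch grouping items by month (assuming a `Monthly` variant of
    /// [`DateDividerMode`]):
    ///
    /// ```ignore
    /// let builder = TimelineBuilder::new(&room)
    ///     .with_date_divider_mode(DateDividerMode::Monthly);
    /// ```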
    pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
        self.settings.date_divider_mode = mode;
        self
    }

    /// Enable tracking of the fully-read marker and the read receipts on the
    /// timeline.
    pub fn track_read_marker_and_receipts(mut self) -> Self {
        self.settings.track_read_receipts = true;
        self
    }

    /// Use the given filter to choose whether to add events to the timeline.
    ///
    /// # Arguments
    ///
    /// * `filter` - A function that takes a deserialized event, and should
    ///   return `true` if the event should be added to the `Timeline`.
    ///
    /// If this is not overridden, the timeline uses the default filter that
    /// only allows events that are materialized into a `Timeline` item. For
    /// instance, reactions and edits don't get their own timeline item (as
    /// they affect another existing one), so they're "filtered out" to
    /// reflect that.
    ///
    /// You can chain the default event filter,
    /// [`crate::timeline::default_event_filter`], with your own event filter
    /// if you want to avoid situations where a read receipt would be attached
    /// to an event that doesn't get its own timeline item.
    ///
    /// Note that currently:
    ///
    /// - Not all event types have a representation as a `TimelineItem`, so
    ///   these are not added no matter what the filter returns.
    /// - It is not possible to filter out `m.room.encrypted` events (otherwise
    ///   they couldn't be decrypted when the appropriate room key arrives).
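    ///
    /// # Example
    ///
    /// A sketch chaining the default filter with a custom predicate;
    /// `is_interesting` is a hypothetical helper, not part of this crate:
    ///
    /// ```ignore
    /// use matrix_sdk_ui::timeline::default_event_filter;
    ///
    /// let builder = TimelineBuilder::new(&room).event_filter(|event, room_version| {
    ///     // Keep events the default filter keeps, further narrowed by our
    ///     // own predicate.
    ///     default_event_filter(event, room_version) && is_interesting(event)
    /// });
    /// ```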
    pub fn event_filter<F>(mut self, filter: F) -> Self
    where
        F: Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool
            + SendOutsideWasm
            + SyncOutsideWasm
            + 'static,
    {
        self.settings.event_filter = Arc::new(filter);
        self
    }

    /// Whether to add events that failed to deserialize to the timeline.
    ///
    /// Defaults to `true`.
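    ///
    /// # Example
    ///
    /// A sketch hiding events that failed to deserialize:
    ///
    /// ```ignore
    /// let builder = TimelineBuilder::new(&room).add_failed_to_parse(false);
    /// ```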
    pub fn add_failed_to_parse(mut self, add: bool) -> Self {
        self.settings.add_failed_to_parse = add;
        self
    }

    /// Create a [`Timeline`] with the options set on this builder.
    #[tracing::instrument(
        skip(self),
        fields(
            room_id = ?self.room.room_id(),
            track_read_receipts = self.settings.track_read_receipts,
        )
    )]
    pub async fn build(self) -> Result<Timeline, Error> {
        let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;

        let client = room.client();
        let event_cache = client.event_cache();

        // Subscribe the event cache to sync responses, in case we hadn't done it yet.
        event_cache.subscribe()?;

        let (room_event_cache, event_cache_drop) = room.event_cache().await?;
        let (_, event_subscriber) = room_event_cache.subscribe().await;

        let is_live = matches!(focus, TimelineFocus::Live { .. });
        let is_pinned_events = matches!(focus, TimelineFocus::PinnedEvents { .. });
        let is_room_encrypted = room
            .latest_encryption_state()
            .await
            .map(|state| state.is_encrypted())
            .ok()
            .unwrap_or_default();

        let controller = TimelineController::new(
            room.clone(),
            focus.clone(),
            internal_id_prefix.clone(),
            unable_to_decrypt_hook,
            is_room_encrypted,
        )
        .with_settings(settings);

        let has_events = controller.init_focus(&room_event_cache).await?;

        let pinned_events_join_handle = if is_pinned_events {
            Some(spawn(pinned_events_task(room.pinned_event_ids_stream(), controller.clone())))
        } else {
            None
        };

        let encryption_changes_handle = spawn({
            let inner = controller.clone();
            async move {
                inner.handle_encryption_state_changes().await;
            }
        });

        let room_update_join_handle = spawn({
            let span = info_span!(
                parent: Span::none(),
                "live_update_handler",
                room_id = ?room.room_id(),
                focus = focus.debug_string(),
                prefix = internal_id_prefix
            );
            span.follows_from(Span::current());

            room_event_cache_updates_task(
                room_event_cache.clone(),
                controller.clone(),
                event_subscriber,
                is_live,
            )
            .instrument(span)
        });

        let local_echo_listener_handle = {
            let timeline_controller = controller.clone();
            let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;

            spawn({
                let span = info_span!(
                    parent: Span::none(),
                    "local_echo_handler",
                    room_id = ?room.room_id(),
                    focus = focus.debug_string(),
                    prefix = internal_id_prefix
                );
                span.follows_from(Span::current());

                // `.await` is only allowed in an async context, so handle the
                // existing local echoes inside the spawned future, before
                // listening for new send queue updates.
                async move {
                    for echo in local_echoes {
                        timeline_controller.handle_local_echo(echo).await;
                    }

                    room_send_queue_update_task(send_queue_stream, timeline_controller).await;
                }
                .instrument(span)
            })
        };

        // Not using room.add_event_handler here because RoomKey events are
        // to-device events that are not received in the context of a room.
        let room_key_handle = client.add_event_handler(handle_room_key_event(
            controller.clone(),
            room.room_id().to_owned(),
        ));

        let forwarded_room_key_handle = client.add_event_handler(handle_forwarded_room_key_event(
            controller.clone(),
            room.room_id().to_owned(),
        ));

        let event_handlers = vec![room_key_handle, forwarded_room_key_handle];

        let room_key_from_backups_join_handle = spawn(room_keys_from_backups_task(
            client.encryption().backups().room_keys_for_room_stream(controller.room().room_id()),
            controller.clone(),
        ));

        let room_key_backup_enabled_join_handle = spawn(backup_states_task(
            client.encryption().backups().state_stream(),
            controller.clone(),
        ));

        // TODO: Technically, this should be the only stream we need to listen to in
        // order to get notified when we should retry decrypting an event. We sadly
        // can't do that, since the cross-process support kills the `OlmMachine`,
        // which in turn kills this stream. Once this is solved, remove all the
        // other ways we listen for room keys.
        let room_keys_received_join_handle = {
            spawn(room_key_received_task(
                client.encryption().room_keys_received_stream().await.expect(
                    "We should be logged in by now, so we should have access to an `OlmMachine` \
                     to be able to listen to this stream",
                ),
                controller.clone(),
            ))
        };

        let timeline = Timeline {
            controller,
            event_cache: room_event_cache,
            drop_handle: Arc::new(TimelineDropHandle {
                client,
                event_handler_handles: event_handlers,
                room_update_join_handle,
                pinned_events_join_handle,
                room_key_from_backups_join_handle,
                room_key_backup_enabled_join_handle,
                room_keys_received_join_handle,
                local_echo_listener_handle,
                _event_cache_drop_handle: event_cache_drop,
                encryption_changes_handle,
            }),
        };

        if has_events {
            // The events we're injecting might be encrypted events, but we might
            // have received the room key to decrypt them while nobody was listening
            // to the `m.room_key` event; retry decrypting them now.
            timeline.retry_decryption_for_all_events().await;
        }

        Ok(timeline)
    }
}

/// The task that handles the pinned event IDs updates.
#[instrument(
    skip_all,
    fields(
        room_id = %timeline_controller.room().room_id(),
    )
)]
async fn pinned_events_task<S>(pinned_event_ids_stream: S, timeline_controller: TimelineController)
where
    S: Stream<Item = Vec<OwnedEventId>>,
{
    pin_mut!(pinned_event_ids_stream);

    while pinned_event_ids_stream.next().await.is_some() {
        trace!("received a pinned events update");

        match timeline_controller.reload_pinned_events().await {
            Ok(Some(events)) => {
                trace!("successfully reloaded pinned events");
                timeline_controller
                    .replace_with_initial_remote_events(
                        events.into_iter(),
                        RemoteEventOrigin::Pagination,
                    )
                    .await;
            }

            Ok(None) => {
                // The list of pinned events hasn't changed since the previous
                // time.
            }

            Err(err) => {
                warn!("Failed to reload pinned events: {err}");
            }
        }
    }
}

/// The task that handles the [`RoomEventCacheUpdate`]s.
async fn room_event_cache_updates_task(
    room_event_cache: RoomEventCache,
    timeline_controller: TimelineController,
    mut event_subscriber: RoomEventCacheListener,
    is_live: bool,
) {
    trace!("Spawned the event subscriber task.");

    loop {
        trace!("Waiting for an event.");

        let update = match event_subscriber.recv().await {
            Ok(up) => up,
            Err(RecvError::Closed) => break,
            Err(RecvError::Lagged(num_skipped)) => {
                warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");

                // The updates might have lagged, but the room event cache might still
                // have events, so retrieve them and add them back to the timeline
                // after clearing it.
                let initial_events = room_event_cache.events().await;

                timeline_controller
                    .replace_with_initial_remote_events(
                        initial_events.into_iter(),
                        RemoteEventOrigin::Cache,
                    )
                    .await;

                continue;
            }
        };

        match update {
            RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
                trace!(target = %event_id, "Handling fully read marker.");
                timeline_controller.handle_fully_read_marker(event_id).await;
            }

            RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
                trace!("Received new timeline events diffs");
                let origin = match origin {
                    EventsOrigin::Sync => RemoteEventOrigin::Sync,
                    EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
                    EventsOrigin::Cache => RemoteEventOrigin::Cache,
                };

                let has_diffs = !diffs.is_empty();

                if is_live {
                    timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
                } else {
                    // Only handle the remote aggregations for a non-live timeline.
                    timeline_controller.handle_remote_aggregations(diffs, origin).await;
                }

                if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
                    timeline_controller.retry_event_decryption(None).await;
                }
            }

            RoomEventCacheUpdate::AddEphemeralEvents { events } => {
                trace!("Received new ephemeral events from sync.");

                // TODO: (bnjbvr) ephemeral should be handled by the event cache.
                timeline_controller.handle_ephemeral_events(events).await;
            }

            RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
                if !ambiguity_changes.is_empty() {
                    let member_ambiguity_changes = ambiguity_changes
                        .values()
                        .flat_map(|change| change.user_ids())
                        .collect::<BTreeSet<_>>();
                    timeline_controller
                        .force_update_sender_profiles(&member_ambiguity_changes)
                        .await;
                }
            }
        }
    }
}

/// The task that handles the [`RoomSendQueueUpdate`]s.
async fn room_send_queue_update_task(
    mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
    timeline_controller: TimelineController,
) {
    trace!("spawned the local echo task!");

    loop {
        match send_queue_stream.recv().await {
            Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,

            Err(RecvError::Lagged(num_missed)) => {
                warn!("missed {num_missed} local echoes, ignoring those missed");
            }

            Err(RecvError::Closed) => {
                trace!("channel closed, exiting the local echo handler");
                break;
            }
        }
    }
}

/// The task that handles the room keys from backups.
async fn room_keys_from_backups_task<S>(stream: S, timeline_controller: TimelineController)
where
    S: Stream<Item = Result<BTreeMap<String, BTreeSet<String>>, BroadcastStreamRecvError>>,
{
    pin_mut!(stream);

    while let Some(update) = stream.next().await {
        match update {
            Ok(info) => {
                let mut session_ids = BTreeSet::new();

                for set in info.into_values() {
                    session_ids.extend(set);
                }

                timeline_controller.retry_event_decryption(Some(session_ids)).await;
            }
            // We lagged, so retry every event.
            Err(_) => timeline_controller.retry_event_decryption(None).await,
        }
    }
}

/// The task that handles the [`BackupState`] updates.
async fn backup_states_task<S>(backup_states_stream: S, timeline_controller: TimelineController)
where
    S: Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
{
    pin_mut!(backup_states_stream);

    while let Some(update) = backup_states_stream.next().await {
        match update {
            // If the backup got enabled, or we lagged and thus missed that the backup
            // might be enabled, retry to decrypt all the events. Please note, depending
            // on the backup download strategy, this might do one of two things, under
            // the assumption that the backup contains the relevant room keys:
            //
            // 1. It will decrypt the events, if `BackupDownloadStrategy` has been set to `OneShot`.
            // 2. It will fail to decrypt the event, but try to download the room key to decrypt it
            //    if the `BackupDownloadStrategy` has been set to `AfterDecryptionFailure`.
            Ok(BackupState::Enabled) | Err(_) => {
                timeline_controller.retry_event_decryption(None).await;
            }
            // The other states aren't interesting since they are either still enabling
            // the backup or have the backup in the disabled state.
            Ok(
                BackupState::Unknown
                | BackupState::Creating
                | BackupState::Resuming
                | BackupState::Disabling
                | BackupState::Downloading
                | BackupState::Enabling,
            ) => (),
        }
    }
}

/// The task that handles the [`RoomKeyInfo`] updates.
async fn room_key_received_task<S>(
    room_keys_received_stream: S,
    timeline_controller: TimelineController,
) where
    S: Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>,
{
    pin_mut!(room_keys_received_stream);

    let room_id = timeline_controller.room().room_id();

    while let Some(room_keys) = room_keys_received_stream.next().await {
        let session_ids = match room_keys {
            Ok(room_keys) => {
                let session_ids: BTreeSet<String> = room_keys
                    .into_iter()
                    .filter(|info| info.room_id == room_id)
                    .map(|info| info.session_id)
                    .collect();

                Some(session_ids)
            }
            Err(BroadcastStreamRecvError::Lagged(missed_updates)) => {
                // We lagged, so retry to decrypt anything we have; something may
                // have been received while we weren't looking.
                warn!(
                    missed_updates,
                    "The room keys stream has lagged, retrying to decrypt the whole timeline"
                );

                None
            }
        };

        timeline_controller.retry_event_decryption(session_ids).await;
    }
}