matrix_sdk_ui/timeline/
builder.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, sync::Arc};
16
17use futures_util::{pin_mut, StreamExt};
18use matrix_sdk::{
19    encryption::backups::BackupState,
20    event_cache::{EventsOrigin, RoomEventCacheUpdate},
21    executor::spawn,
22    Room,
23};
24use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
25use tokio::sync::broadcast::error::RecvError;
26use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
27use tracing::{info, info_span, trace, warn, Instrument, Span};
28
29use super::{
30    controller::{TimelineController, TimelineSettings},
31    to_device::{handle_forwarded_room_key_event, handle_room_key_event},
32    DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
33};
34use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};
35
36/// Builder that allows creating and configuring various parts of a
37/// [`Timeline`].
38#[must_use]
39#[derive(Debug)]
40pub struct TimelineBuilder {
41    room: Room,
42    settings: TimelineSettings,
43    focus: TimelineFocus,
44
45    /// An optional hook to call whenever we run into an unable-to-decrypt or a
46    /// late-decryption event.
47    unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
48
49    /// An optional prefix for internal IDs.
50    internal_id_prefix: Option<String>,
51}
52
53impl TimelineBuilder {
54    pub(super) fn new(room: &Room) -> Self {
55        Self {
56            room: room.clone(),
57            settings: TimelineSettings::default(),
58            unable_to_decrypt_hook: None,
59            focus: TimelineFocus::Live,
60            internal_id_prefix: None,
61        }
62    }
63
64    /// Sets up the initial focus for this timeline.
65    ///
66    /// This can be changed later on while the timeline is alive.
67    pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
68        self.focus = focus;
69        self
70    }
71
72    /// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
73    /// we're building.
74    ///
75    /// If it was previously set before, will overwrite the previous one.
76    pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
77        self.unable_to_decrypt_hook = Some(hook);
78        self
79    }
80
81    /// Sets the internal id prefix for this timeline.
82    ///
83    /// The prefix will be prepended to any internal ID using when generating
84    /// timeline IDs for this timeline.
85    pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
86        self.internal_id_prefix = Some(prefix);
87        self
88    }
89
90    /// Chose when to insert the date separators, either in between each day
91    /// or each month.
92    pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
93        self.settings.date_divider_mode = mode;
94        self
95    }
96
97    /// Enable tracking of the fully-read marker and the read receipts on the
98    /// timeline.
99    pub fn track_read_marker_and_receipts(mut self) -> Self {
100        self.settings.track_read_receipts = true;
101        self
102    }
103
104    /// Use the given filter to choose whether to add events to the timeline.
105    ///
106    /// # Arguments
107    ///
108    /// * `filter` - A function that takes a deserialized event, and should
109    ///   return `true` if the event should be added to the `Timeline`.
110    ///
111    /// If this is not overridden, the timeline uses the default filter that
112    /// only allows events that are materialized into a `Timeline` item. For
113    /// instance, reactions and edits don't get their own timeline item (as
114    /// they affect another existing one), so they're "filtered out" to
115    /// reflect that.
116    ///
117    /// You can use the default event filter with
118    /// [`crate::timeline::default_event_filter`] so as to chain it with
119    /// your own event filter, if you want to avoid situations where a read
120    /// receipt would be attached to an event that doesn't get its own
121    /// timeline item.
122    ///
123    /// Note that currently:
124    ///
125    /// - Not all event types have a representation as a `TimelineItem` so these
126    ///   are not added no matter what the filter returns.
127    /// - It is not possible to filter out `m.room.encrypted` events (otherwise
128    ///   they couldn't be decrypted when the appropriate room key arrives).
129    pub fn event_filter<F>(mut self, filter: F) -> Self
130    where
131        F: Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool + Send + Sync + 'static,
132    {
133        self.settings.event_filter = Arc::new(filter);
134        self
135    }
136
137    /// Whether to add events that failed to deserialize to the timeline.
138    ///
139    /// Defaults to `true`.
140    pub fn add_failed_to_parse(mut self, add: bool) -> Self {
141        self.settings.add_failed_to_parse = add;
142        self
143    }
144
145    /// Create a [`Timeline`] with the options set on this builder.
146    #[tracing::instrument(
147        skip(self),
148        fields(
149            room_id = ?self.room.room_id(),
150            track_read_receipts = self.settings.track_read_receipts,
151        )
152    )]
153    pub async fn build(self) -> Result<Timeline, Error> {
154        let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
155
156        let client = room.client();
157        let event_cache = client.event_cache();
158
159        // Subscribe the event cache to sync responses, in case we hadn't done it yet.
160        event_cache.subscribe()?;
161
162        let (room_event_cache, event_cache_drop) = room.event_cache().await?;
163        let (_, mut event_subscriber) = room_event_cache.subscribe().await?;
164
165        let is_pinned_events = matches!(focus, TimelineFocus::PinnedEvents { .. });
166        let is_room_encrypted = room.is_encrypted().await.ok();
167
168        let controller = TimelineController::new(
169            room,
170            focus.clone(),
171            internal_id_prefix.clone(),
172            unable_to_decrypt_hook,
173            is_room_encrypted,
174        )
175        .with_settings(settings);
176
177        let has_events = controller.init_focus(&room_event_cache).await?;
178
179        let room = controller.room();
180        let client = room.client();
181
182        let pinned_events_join_handle = if is_pinned_events {
183            let mut pinned_event_ids_stream = room.pinned_event_ids_stream();
184            Some(spawn({
185                let inner = controller.clone();
186                async move {
187                    while pinned_event_ids_stream.next().await.is_some() {
188                        if let Ok(events) = inner.reload_pinned_events().await {
189                            inner
190                                .replace_with_initial_remote_events(
191                                    events.into_iter(),
192                                    RemoteEventOrigin::Pagination,
193                                )
194                                .await;
195                        }
196                    }
197                }
198            }))
199        } else {
200            None
201        };
202
203        let encryption_changes_handle = spawn({
204            let inner = controller.clone();
205            async move {
206                inner.handle_encryption_state_changes().await;
207            }
208        });
209
210        let room_update_join_handle = spawn({
211            let room_event_cache = room_event_cache.clone();
212            let inner = controller.clone();
213
214            let span = info_span!(
215                parent: Span::none(),
216                "live_update_handler",
217                room_id = ?room.room_id(),
218                focus = focus.debug_string(),
219                prefix = internal_id_prefix
220            );
221            span.follows_from(Span::current());
222
223            async move {
224                trace!("Spawned the event subscriber task.");
225
226                loop {
227                    trace!("Waiting for an event.");
228
229                    let update = match event_subscriber.recv().await {
230                        Ok(up) => up,
231                        Err(RecvError::Closed) => break,
232                        Err(RecvError::Lagged(num_skipped)) => {
233                            warn!(
234                                num_skipped,
235                                "Lagged behind event cache updates, resetting timeline"
236                            );
237
238                            // The updates might have lagged, but the room event cache might have
239                            // events, so retrieve them and add them back again to the timeline,
240                            // after clearing it.
241                            //
242                            // If we can't get a handle on the room cache's events, just clear the
243                            // current timeline.
244                            match room_event_cache.subscribe().await {
245                                Ok((events, _)) => {
246                                    inner.replace_with_initial_remote_events(events.into_iter(), RemoteEventOrigin::Sync).await;
247                                }
248                                Err(err) => {
249                                    warn!("Error when re-inserting initial events into the timeline: {err}");
250                                    inner.clear().await;
251                                }
252                            }
253
254                            continue;
255                        }
256                    };
257
258                    match update {
259                        RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
260                            trace!(target = %event_id, "Handling fully read marker.");
261                            inner.handle_fully_read_marker(event_id).await;
262                        }
263
264                        RoomEventCacheUpdate::Clear => {
265                            if !inner.is_live().await {
266                                // Ignore a clear for a timeline not in the live mode; the
267                                // focused-on-event mode doesn't add any new items to the timeline
268                                // anyways.
269                                continue;
270                            }
271
272                            trace!("Clearing the timeline.");
273                            inner.clear().await;
274                        }
275
276                        RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
277                            trace!("Received new timeline events diffs");
278
279                            inner.handle_remote_events_with_diffs(
280                                diffs,
281                                match origin {
282                                    EventsOrigin::Sync => RemoteEventOrigin::Sync,
283                                    EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
284                                }
285                            ).await;
286                        }
287
288                        RoomEventCacheUpdate::AddEphemeralEvents { events } => {
289                            trace!("Received new ephemeral events from sync.");
290
291                            // TODO: (bnjbvr) ephemeral should be handled by the event cache.
292                            inner.handle_ephemeral_events(events).await;
293                        }
294
295                        RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
296                            if !ambiguity_changes.is_empty() {
297                                let member_ambiguity_changes = ambiguity_changes
298                                    .values()
299                                    .flat_map(|change| change.user_ids())
300                                    .collect::<BTreeSet<_>>();
301                                inner.force_update_sender_profiles(&member_ambiguity_changes).await;
302                            }
303                        }
304                    }
305                }
306            }
307            .instrument(span)
308        });
309
310        let local_echo_listener_handle = {
311            let timeline = controller.clone();
312            let (local_echoes, mut listener) = room.send_queue().subscribe().await?;
313
314            spawn({
315                // Handles existing local echoes first.
316                for echo in local_echoes {
317                    timeline.handle_local_echo(echo).await;
318                }
319
320                let span = info_span!(
321                    parent: Span::none(),
322                    "local_echo_handler",
323                    room_id = ?room.room_id(),
324                    focus = focus.debug_string(),
325                    prefix = internal_id_prefix
326                );
327                span.follows_from(Span::current());
328
329                // React to future local echoes too.
330                async move {
331                    info!("spawned the local echo handler!");
332
333                    loop {
334                        match listener.recv().await {
335                            Ok(update) => timeline.handle_room_send_queue_update(update).await,
336
337                            Err(RecvError::Lagged(num_missed)) => {
338                                warn!("missed {num_missed} local echoes, ignoring those missed");
339                            }
340
341                            Err(RecvError::Closed) => {
342                                info!("channel closed, exiting the local echo handler");
343                                break;
344                            }
345                        }
346                    }
347                }
348                .instrument(span)
349            })
350        };
351
352        // Not using room.add_event_handler here because RoomKey events are
353        // to-device events that are not received in the context of a room.
354
355        let room_key_handle = client.add_event_handler(handle_room_key_event(
356            controller.clone(),
357            room.room_id().to_owned(),
358        ));
359
360        let forwarded_room_key_handle = client.add_event_handler(handle_forwarded_room_key_event(
361            controller.clone(),
362            room.room_id().to_owned(),
363        ));
364
365        let handles = vec![room_key_handle, forwarded_room_key_handle];
366
367        let room_key_from_backups_join_handle = {
368            let inner = controller.clone();
369            let room_id = inner.room().room_id();
370
371            let stream = client.encryption().backups().room_keys_for_room_stream(room_id);
372
373            spawn(async move {
374                pin_mut!(stream);
375
376                while let Some(update) = stream.next().await {
377                    let room = inner.room();
378
379                    match update {
380                        Ok(info) => {
381                            let mut session_ids = BTreeSet::new();
382
383                            for set in info.into_values() {
384                                session_ids.extend(set);
385                            }
386
387                            inner.retry_event_decryption(room, Some(session_ids)).await;
388                        }
389                        // We lagged, so retry every event.
390                        Err(_) => inner.retry_event_decryption(room, None).await,
391                    }
392                }
393            })
394        };
395
396        let room_key_backup_enabled_join_handle = {
397            let inner = controller.clone();
398            let stream = client.encryption().backups().state_stream();
399
400            spawn(async move {
401                pin_mut!(stream);
402
403                while let Some(update) = stream.next().await {
404                    match update {
405                        // If the backup got enabled, or we lagged and thus missed that the backup
406                        // might be enabled, retry to decrypt all the events. Please note, depending
407                        // on the backup download strategy, this might do two things under the
408                        // assumption that the backup contains the relevant room keys:
409                        //
410                        // 1. It will decrypt the events, if `BackupDownloadStrategy` has been set
411                        //    to `OneShot`.
412                        // 2. It will fail to decrypt the event, but try to download the room key to
413                        //    decrypt it if the `BackupDownloadStrategy` has been set to
414                        //    `AfterDecryptionFailure`.
415                        Ok(BackupState::Enabled) | Err(_) => {
416                            let room = inner.room();
417                            inner.retry_event_decryption(room, None).await;
418                        }
419                        // The other states aren't interesting since they are either still enabling
420                        // the backup or have the backup in the disabled state.
421                        Ok(
422                            BackupState::Unknown
423                            | BackupState::Creating
424                            | BackupState::Resuming
425                            | BackupState::Disabling
426                            | BackupState::Downloading
427                            | BackupState::Enabling,
428                        ) => (),
429                    }
430                }
431            })
432        };
433
434        // TODO: Technically, this should be the only stream we need to listen to get
435        // notified when we should retry to decrypt an event. We sadly can't do that,
436        // since the cross-process support kills the `OlmMachine` which then in
437        // turn kills this stream. Once this is solved remove all the other ways we
438        // listen for room keys.
439        let room_keys_received_join_handle = {
440            let inner = controller.clone();
441            let stream = client.encryption().room_keys_received_stream().await.expect(
442                "We should be logged in by now, so we should have access to an OlmMachine \
443                 to be able to listen to this stream",
444            );
445
446            spawn(async move {
447                pin_mut!(stream);
448
449                while let Some(room_keys) = stream.next().await {
450                    let session_ids = match room_keys {
451                        Ok(room_keys) => {
452                            let session_ids: BTreeSet<String> = room_keys
453                                .into_iter()
454                                .filter(|info| info.room_id == inner.room().room_id())
455                                .map(|info| info.session_id)
456                                .collect();
457
458                            Some(session_ids)
459                        }
460                        Err(BroadcastStreamRecvError::Lagged(missed_updates)) => {
461                            // We lagged, let's retry to decrypt anything we have, maybe something
462                            // was received.
463                            warn!(missed_updates, "The room keys stream has lagged, retrying to decrypt the whole timeline");
464
465                            None
466                        }
467                    };
468
469                    let room = inner.room();
470                    inner.retry_event_decryption(room, session_ids).await;
471                }
472            })
473        };
474
475        let timeline = Timeline {
476            controller,
477            event_cache: room_event_cache,
478            drop_handle: Arc::new(TimelineDropHandle {
479                client,
480                event_handler_handles: handles,
481                room_update_join_handle,
482                pinned_events_join_handle,
483                room_key_from_backups_join_handle,
484                room_key_backup_enabled_join_handle,
485                room_keys_received_join_handle,
486                local_echo_listener_handle,
487                _event_cache_drop_handle: event_cache_drop,
488                encryption_changes_handle,
489            }),
490        };
491
492        if has_events {
493            // The events we're injecting might be encrypted events, but we might
494            // have received the room key to decrypt them while nobody was listening to the
495            // `m.room_key` event, let's retry now.
496            timeline.retry_decryption_for_all_events().await;
497        }
498
499        Ok(timeline)
500    }
501}