1use std::{
16 collections::{BTreeMap, BTreeSet},
17 sync::Arc,
18};
19
20use futures_core::Stream;
21use futures_util::{pin_mut, StreamExt};
22use matrix_sdk::{
23 crypto::store::RoomKeyInfo,
24 encryption::backups::BackupState,
25 event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheListener, RoomEventCacheUpdate},
26 executor::spawn,
27 send_queue::RoomSendQueueUpdate,
28 Room,
29};
30use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm};
31use ruma::{events::AnySyncTimelineEvent, OwnedEventId, RoomVersionId};
32use tokio::sync::broadcast::{error::RecvError, Receiver};
33use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
34use tracing::{info_span, instrument, trace, warn, Instrument, Span};
35
36use super::{
37 controller::{TimelineController, TimelineSettings},
38 to_device::{handle_forwarded_room_key_event, handle_room_key_event},
39 DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus,
40};
41use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager};
42
43#[must_use]
46#[derive(Debug)]
47pub struct TimelineBuilder {
48 room: Room,
49 settings: TimelineSettings,
50 focus: TimelineFocus,
51
52 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
55
56 internal_id_prefix: Option<String>,
58}
59
60impl TimelineBuilder {
61 pub fn new(room: &Room) -> Self {
62 Self {
63 room: room.clone(),
64 settings: TimelineSettings::default(),
65 unable_to_decrypt_hook: None,
66 focus: TimelineFocus::Live { hide_threaded_events: false },
67 internal_id_prefix: None,
68 }
69 }
70
71 pub fn with_focus(mut self, focus: TimelineFocus) -> Self {
75 self.focus = focus;
76 self
77 }
78
79 pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
84 self.unable_to_decrypt_hook = Some(hook);
85 self
86 }
87
88 pub fn with_internal_id_prefix(mut self, prefix: String) -> Self {
93 self.internal_id_prefix = Some(prefix);
94 self
95 }
96
97 pub fn with_date_divider_mode(mut self, mode: DateDividerMode) -> Self {
100 self.settings.date_divider_mode = mode;
101 self
102 }
103
104 pub fn track_read_marker_and_receipts(mut self) -> Self {
107 self.settings.track_read_receipts = true;
108 self
109 }
110
111 pub fn event_filter<F>(mut self, filter: F) -> Self
137 where
138 F: Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool
139 + SendOutsideWasm
140 + SyncOutsideWasm
141 + 'static,
142 {
143 self.settings.event_filter = Arc::new(filter);
144 self
145 }
146
147 pub fn add_failed_to_parse(mut self, add: bool) -> Self {
151 self.settings.add_failed_to_parse = add;
152 self
153 }
154
155 #[tracing::instrument(
157 skip(self),
158 fields(
159 room_id = ?self.room.room_id(),
160 track_read_receipts = self.settings.track_read_receipts,
161 )
162 )]
163 pub async fn build(self) -> Result<Timeline, Error> {
164 let Self { room, settings, unable_to_decrypt_hook, focus, internal_id_prefix } = self;
165
166 let client = room.client();
167 let event_cache = client.event_cache();
168
169 event_cache.subscribe()?;
171
172 let (room_event_cache, event_cache_drop) = room.event_cache().await?;
173 let (_, event_subscriber) = room_event_cache.subscribe().await;
174
175 let is_live = matches!(focus, TimelineFocus::Live { .. });
176 let is_pinned_events = matches!(focus, TimelineFocus::PinnedEvents { .. });
177 let is_room_encrypted = room
178 .latest_encryption_state()
179 .await
180 .map(|state| state.is_encrypted())
181 .ok()
182 .unwrap_or_default();
183
184 let controller = TimelineController::new(
185 room.clone(),
186 focus.clone(),
187 internal_id_prefix.clone(),
188 unable_to_decrypt_hook,
189 is_room_encrypted,
190 )
191 .with_settings(settings);
192
193 let has_events = controller.init_focus(&room_event_cache).await?;
194
195 let pinned_events_join_handle = if is_pinned_events {
196 Some(spawn(pinned_events_task(room.pinned_event_ids_stream(), controller.clone())))
197 } else {
198 None
199 };
200
201 let encryption_changes_handle = spawn({
202 let inner = controller.clone();
203 async move {
204 inner.handle_encryption_state_changes().await;
205 }
206 });
207
208 let room_update_join_handle = spawn({
209 let span = info_span!(
210 parent: Span::none(),
211 "live_update_handler",
212 room_id = ?room.room_id(),
213 focus = focus.debug_string(),
214 prefix = internal_id_prefix
215 );
216 span.follows_from(Span::current());
217
218 room_event_cache_updates_task(
219 room_event_cache.clone(),
220 controller.clone(),
221 event_subscriber,
222 is_live,
223 )
224 .instrument(span)
225 });
226
227 let local_echo_listener_handle = {
228 let timeline_controller = controller.clone();
229 let (local_echoes, send_queue_stream) = room.send_queue().subscribe().await?;
230
231 spawn({
232 for echo in local_echoes {
234 timeline_controller.handle_local_echo(echo).await;
235 }
236
237 let span = info_span!(
238 parent: Span::none(),
239 "local_echo_handler",
240 room_id = ?room.room_id(),
241 focus = focus.debug_string(),
242 prefix = internal_id_prefix
243 );
244 span.follows_from(Span::current());
245
246 room_send_queue_update_task(send_queue_stream, timeline_controller).instrument(span)
247 })
248 };
249
250 let room_key_handle = client.add_event_handler(handle_room_key_event(
251 controller.clone(),
252 room.room_id().to_owned(),
253 ));
254
255 let forwarded_room_key_handle = client.add_event_handler(handle_forwarded_room_key_event(
256 controller.clone(),
257 room.room_id().to_owned(),
258 ));
259
260 let event_handlers = vec![room_key_handle, forwarded_room_key_handle];
261
262 let room_key_from_backups_join_handle = spawn(room_keys_from_backups_task(
266 client.encryption().backups().room_keys_for_room_stream(controller.room().room_id()),
267 controller.clone(),
268 ));
269
270 let room_key_backup_enabled_join_handle = spawn(backup_states_task(
271 client.encryption().backups().state_stream(),
272 controller.clone(),
273 ));
274
275 let room_keys_received_join_handle = {
281 spawn(room_key_received_task(
282 client.encryption().room_keys_received_stream().await.expect(
283 "We should be logged in by now, so we should have access to an `OlmMachine` \
284 to be able to listen to this stream",
285 ),
286 controller.clone(),
287 ))
288 };
289
290 let timeline = Timeline {
291 controller,
292 event_cache: room_event_cache,
293 drop_handle: Arc::new(TimelineDropHandle {
294 client,
295 event_handler_handles: event_handlers,
296 room_update_join_handle,
297 pinned_events_join_handle,
298 room_key_from_backups_join_handle,
299 room_key_backup_enabled_join_handle,
300 room_keys_received_join_handle,
301 local_echo_listener_handle,
302 _event_cache_drop_handle: event_cache_drop,
303 encryption_changes_handle,
304 }),
305 };
306
307 if has_events {
308 timeline.retry_decryption_for_all_events().await;
312 }
313
314 Ok(timeline)
315 }
316}
317
318#[instrument(
320 skip_all,
321 fields(
322 room_id = %timeline_controller.room().room_id(),
323 )
324)]
325async fn pinned_events_task<S>(pinned_event_ids_stream: S, timeline_controller: TimelineController)
326where
327 S: Stream<Item = Vec<OwnedEventId>>,
328{
329 pin_mut!(pinned_event_ids_stream);
330
331 while pinned_event_ids_stream.next().await.is_some() {
332 trace!("received a pinned events update");
333
334 match timeline_controller.reload_pinned_events().await {
335 Ok(Some(events)) => {
336 trace!("successfully reloaded pinned events");
337 timeline_controller
338 .replace_with_initial_remote_events(
339 events.into_iter(),
340 RemoteEventOrigin::Pagination,
341 )
342 .await;
343 }
344
345 Ok(None) => {
346 }
349
350 Err(err) => {
351 warn!("Failed to reload pinned events: {err}");
352 }
353 }
354 }
355}
356
357async fn room_event_cache_updates_task(
359 room_event_cache: RoomEventCache,
360 timeline_controller: TimelineController,
361 mut event_subscriber: RoomEventCacheListener,
362 is_live: bool,
363) {
364 trace!("Spawned the event subscriber task.");
365
366 loop {
367 trace!("Waiting for an event.");
368
369 let update = match event_subscriber.recv().await {
370 Ok(up) => up,
371 Err(RecvError::Closed) => break,
372 Err(RecvError::Lagged(num_skipped)) => {
373 warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
374
375 let initial_events = room_event_cache.events().await;
379
380 timeline_controller
381 .replace_with_initial_remote_events(
382 initial_events.into_iter(),
383 RemoteEventOrigin::Cache,
384 )
385 .await;
386
387 continue;
388 }
389 };
390
391 match update {
392 RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
393 trace!(target = %event_id, "Handling fully read marker.");
394 timeline_controller.handle_fully_read_marker(event_id).await;
395 }
396
397 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
398 trace!("Received new timeline events diffs");
399 let origin = match origin {
400 EventsOrigin::Sync => RemoteEventOrigin::Sync,
401 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
402 EventsOrigin::Cache => RemoteEventOrigin::Cache,
403 };
404
405 let has_diffs = !diffs.is_empty();
406
407 if is_live {
408 timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
409 } else {
410 timeline_controller.handle_remote_aggregations(diffs, origin).await;
412 }
413
414 if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
415 timeline_controller.retry_event_decryption(None).await;
416 }
417 }
418
419 RoomEventCacheUpdate::AddEphemeralEvents { events } => {
420 trace!("Received new ephemeral events from sync.");
421
422 timeline_controller.handle_ephemeral_events(events).await;
424 }
425
426 RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
427 if !ambiguity_changes.is_empty() {
428 let member_ambiguity_changes = ambiguity_changes
429 .values()
430 .flat_map(|change| change.user_ids())
431 .collect::<BTreeSet<_>>();
432 timeline_controller
433 .force_update_sender_profiles(&member_ambiguity_changes)
434 .await;
435 }
436 }
437 }
438 }
439}
440
441async fn room_send_queue_update_task(
443 mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
444 timeline_controller: TimelineController,
445) {
446 trace!("spawned the local echo task!");
447
448 loop {
449 match send_queue_stream.recv().await {
450 Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,
451
452 Err(RecvError::Lagged(num_missed)) => {
453 warn!("missed {num_missed} local echoes, ignoring those missed");
454 }
455
456 Err(RecvError::Closed) => {
457 trace!("channel closed, exiting the local echo handler");
458 break;
459 }
460 }
461 }
462}
463
464async fn room_keys_from_backups_task<S>(stream: S, timeline_controller: TimelineController)
466where
467 S: Stream<Item = Result<BTreeMap<String, BTreeSet<String>>, BroadcastStreamRecvError>>,
468{
469 pin_mut!(stream);
470
471 while let Some(update) = stream.next().await {
472 match update {
473 Ok(info) => {
474 let mut session_ids = BTreeSet::new();
475
476 for set in info.into_values() {
477 session_ids.extend(set);
478 }
479
480 timeline_controller.retry_event_decryption(Some(session_ids)).await;
481 }
482 Err(_) => timeline_controller.retry_event_decryption(None).await,
484 }
485 }
486}
487
488async fn backup_states_task<S>(backup_states_stream: S, timeline_controller: TimelineController)
490where
491 S: Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
492{
493 pin_mut!(backup_states_stream);
494
495 while let Some(update) = backup_states_stream.next().await {
496 match update {
497 Ok(BackupState::Enabled) | Err(_) => {
506 timeline_controller.retry_event_decryption(None).await;
507 }
508 Ok(
511 BackupState::Unknown
512 | BackupState::Creating
513 | BackupState::Resuming
514 | BackupState::Disabling
515 | BackupState::Downloading
516 | BackupState::Enabling,
517 ) => (),
518 }
519 }
520}
521
522async fn room_key_received_task<S>(
524 room_keys_received_stream: S,
525 timeline_controller: TimelineController,
526) where
527 S: Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>,
528{
529 pin_mut!(room_keys_received_stream);
530
531 let room_id = timeline_controller.room().room_id();
532
533 while let Some(room_keys) = room_keys_received_stream.next().await {
534 let session_ids = match room_keys {
535 Ok(room_keys) => {
536 let session_ids: BTreeSet<String> = room_keys
537 .into_iter()
538 .filter(|info| info.room_id == room_id)
539 .map(|info| info.session_id)
540 .collect();
541
542 Some(session_ids)
543 }
544 Err(BroadcastStreamRecvError::Lagged(missed_updates)) => {
545 warn!(
548 missed_updates,
549 "The room keys stream has lagged, retrying to decrypt the whole timeline"
550 );
551
552 None
553 }
554 };
555
556 timeline_controller.retry_event_decryption(session_ids).await;
557 }
558}