Skip to main content

matrix_sdk_ui/timeline/controller/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::BTreeSet,
17    fmt,
18    sync::{Arc, OnceLock},
19};
20
21use as_variant::as_variant;
22use eyeball_im::{VectorDiff, VectorSubscriberStream};
23use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
24use futures_core::Stream;
25use imbl::Vector;
26use matrix_sdk::{
27    deserialized_responses::TimelineEvent,
28    event_cache::{
29        DecryptionRetryRequest, EventFocusThreadMode, PaginationStatus, RoomEventCache,
30        TimelineVectorDiffs,
31    },
32    send_queue::{
33        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
34    },
35    task_monitor::BackgroundTaskHandle,
36};
37#[cfg(test)]
38use ruma::events::receipt::ReceiptEventContent;
39use ruma::{
40    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
41    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
42    events::{
43        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
44        AnySyncTimelineEvent, MessageLikeEventType,
45        poll::unstable_start::UnstablePollStartEventContent,
46        reaction::ReactionEventContent,
47        receipt::{Receipt, ReceiptThread, ReceiptType},
48        relation::Annotation,
49        room::message::{MessageType, Relation},
50    },
51    room_version_rules::RoomVersionRules,
52    serde::Raw,
53};
54use tokio::sync::{RwLock, RwLockWriteGuard, broadcast};
55use tracing::{
56    Instrument as _, Span, debug, error, field::debug, info, info_span, instrument, trace, warn,
57};
58
59pub(super) use self::{
60    metadata::{RelativePosition, TimelineMetadata},
61    observable_items::{
62        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
63        ObservableItemsTransactionEntry,
64    },
65    state::TimelineState,
66    state_transaction::TimelineStateTransaction,
67};
68use super::{
69    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
70    MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
71    TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
72    TimelineReadReceiptTracking, VirtualTimelineItem,
73    algorithms::{rfind_event_by_id, rfind_event_item},
74    event_item::{ReactionStatus, RemoteEventOrigin},
75    item::TimelineUniqueId,
76    subscriber::TimelineSubscriber,
77    traits::RoomDataProvider,
78};
79use crate::{
80    timeline::{
81        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
82        algorithms::rfind_event_by_item_id,
83        controller::decryption_retry_task::compute_redecryption_candidates,
84        date_dividers::DateDividerAdjuster,
85        event_item::TimelineItemHandle,
86        tasks::{event_focused_task, pinned_events_task, thread_updates_task},
87    },
88    unable_to_decrypt_hook::UtdHookManager,
89};
90
91pub(in crate::timeline) mod aggregations;
92mod decryption_retry_task;
93mod metadata;
94mod observable_items;
95mod read_receipts;
96mod state;
97mod state_transaction;
98
99pub(super) use aggregations::*;
100pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
101
/// Data associated to the current timeline focus.
///
/// This is the private counterpart of [`TimelineFocus`], and it is an augmented
/// version of it, including extra state that makes it useful over the lifetime
/// of a timeline.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind {
    /// The timeline receives live events from the sync.
    Live {
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },

    /// The timeline is focused on a single event, and it can expand in one
    /// direction or another.
    Event {
        /// The focused event ID.
        focused_event_id: OwnedEventId,

        /// If the focused event is part of a thread, or is the root of one,
        /// what's the thread root?
        ///
        /// This is determined once when initializing the event-focused cache,
        /// and then it won't change for the duration of this timeline. Until
        /// it has been set, the focus is not considered to be a thread.
        thread_root: OnceLock<OwnedEventId>,

        /// The thread mode to use for this event-focused timeline, which is
        /// part of the key for the memoized event-focused cache.
        thread_mode: TimelineEventFocusThreadMode,
    },

    /// A live timeline for a thread.
    Thread {
        /// The root event for the current thread.
        root_event_id: OwnedEventId,
    },

    /// The private counterpart of [`TimelineFocus::PinnedEvents`]; it carries
    /// no extra data.
    PinnedEvents,
}
141
142impl TimelineFocusKind {
143    /// Returns the [`ReceiptThread`] that should be used for the current
144    /// timeline focus.
145    ///
146    /// Live and event timelines will use the unthreaded read receipt type in
147    /// general, unless they hide in-thread events, in which case they will
148    /// use the main thread.
149    pub(super) fn receipt_thread(&self) -> ReceiptThread {
150        if let Some(thread_root) = self.thread_root() {
151            ReceiptThread::Thread(thread_root.to_owned())
152        } else if self.hide_threaded_events() {
153            ReceiptThread::Main
154        } else {
155            ReceiptThread::Unthreaded
156        }
157    }
158
159    /// Whether to hide in-thread events from the timeline.
160    fn hide_threaded_events(&self) -> bool {
161        match self {
162            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
163            TimelineFocusKind::Event { thread_mode, .. } => {
164                matches!(
165                    thread_mode,
166                    TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true }
167                )
168            }
169            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => false,
170        }
171    }
172
173    /// Whether the focus is on a thread (from a live thread or a thread
174    /// permalink).
175    fn is_thread(&self) -> bool {
176        self.thread_root().is_some()
177    }
178
179    /// If the focus is a thread, returns its root event ID.
180    fn thread_root(&self) -> Option<&EventId> {
181        match self {
182            TimelineFocusKind::Event { thread_root, .. } => thread_root.get().map(|v| &**v),
183            TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents => None,
184            TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
185        }
186    }
187}
188
/// Groups the timeline's state, its focus data, the room data provider and
/// the timeline settings.
///
/// The state and the focus are both behind an [`Arc`], so clones of a
/// controller share them.
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// Inner mutable state.
    state: Arc<RwLock<TimelineState<P>>>,

    /// Focus data.
    focus: Arc<TimelineFocusKind>,

    /// A [`RoomDataProvider`] implementation, providing data.
    ///
    /// The type is a `RoomDataProvider` to allow testing. In the real world,
    /// this would normally be a [`Room`].
    pub(crate) room_data_provider: P,

    /// Settings applied to this timeline.
    pub(super) settings: TimelineSettings,
}
206
/// Settings that control how a timeline handles and renders its events.
///
/// See the [`Default`] implementation for the values used when nothing is
/// customized.
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Should the read receipts and read markers be handled and on which event
    /// types?
    pub(super) track_read_receipts: TimelineReadReceiptTracking,

    /// Event filter that controls what's rendered as a timeline item (and thus
    /// what can carry read receipts).
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Are unparsable events added as timeline items of their own kind?
    pub(super) add_failed_to_parse: bool,

    /// Should the timeline items be grouped by day or month?
    pub(super) date_divider_mode: DateDividerMode,
}
223
224#[cfg(not(tarpaulin_include))]
225impl fmt::Debug for TimelineSettings {
226    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227        f.debug_struct("TimelineSettings")
228            .field("track_read_receipts", &self.track_read_receipts)
229            .field("add_failed_to_parse", &self.add_failed_to_parse)
230            .finish_non_exhaustive()
231    }
232}
233
234impl Default for TimelineSettings {
235    fn default() -> Self {
236        Self {
237            track_read_receipts: TimelineReadReceiptTracking::Disabled,
238            event_filter: Arc::new(default_event_filter),
239            add_failed_to_parse: true,
240            date_divider_mode: DateDividerMode::Daily,
241        }
242    }
243}
244
245/// The default event filter for
246/// [`crate::timeline::TimelineBuilder::event_filter`].
247///
248/// It filters out events that are not rendered by the timeline, including but
249/// not limited to: reactions, edits, redactions on existing messages.
250///
251/// If you have a custom filter, it may be best to chain yours with this one if
252/// you do not want to run into situations where a read receipt is not visible
253/// because it's living on an event that doesn't have a matching timeline item.
254pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
255    match event {
256        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
257            if ev.redacts(&rules.redaction).is_some() {
258                // This is a redaction of an existing message, we'll only update the previous
259                // message and not render a new entry.
260                false
261            } else {
262                // This is a redacted entry, that we'll show only if the redacted entity wasn't
263                // a reaction.
264                ev.event_type() != MessageLikeEventType::Reaction
265            }
266        }
267
268        AnySyncTimelineEvent::MessageLike(msg) => {
269            match msg.original_content() {
270                None => {
271                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
272                    // a reaction.
273                    msg.event_type() != MessageLikeEventType::Reaction
274                }
275
276                Some(original_content) => {
277                    match original_content {
278                        AnyMessageLikeEventContent::RoomMessage(content) => {
279                            if content
280                                .relates_to
281                                .as_ref()
282                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
283                            {
284                                // Edits aren't visible by default.
285                                return false;
286                            }
287
288                            match content.msgtype {
289                                MessageType::Audio(_)
290                                | MessageType::Emote(_)
291                                | MessageType::File(_)
292                                | MessageType::Image(_)
293                                | MessageType::Location(_)
294                                | MessageType::Notice(_)
295                                | MessageType::ServerNotice(_)
296                                | MessageType::Text(_)
297                                | MessageType::Video(_)
298                                | MessageType::VerificationRequest(_) => true,
299                                #[cfg(feature = "unstable-msc4274")]
300                                MessageType::Gallery(_) => true,
301                                _ => false,
302                            }
303                        }
304
305                        AnyMessageLikeEventContent::Sticker(_)
306                        | AnyMessageLikeEventContent::UnstablePollStart(
307                            UnstablePollStartEventContent::New(_),
308                        )
309                        | AnyMessageLikeEventContent::CallInvite(_)
310                        | AnyMessageLikeEventContent::RtcNotification(_)
311                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
312
313                        // Beacon location-update events are aggregated onto
314                        // their parent `beacon_info` state event's timeline
315                        // item. They are never rendered as standalone items.
316                        AnyMessageLikeEventContent::Beacon(_) => false,
317                        // Ignore decline events, the matching RtcNotification event will be updated
318                        // to reflect the decline.
319                        AnyMessageLikeEventContent::RtcDecline(_) => false,
320
321                        _ => false,
322                    }
323                }
324            }
325        }
326
327        AnySyncTimelineEvent::State(_) => {
328            // All the state events may get displayed by default.
329            true
330        }
331    }
332}
333
/// Result of calling [`TimelineController::init_focus`].
pub(super) struct InitFocusResult {
    /// Did the initialization result in having some events in the timeline?
    pub has_events: bool,

    /// If the timeline is a non-live timeline, an extra task that subscribes to
    /// changes to the focus source; `None` otherwise.
    pub focus_task: Option<BackgroundTaskHandle>,
}
342
343impl<P: RoomDataProvider> TimelineController<P> {
344    pub(super) fn new(
345        room_data_provider: P,
346        focus: TimelineFocus,
347        internal_id_prefix: Option<String>,
348        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
349        is_room_encrypted: bool,
350        settings: TimelineSettings,
351    ) -> Self {
352        let focus = match focus {
353            TimelineFocus::Live { hide_threaded_events } => {
354                TimelineFocusKind::Live { hide_threaded_events }
355            }
356
357            TimelineFocus::Event { target, thread_mode, .. } => {
358                TimelineFocusKind::Event {
359                    focused_event_id: target,
360                    // This will be initialized in `Self::init_focus`.
361                    thread_root: OnceLock::new(),
362                    thread_mode,
363                }
364            }
365
366            TimelineFocus::Thread { root_event_id, .. } => {
367                TimelineFocusKind::Thread { root_event_id }
368            }
369
370            TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents,
371        };
372
373        let focus = Arc::new(focus);
374        let state = Arc::new(RwLock::new(TimelineState::new(
375            focus.clone(),
376            room_data_provider.own_user_id().to_owned(),
377            room_data_provider.room_version_rules(),
378            internal_id_prefix,
379            unable_to_decrypt_hook,
380            is_room_encrypted,
381        )));
382
383        Self { state, focus, room_data_provider, settings }
384    }
385
386    /// Listens to encryption state changes for the room in
387    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
388    /// existing timeline items. This will then cause a refresh of those
389    /// timeline items.
390    pub async fn handle_encryption_state_changes(&self) {
391        let mut room_info = self.room_data_provider.room_info();
392
393        // Small function helper to help mark as encrypted.
394        let mark_encrypted = || async {
395            let mut state = self.state.write().await;
396            state.meta.is_room_encrypted = true;
397            state.mark_all_events_as_encrypted();
398        };
399
400        if room_info.get().encryption_state().is_encrypted() {
401            // If the room was already encrypted, it won't toggle to unencrypted, so we can
402            // shut down this task early.
403            mark_encrypted().await;
404            return;
405        }
406
407        while let Some(info) = room_info.next().await {
408            if info.encryption_state().is_encrypted() {
409                mark_encrypted().await;
410                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
411                // here is done.
412                break;
413            }
414        }
415    }
416
417    /// Run a lazy backwards pagination (in live mode).
418    ///
419    /// It adjusts the `count` value of the `Skip` higher-order stream so that
420    /// more items are pushed front in the timeline.
421    ///
422    /// If no more items are available (i.e. if the `count` is zero), this
423    /// method returns `Some(needs)` where `needs` is the number of events that
424    /// must be unlazily backwards paginated.
425    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
426        let state = self.state.read().await;
427
428        let (count, needs) = state
429            .meta
430            .subscriber_skip_count
431            .compute_next_when_paginating_backwards(num_events.into());
432
433        // This always happens on a live timeline.
434        let is_live_timeline = true;
435        state.meta.subscriber_skip_count.update(count, is_live_timeline);
436
437        needs
438    }
439
440    /// Is this timeline receiving events from sync (aka has a live focus)?
441    pub(super) fn is_live(&self) -> bool {
442        matches!(&*self.focus, TimelineFocusKind::Live { .. })
443    }
444
445    /// Is this timeline focused on a thread?
446    pub(super) fn is_threaded(&self) -> bool {
447        self.focus.is_thread()
448    }
449
450    /// The root of the current thread, for a live thread timeline or a
451    /// permalink to a thread message.
452    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
453        self.focus.thread_root().map(ToOwned::to_owned)
454    }
455
456    /// Get a copy of the current items in the list.
457    ///
458    /// Cheap because `im::Vector` is cheap to clone.
459    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
460        self.state.read().await.items.clone_items()
461    }
462
463    #[cfg(test)]
464    pub(super) async fn subscribe_raw(
465        &self,
466    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
467        self.state.read().await.items.subscribe().into_values_and_stream()
468    }
469
470    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
471        let state = self.state.read().await;
472
473        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
474    }
475
476    pub(super) async fn subscribe_filter_map<U, F>(
477        &self,
478        f: F,
479    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
480    where
481        U: Clone,
482        F: Fn(Arc<TimelineItem>) -> Option<U>,
483    {
484        self.state.read().await.items.subscribe().filter_map(f)
485    }
486
    /// Toggle a reaction locally.
    ///
    /// The reaction is identified by `key`, and it is toggled for our own user
    /// on the timeline item identified by `item_id`: if our user has no such
    /// reaction on the item yet, one is added; otherwise, the existing one is
    /// removed.
    ///
    /// Returns `Ok(true)` if a reaction was added. Returns `Ok(false)` if the
    /// previous reaction was removed, but also when nothing could be toggled
    /// (e.g. reacting to a local echo didn't produce a reaction handle).
    ///
    /// # Errors
    ///
    /// - [`Error::FailedToToggleReaction`] if no timeline item matches
    ///   `item_id`.
    /// - [`Error::SendQueueError`] if reacting to a local echo, or aborting a
    ///   local reaction, failed.
    /// - Any error returned by the room data provider when sending the
    ///   reaction or redacting it.
    #[instrument(skip_all)]
    pub(super) async fn toggle_reaction_local(
        &self,
        item_id: &TimelineEventItemId,
        key: &str,
    ) -> Result<bool, Error> {
        // The write lock is taken upfront; it's only released explicitly
        // before the redaction request below, or when the function returns.
        let mut state = self.state.write().await;

        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't add reaction");
            return Err(Error::FailedToToggleReaction);
        };

        // Look for a reaction with the same key, sent by our own user.
        let user_id = self.room_data_provider.own_user_id();
        let prev_status = item
            .content()
            .reactions()
            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

        let Some(prev_status) = prev_status else {
            // Adding the new reaction.
            match item.handle() {
                TimelineItemHandle::Local(send_handle) => {
                    if send_handle
                        .react(key.to_owned())
                        .await
                        .map_err(|err| Error::SendQueueError(err.into()))?
                        .is_some()
                    {
                        trace!("adding a reaction to a local echo");
                        return Ok(true);
                    }

                    warn!("couldn't toggle reaction for local echo");
                    return Ok(false);
                }

                TimelineItemHandle::Remote(event_id) => {
                    // Add a reaction through the room data provider.
                    // No need to reflect the effect locally, since the local echo handling will
                    // take care of it.
                    trace!("adding a reaction to a remote echo");
                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                    self.room_data_provider
                        .send(ReactionEventContent::from(annotation).into())
                        .await?;
                    return Ok(true);
                }
            }
        };

        // From here on, our user already had this reaction on the item: remove
        // it, in a way that depends on the status of the reaction.
        trace!("removing a previous reaction");
        match prev_status {
            ReactionStatus::LocalToLocal(send_reaction_handle) => {
                if let Some(handle) = send_reaction_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send reaction handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::LocalToRemote(send_handle) => {
                // No need to reflect the change ourselves, since handling the discard of the
                // local echo will take care of it.
                trace!("aborting send of the previous reaction that was a local echo");
                if let Some(handle) = send_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::RemoteToRemote(event_id) => {
                // Assume the redaction will work; we'll re-add the reaction if it didn't.
                let Some(annotated_event_id) =
                    item.as_remote().map(|event_item| event_item.event_id.clone())
                else {
                    warn!("remote reaction to remote event, but the associated item isn't remote");
                    return Ok(false);
                };

                // Optimistically remove the reaction from the item, keeping the
                // removed value around in case it must be restored.
                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
                let reaction_info = reactions.remove_reaction(user_id, key);

                if reaction_info.is_some() {
                    let new_item = item.with_reactions(reactions);
                    state.items.replace(item_pos, new_item);
                } else {
                    warn!(
                        "reaction is missing on the item, not removing it locally, \
                         but sending redaction."
                    );
                }

                // Release the lock before running the request.
                drop(state);

                trace!("sending redact for a previous reaction");
                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                    if let Some(reaction_info) = reaction_info {
                        debug!("sending redact failed, adding the reaction back to the list");

                        // The lock was released in the meantime, so the item
                        // must be looked up again, by event ID this time.
                        let mut state = self.state.write().await;
                        if let Some((item_pos, item)) =
                            rfind_event_by_id(&state.items, &annotated_event_id)
                        {
                            // Re-add the reaction to the mapping.
                            let mut reactions =
                                item.content().reactions().cloned().unwrap_or_default();
                            reactions
                                .entry(key.to_owned())
                                .or_default()
                                .insert(user_id.to_owned(), reaction_info);
                            let new_item = item.with_reactions(reactions);
                            state.items.replace(item_pos, new_item);
                        } else {
                            warn!(
                                "couldn't find item to re-add reaction anymore; \
                                 maybe it's been redacted?"
                            );
                        }
                    }

                    return Err(err);
                }
            }
        }

        Ok(false)
    }
628
629    /// Handle updates on events as [`VectorDiff`]s.
630    pub(super) async fn handle_remote_events_with_diffs(
631        &self,
632        diffs: Vec<VectorDiff<TimelineEvent>>,
633        origin: RemoteEventOrigin,
634    ) {
635        if diffs.is_empty() {
636            return;
637        }
638
639        let mut state = self.state.write().await;
640        state
641            .handle_remote_events_with_diffs(
642                diffs,
643                origin,
644                &self.room_data_provider,
645                &self.settings,
646            )
647            .await
648    }
649
650    /// Only handle aggregations received as [`VectorDiff`]s.
651    pub(super) async fn handle_remote_aggregations(
652        &self,
653        diffs: Vec<VectorDiff<TimelineEvent>>,
654        origin: RemoteEventOrigin,
655    ) {
656        if diffs.is_empty() {
657            return;
658        }
659
660        let mut state = self.state.write().await;
661        state
662            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
663            .await
664    }
665
666    pub(super) async fn clear(&self) {
667        self.state.write().await.clear();
668    }
669
670    /// Replaces the content of the current timeline with initial events.
671    ///
672    /// Also sets up read receipts and the read marker for a live timeline of a
673    /// room.
674    ///
675    /// This is all done with a single lock guard, since we don't want the state
676    /// to be modified between the clear and re-insertion of new events.
677    pub(super) async fn replace_with_initial_remote_events<Events>(
678        &self,
679        events: Events,
680        origin: RemoteEventOrigin,
681    ) where
682        Events: IntoIterator,
683        <Events as IntoIterator>::Item: Into<TimelineEvent>,
684    {
685        let mut state = self.state.write().await;
686
687        let track_read_markers = &self.settings.track_read_receipts;
688        if track_read_markers.is_enabled() {
689            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
690            state
691                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
692                .await;
693        }
694
695        // Replace the events if either the current event list or the new one aren't
696        // empty.
697        // Previously we just had to check the new one wasn't empty because
698        // we did a clear operation before so the current one would always be empty, but
699        // now we may want to replace a populated timeline with an empty one.
700        let mut events = events.into_iter().peekable();
701        if !state.items.is_empty() || events.peek().is_some() {
702            state
703                .replace_with_remote_events(
704                    events,
705                    origin,
706                    &self.room_data_provider,
707                    &self.settings,
708                )
709                .await;
710        }
711
712        if track_read_markers.is_enabled() {
713            if let Some(fully_read_event_id) =
714                self.room_data_provider.load_fully_read_marker().await
715            {
716                state.handle_fully_read_marker(fully_read_event_id);
717            } else if let Some(latest_receipt_event_id) = state
718                .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
719            {
720                // Fall back to read receipt if no fully read marker exists.
721                debug!("no `m.fully_read` marker found, falling back to read receipt");
722                state.handle_fully_read_marker(latest_receipt_event_id);
723            }
724        }
725    }
726
727    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
728        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
729    }
730
731    pub(super) async fn handle_ephemeral_events(
732        &self,
733        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
734    ) {
735        // Don't even take the lock if there are no events to process.
736        if events.is_empty() {
737            return;
738        }
739        let mut state = self.state.write().await;
740        state.handle_ephemeral_events(events, &self.room_data_provider).await;
741    }
742
743    /// Creates the local echo for an event we're sending.
744    #[instrument(skip_all)]
745    pub(super) async fn handle_local_event(
746        &self,
747        txn_id: OwnedTransactionId,
748        content: AnyMessageLikeEventContent,
749        send_handle: Option<SendHandle>,
750    ) {
751        let sender = self.room_data_provider.own_user_id().to_owned();
752        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
753
754        let date_divider_mode = self.settings.date_divider_mode.clone();
755
756        let mut state = self.state.write().await;
757        state
758            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
759            .await;
760    }
761
762    /// Update the send state of a local event represented by a transaction ID.
763    ///
764    /// If the corresponding local timeline item is missing, a warning is
765    /// raised.
766    #[instrument(skip(self))]
767    pub(super) async fn update_event_send_state(
768        &self,
769        txn_id: &TransactionId,
770        send_state: EventSendState,
771    ) {
772        let mut state = self.state.write().await;
773        let mut txn = state.transaction();
774
775        let new_event_id: Option<&EventId> =
776            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
777
778        // The local echoes are always at the end of the timeline, we must first make
779        // sure the remote echo hasn't showed up yet.
780        if rfind_event_item(&txn.items, |it| {
781            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
782        })
783        .is_some()
784        {
785            // Remote echo already received. This is very unlikely.
786            trace!("Remote echo received before send-event response");
787
788            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
789
790            // If there's both the remote echo and a local echo, that means the
791            // remote echo was received before the response *and* contained no
792            // transaction ID (and thus duplicated the local echo).
793            if let Some((idx, _)) = local_echo {
794                warn!("Message echo got duplicated, removing the local one");
795                txn.items.remove(idx);
796
797                // Adjust the date dividers, if needs be.
798                let mut adjuster =
799                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
800                adjuster.run(&mut txn.items, &mut txn.meta);
801            }
802
803            txn.commit();
804            return;
805        }
806
807        // Look for the local event by the transaction ID or event ID.
808        let result = rfind_event_item(&txn.items, |it| {
809            it.transaction_id() == Some(txn_id)
810                || new_event_id.is_some()
811                    && it.event_id() == new_event_id
812                    && it.as_local().is_some()
813        });
814
815        let Some((idx, item)) = result else {
816            // Event wasn't found as a standalone item.
817            //
818            // If it was just sent, try to find if it matches a corresponding aggregation,
819            // and mark it as sent in that case.
820            if let Some(new_event_id) = new_event_id {
821                if txn.meta.aggregations.mark_aggregation_as_sent(
822                    txn_id.to_owned(),
823                    new_event_id.to_owned(),
824                    &mut txn.items,
825                    &txn.meta.room_version_rules,
826                ) {
827                    trace!("Aggregation marked as sent");
828                    txn.commit();
829                    return;
830                }
831
832                trace!("Sent aggregation was not found");
833            }
834
835            warn!("Timeline item not found, can't update send state");
836            return;
837        };
838
839        let Some(local_item) = item.as_local() else {
840            warn!("We looked for a local item, but it transitioned to remote.");
841            return;
842        };
843
844        // The event was already marked as sent, that's a broken state, let's
845        // emit an error but also override to the given sent state.
846        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
847            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
848        }
849
850        // If the event has just been marked as sent, update the aggregations mapping to
851        // take that into account.
852        if let Some(new_event_id) = new_event_id {
853            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
854        }
855
856        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
857        txn.items.replace(idx, new_item);
858
859        txn.commit();
860    }
861
862    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
863        let mut state = self.state.write().await;
864
865        if let Some((idx, _)) =
866            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
867        {
868            let mut txn = state.transaction();
869
870            txn.items.remove(idx);
871
872            // A read marker or a date divider may have been inserted before the local echo.
873            // Ensure both are up to date.
874            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
875            adjuster.run(&mut txn.items, &mut txn.meta);
876
877            txn.meta.update_read_marker(&mut txn.items);
878
879            txn.commit();
880
881            debug!("discarded local echo");
882            return true;
883        }
884
885        // Avoid multiple mutable and immutable borrows of the lock guard by explicitly
886        // dereferencing it once.
887        let mut txn = state.transaction();
888
889        // Look if this was a local aggregation.
890        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
891            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
892            &mut txn.items,
893        ) {
894            Ok(val) => val,
895            Err(err) => {
896                warn!("error when discarding local echo for an aggregation: {err}");
897                // The aggregation has been found, it's just that we couldn't discard it.
898                true
899            }
900        };
901
902        if found_aggregation {
903            txn.commit();
904        }
905
906        found_aggregation
907    }
908
909    pub(super) async fn replace_local_echo(
910        &self,
911        txn_id: &TransactionId,
912        content: AnyMessageLikeEventContent,
913    ) -> bool {
914        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
915            // Ideally, we'd support replacing local echoes for a reaction, etc., but
916            // handling RoomMessage should be sufficient in most cases. Worst
917            // case, the local echo will be sent Soonâ„¢ and we'll get another chance at
918            // editing the event then.
919            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
920            return false;
921        };
922
923        let mut state = self.state.write().await;
924        let mut txn = state.transaction();
925
926        let Some((idx, prev_item)) =
927            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
928        else {
929            debug!("Can't find local echo to replace");
930            return false;
931        };
932
933        // Reuse the previous local echo's state, but reset the send state to not sent
934        // (per API contract).
935        let ti_kind = {
936            let Some(prev_local_item) = prev_item.as_local() else {
937                warn!("We looked for a local item, but it transitioned as remote??");
938                return false;
939            };
940            // If the local echo had an upload progress, retain it.
941            let progress = as_variant!(&prev_local_item.send_state,
942                EventSendState::NotSentYet { progress } => progress.clone())
943            .flatten();
944            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
945        };
946
947        // Replace the local-related state (kind) and the content state.
948        let new_item = TimelineItem::new(
949            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
950                content.msgtype,
951                content.mentions,
952                prev_item.content().reactions().cloned().unwrap_or_default(),
953                prev_item.content().thread_root(),
954                prev_item.content().in_reply_to(),
955                prev_item.content().thread_summary(),
956            )),
957            prev_item.internal_id.to_owned(),
958        );
959
960        txn.items.replace(idx, new_item);
961
962        // This doesn't change the original sending time, so there's no need to adjust
963        // date dividers.
964
965        txn.commit();
966
967        debug!("Replaced local echo");
968        true
969    }
970
971    pub(super) async fn compute_redecryption_candidates(
972        &self,
973    ) -> (BTreeSet<String>, BTreeSet<String>) {
974        let state = self.state.read().await;
975        compute_redecryption_candidates(&state.items)
976    }
977
978    pub(super) async fn set_sender_profiles_pending(&self) {
979        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
980    }
981
982    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
983        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
984    }
985
986    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
987        self.state.write().await.items.for_each(|mut entry| {
988            let Some(event_item) = entry.as_event() else { return };
989            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
990                let new_item = entry.with_kind(TimelineItemKind::Event(
991                    event_item.with_sender_profile(profile_state.clone()),
992                ));
993                ObservableItemsEntry::replace(&mut entry, new_item);
994            }
995        });
996    }
997
998    pub(super) async fn update_missing_sender_profiles(&self) {
999        trace!("Updating missing sender profiles");
1000
1001        let mut state = self.state.write().await;
1002        let mut entries = state.items.entries();
1003        while let Some(mut entry) = entries.next() {
1004            let Some(event_item) = entry.as_event() else { continue };
1005            let event_id = event_item.event_id().map(debug);
1006            let transaction_id = event_item.transaction_id().map(debug);
1007
1008            if event_item.sender_profile().is_ready() {
1009                trace!(event_id, transaction_id, "Profile already set");
1010                continue;
1011            }
1012
1013            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1014                Some(profile) => {
1015                    trace!(event_id, transaction_id, "Adding profile");
1016                    let updated_item =
1017                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
1018                    let new_item = entry.with_kind(updated_item);
1019                    ObservableItemsEntry::replace(&mut entry, new_item);
1020                }
1021                None => {
1022                    if !event_item.sender_profile().is_unavailable() {
1023                        trace!(event_id, transaction_id, "Marking profile unavailable");
1024                        let updated_item =
1025                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1026                        let new_item = entry.with_kind(updated_item);
1027                        ObservableItemsEntry::replace(&mut entry, new_item);
1028                    } else {
1029                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1030                    }
1031                }
1032            }
1033        }
1034
1035        trace!("Done updating missing sender profiles");
1036    }
1037
1038    /// Update the profiles of the given senders, even if they are ready.
1039    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1040        trace!("Forcing update of sender profiles: {sender_ids:?}");
1041
1042        let mut state = self.state.write().await;
1043        let mut entries = state.items.entries();
1044        while let Some(mut entry) = entries.next() {
1045            let Some(event_item) = entry.as_event() else { continue };
1046            if !sender_ids.contains(event_item.sender()) {
1047                continue;
1048            }
1049
1050            let event_id = event_item.event_id().map(debug);
1051            let transaction_id = event_item.transaction_id().map(debug);
1052
1053            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1054                Some(profile) => {
1055                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1056                    {
1057                        debug!(event_id, transaction_id, "Profile already up-to-date");
1058                    } else {
1059                        trace!(event_id, transaction_id, "Updating profile");
1060                        let updated_item =
1061                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
1062                        let new_item = entry.with_kind(updated_item);
1063                        ObservableItemsEntry::replace(&mut entry, new_item);
1064                    }
1065                }
1066                None => {
1067                    if !event_item.sender_profile().is_unavailable() {
1068                        trace!(event_id, transaction_id, "Marking profile unavailable");
1069                        let updated_item =
1070                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1071                        let new_item = entry.with_kind(updated_item);
1072                        ObservableItemsEntry::replace(&mut entry, new_item);
1073                    } else {
1074                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1075                    }
1076                }
1077            }
1078        }
1079
1080        trace!("Done forcing update of sender profiles");
1081    }
1082
/// Test-only entry point feeding a receipt event content to the timeline
/// state, on behalf of our own user.
#[cfg(test)]
pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
    let own_user_id = self.room_data_provider.own_user_id();
    let mut state = self.state.write().await;
    state.handle_read_receipts(receipt_event_content, own_user_id);
}
1088
1089    /// Get the latest read receipt for the given user.
1090    ///
1091    /// Useful to get the latest read receipt, whether it's private or public.
1092    pub(super) async fn latest_user_read_receipt(
1093        &self,
1094        user_id: &UserId,
1095    ) -> Option<(OwnedEventId, Receipt)> {
1096        let receipt_thread = self.focus.receipt_thread();
1097
1098        self.state
1099            .read()
1100            .await
1101            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1102            .await
1103    }
1104
1105    /// Get the ID of the timeline event with the latest read receipt for the
1106    /// given user.
1107    pub(super) async fn latest_user_read_receipt_timeline_event_id(
1108        &self,
1109        user_id: &UserId,
1110    ) -> Option<OwnedEventId> {
1111        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1112    }
1113
1114    /// Subscribe to changes in the read receipts of our own user.
1115    pub async fn subscribe_own_user_read_receipts_changed(
1116        &self,
1117    ) -> impl Stream<Item = ()> + use<P> {
1118        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1119    }
1120
1121    /// Handle a room send update that's a new local echo.
1122    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1123        match echo.content {
1124            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1125                let content = match serialized_event.deserialize() {
1126                    Ok(d) => d,
1127                    Err(err) => {
1128                        warn!("error deserializing local echo: {err}");
1129                        return;
1130                    }
1131                };
1132
1133                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1134                    .await;
1135
1136                if let Some(send_error) = send_error {
1137                    self.update_event_send_state(
1138                        &echo.transaction_id,
1139                        EventSendState::SendingFailed {
1140                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1141                                send_error,
1142                            ))),
1143                            is_recoverable: false,
1144                        },
1145                    )
1146                    .await;
1147                }
1148            }
1149
1150            LocalEchoContent::React { key, send_handle, applies_to } => {
1151                self.handle_local_reaction(key, send_handle, applies_to).await;
1152            }
1153
1154            LocalEchoContent::Redaction { redacts, send_error, .. } => {
1155                self.handle_local_redaction(echo.transaction_id.clone(), redacts).await;
1156
1157                if let Some(send_error) = send_error {
1158                    self.update_event_send_state(
1159                        &echo.transaction_id,
1160                        EventSendState::SendingFailed {
1161                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1162                                send_error,
1163                            ))),
1164                            is_recoverable: false,
1165                        },
1166                    )
1167                    .await;
1168                }
1169            }
1170        }
1171    }
1172
1173    /// Adds a reaction (local echo) to a local echo.
1174    #[instrument(skip(self, send_handle))]
1175    async fn handle_local_reaction(
1176        &self,
1177        reaction_key: String,
1178        send_handle: SendReactionHandle,
1179        applies_to: OwnedTransactionId,
1180    ) {
1181        let mut state = self.state.write().await;
1182        let mut tr = state.transaction();
1183
1184        let target = TimelineEventItemId::TransactionId(applies_to);
1185
1186        let reaction_txn_id = send_handle.transaction_id().to_owned();
1187        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1188        let aggregation = Aggregation::new(
1189            TimelineEventItemId::TransactionId(reaction_txn_id),
1190            AggregationKind::Reaction {
1191                key: reaction_key.clone(),
1192                sender: self.room_data_provider.own_user_id().to_owned(),
1193                timestamp: MilliSecondsSinceUnixEpoch::now(),
1194                reaction_status,
1195            },
1196        );
1197
1198        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1199        find_item_and_apply_aggregation(
1200            &tr.meta.aggregations,
1201            &mut tr.items,
1202            &target,
1203            aggregation,
1204            &tr.meta.room_version_rules,
1205        );
1206
1207        tr.commit();
1208    }
1209
1210    /// Applies a local echo of a redaction.
1211    pub(super) async fn handle_local_redaction(
1212        &self,
1213        txn_id: OwnedTransactionId,
1214        redacts: OwnedEventId,
1215    ) {
1216        let mut state = self.state.write().await;
1217        let mut tr = state.transaction();
1218
1219        let target = TimelineEventItemId::EventId(redacts);
1220
1221        let aggregation = Aggregation::new(
1222            TimelineEventItemId::TransactionId(txn_id),
1223            AggregationKind::Redaction { is_local: true },
1224        );
1225
1226        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1227        find_item_and_apply_aggregation(
1228            &tr.meta.aggregations,
1229            &mut tr.items,
1230            &target,
1231            aggregation,
1232            &tr.meta.room_version_rules,
1233        );
1234
1235        tr.commit();
1236    }
1237
1238    /// Handle a single room send queue update.
1239    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1240        match update {
1241            RoomSendQueueUpdate::NewLocalEvent(echo) => {
1242                self.handle_local_echo(echo).await;
1243            }
1244
1245            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1246                if !self.discard_local_echo(&transaction_id).await {
1247                    warn!("couldn't find the local echo to discard");
1248                }
1249            }
1250
1251            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1252                let content = match new_content.deserialize() {
1253                    Ok(d) => d,
1254                    Err(err) => {
1255                        warn!("error deserializing local echo (upon edit): {err}");
1256                        return;
1257                    }
1258                };
1259
1260                if !self.replace_local_echo(&transaction_id, content).await {
1261                    warn!("couldn't find the local echo to replace");
1262                }
1263            }
1264
1265            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1266                self.update_event_send_state(
1267                    &transaction_id,
1268                    EventSendState::SendingFailed { error, is_recoverable },
1269                )
1270                .await;
1271            }
1272
1273            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1274                self.update_event_send_state(
1275                    &transaction_id,
1276                    EventSendState::NotSentYet { progress: None },
1277                )
1278                .await;
1279            }
1280
1281            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1282                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1283                    .await;
1284            }
1285
1286            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1287                self.update_event_send_state(
1288                    &related_to,
1289                    EventSendState::NotSentYet {
1290                        progress: Some(MediaUploadProgress { index, progress }),
1291                    },
1292                )
1293                .await;
1294            }
1295        }
1296    }
1297
1298    /// Insert a timeline start item at the beginning of the room, if it's
1299    /// missing.
1300    pub async fn insert_timeline_start_if_missing(&self) {
1301        let mut state = self.state.write().await;
1302        let mut txn = state.transaction();
1303        txn.items.push_timeline_start_if_missing(
1304            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1305        );
1306        txn.commit();
1307    }
1308
1309    /// Create a [`EmbeddedEvent`] from an arbitrary event, be it in the
1310    /// timeline or not.
1311    ///
1312    /// Can be `None` if the event cannot be represented as a standalone item,
1313    /// because it's an aggregation.
1314    pub(super) async fn make_replied_to(
1315        &self,
1316        event: TimelineEvent,
1317    ) -> Result<Option<EmbeddedEvent>, Error> {
1318        let state = self.state.read().await;
1319        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1320    }
1321}
1322
1323impl TimelineController {
1324    pub(super) fn room(&self) -> &Room {
1325        &self.room_data_provider
1326    }
1327
1328    /// Initializes the configured timeline focus with appropriate data.
1329    ///
1330    /// Should be called only once after creation of the [`TimelineController`],
1331    /// with all its fields set.
1332    pub(super) async fn init_focus(
1333        &self,
1334        focus: &TimelineFocus,
1335        room_event_cache: &RoomEventCache,
1336    ) -> Result<InitFocusResult, Error> {
1337        match focus {
1338            TimelineFocus::Live { .. } => {
1339                // Retrieve the cached events, and add them to the timeline.
1340                let events = room_event_cache.events().await?;
1341
1342                let has_events = !events.is_empty();
1343
1344                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1345
1346                match room_event_cache.pagination().status().get() {
1347                    PaginationStatus::Idle { hit_timeline_start } => {
1348                        if hit_timeline_start {
1349                            // Eagerly insert the timeline start item, since pagination claims
1350                            // we've already hit the timeline start.
1351                            self.insert_timeline_start_if_missing().await;
1352                        }
1353                    }
1354                    PaginationStatus::Paginating => {}
1355                }
1356
1357                Ok(InitFocusResult { has_events, focus_task: None })
1358            }
1359
1360            TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
1361                // Use the event-focused cache from the event cache layer.
1362                let event_cache_thread_mode = match thread_mode {
1363                    TimelineEventFocusThreadMode::ForceThread => EventFocusThreadMode::ForceThread,
1364                    TimelineEventFocusThreadMode::Automatic { .. } => {
1365                        EventFocusThreadMode::Automatic
1366                    }
1367                };
1368
1369                let cache = room_event_cache
1370                    .get_or_create_event_focused_cache(
1371                        event_id.clone(),
1372                        *num_context_events,
1373                        event_cache_thread_mode,
1374                    )
1375                    .await
1376                    .map_err(PaginationError::EventCache)?;
1377
1378                let (events, receiver) = cache.subscribe().await;
1379
1380                let has_events = !events.is_empty();
1381
1382                // Ask the cache for the thread root, if it managed to extract one or decided
1383                // that the target event was the thread root.
1384                match &*self.focus {
1385                    TimelineFocusKind::Event { thread_root: focus_thread_root, .. } => {
1386                        if let Some(thread_root) = cache.thread_root().await {
1387                            focus_thread_root.get_or_init(|| thread_root);
1388                        }
1389                    }
1390                    TimelineFocusKind::Live { .. }
1391                    | TimelineFocusKind::Thread { .. }
1392                    | TimelineFocusKind::PinnedEvents => {
1393                        panic!("unexpected focus for an event-focused timeline")
1394                    }
1395                }
1396
1397                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
1398                    .await;
1399
1400                let task = self
1401                    .room_data_provider
1402                    .client()
1403                    .task_monitor()
1404                    .spawn_infinite_task(
1405                        "timeline::event_focused_cache_updates",
1406                        event_focused_task(
1407                            event_id.clone(),
1408                            (*thread_mode).into(),
1409                            room_event_cache.clone(),
1410                            self.clone(),
1411                            receiver,
1412                        ),
1413                    )
1414                    .abort_on_drop();
1415
1416                Ok(InitFocusResult { has_events, focus_task: Some(task) })
1417            }
1418
1419            TimelineFocus::Thread { root_event_id, .. } => {
1420                let (has_events, receiver) =
1421                    self.init_with_thread_root(root_event_id, room_event_cache).await?;
1422
1423                let room = &self.room_data_provider;
1424                let span = info_span!(
1425                    parent: Span::none(),
1426                    "thread_live_update_handler",
1427                    room_id = ?room.room_id(),
1428                );
1429                span.follows_from(Span::current());
1430
1431                let task = room
1432                    .client()
1433                    .task_monitor()
1434                    .spawn_infinite_task(
1435                        "timeline::thread_event_cache_updates",
1436                        thread_updates_task(
1437                            receiver,
1438                            room_event_cache.clone(),
1439                            self.clone(),
1440                            root_event_id.clone(),
1441                        )
1442                        .instrument(span),
1443                    )
1444                    .abort_on_drop();
1445
1446                Ok(InitFocusResult { has_events, focus_task: Some(task) })
1447            }
1448
1449            TimelineFocus::PinnedEvents => {
1450                let (initial_events, pinned_events_recv) =
1451                    room_event_cache.subscribe_to_pinned_events().await?;
1452
1453                let has_events = !initial_events.is_empty();
1454
1455                self.replace_with_initial_remote_events(
1456                    initial_events,
1457                    RemoteEventOrigin::Pagination,
1458                )
1459                .await;
1460
1461                let task = self
1462                    .room_data_provider
1463                    .client()
1464                    .task_monitor()
1465                    .spawn_infinite_task(
1466                        "timeline::pinned_event_cache_updates",
1467                        pinned_events_task(
1468                            room_event_cache.clone(),
1469                            self.clone(),
1470                            pinned_events_recv,
1471                        ),
1472                    )
1473                    .abort_on_drop();
1474
1475                Ok(InitFocusResult { has_events, focus_task: Some(task) })
1476            }
1477        }
1478    }
1479
1480    /// (Re-)initialise a timeline using [`TimelineFocus::Thread`] with cached
1481    /// threaded events and secondary relations.
1482    ///
1483    /// Returns whether there were any events added to the timeline, and a
1484    /// receiver to return updates after the initial events have been
1485    /// inserted in the timeline.
1486    pub(super) async fn init_with_thread_root(
1487        &self,
1488        root_event_id: &OwnedEventId,
1489        room_event_cache: &RoomEventCache,
1490    ) -> Result<(bool, broadcast::Receiver<TimelineVectorDiffs>), Error> {
1491        let (events, receiver) =
1492            room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
1493        let has_events = !events.is_empty();
1494
1495        // For each event, we also need to find the related events, as they don't
1496        // include the thread relationship, they won't be included in
1497        // the initial list of events.
1498        let mut related_events = Vector::new();
1499        for event_id in events.iter().filter_map(|event| event.event_id()) {
1500            if let Some((_original, related)) =
1501                room_event_cache.find_event_with_relations(&event_id, None).await?
1502            {
1503                related_events.extend(related);
1504            }
1505        }
1506
1507        self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1508
1509        // Now that we've inserted the thread events, add the aggregations too.
1510        if !related_events.is_empty() {
1511            self.handle_remote_aggregations(
1512                vec![VectorDiff::Append { values: related_events }],
1513                RemoteEventOrigin::Cache,
1514            )
1515            .await;
1516        }
1517
1518        Ok((has_events, receiver))
1519    }
1520
1521    /// Given an event identifier, will fetch the details for the event it's
1522    /// replying to, if applicable.
1523    #[instrument(skip(self))]
1524    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1525        let state_guard = self.state.write().await;
1526        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1527            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1528        let remote_item = item
1529            .as_remote()
1530            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1531            .clone();
1532
1533        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1534            debug!("Event is not a message");
1535            return Ok(());
1536        };
1537        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1538            debug!("Event is not a reply");
1539            return Ok(());
1540        };
1541        if let TimelineDetails::Pending = &in_reply_to.event {
1542            debug!("Replied-to event is already being fetched");
1543            return Ok(());
1544        }
1545        if let TimelineDetails::Ready(_) = &in_reply_to.event {
1546            debug!("Replied-to event has already been fetched");
1547            return Ok(());
1548        }
1549
1550        let internal_id = item.internal_id.to_owned();
1551        let item = item.clone();
1552        let event = fetch_replied_to_event(
1553            state_guard,
1554            &self.state,
1555            index,
1556            &item,
1557            internal_id,
1558            &msglike,
1559            &in_reply_to.event_id,
1560            self.room(),
1561        )
1562        .await?;
1563
1564        // We need to be sure to have the latest position of the event as it might have
1565        // changed while waiting for the request.
1566        let mut state = self.state.write().await;
1567        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1568            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1569
1570        // Check the state of the event again, it might have been redacted while
1571        // the request was in-flight.
1572        let TimelineItemContent::MsgLike(MsgLikeContent {
1573            kind: MsgLikeKind::Message(message),
1574            reactions,
1575            thread_root,
1576            in_reply_to,
1577            thread_summary,
1578        }) = item.content().clone()
1579        else {
1580            info!("Event is no longer a message (redacted?)");
1581            return Ok(());
1582        };
1583        let Some(in_reply_to) = in_reply_to else {
1584            warn!("Event no longer has a reply (bug?)");
1585            return Ok(());
1586        };
1587
1588        // Now that we've received the content of the replied-to event, replace the
1589        // replied-to content in the item with it.
1590        trace!("Updating in-reply-to details");
1591        let internal_id = item.internal_id.to_owned();
1592        let mut item = item.clone();
1593        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1594            kind: MsgLikeKind::Message(message),
1595            reactions,
1596            thread_root,
1597            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1598            thread_summary,
1599        }));
1600        state.items.replace(index, TimelineItem::new(item, internal_id));
1601
1602        Ok(())
1603    }
1604
1605    /// Returns the thread that should be used for a read receipt based on the
1606    /// current focus of the timeline and the receipt type.
1607    ///
1608    /// A `SendReceiptType::FullyRead` will always use
1609    /// `ReceiptThread::Unthreaded`
1610    pub(super) fn infer_thread_for_read_receipt(
1611        &self,
1612        receipt_type: &SendReceiptType,
1613    ) -> ReceiptThread {
1614        if matches!(receipt_type, SendReceiptType::FullyRead) {
1615            ReceiptThread::Unthreaded
1616        } else {
1617            self.focus.receipt_thread()
1618        }
1619    }
1620
1621    /// Check whether the given receipt should be sent.
1622    ///
1623    /// Returns `false` if the given receipt is older than the current one.
1624    pub(super) async fn should_send_receipt(
1625        &self,
1626        receipt_type: &SendReceiptType,
1627        receipt_thread: &ReceiptThread,
1628        event_id: &EventId,
1629    ) -> bool {
1630        let own_user_id = self.room().own_user_id();
1631        let state = self.state.read().await;
1632        let room = self.room();
1633
1634        match receipt_type {
1635            SendReceiptType::Read => {
1636                if let Some((old_pub_read, _)) = state
1637                    .meta
1638                    .user_receipt(
1639                        own_user_id,
1640                        ReceiptType::Read,
1641                        receipt_thread.clone(),
1642                        room,
1643                        state.items.all_remote_events(),
1644                    )
1645                    .await
1646                {
1647                    trace!(%old_pub_read, "found a previous public receipt");
1648                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1649                        &old_pub_read,
1650                        event_id,
1651                        state.items.all_remote_events(),
1652                    ) {
1653                        trace!(
1654                            "event referred to new receipt is {relative_pos:?} the previous receipt"
1655                        );
1656                        return relative_pos == RelativePosition::After;
1657                    }
1658                }
1659            }
1660
1661            // Implicit read receipts are saved as public read receipts, so get the latest. It also
1662            // doesn't make sense to have a private read receipt behind a public one.
1663            SendReceiptType::ReadPrivate => {
1664                if let Some((old_priv_read, _)) =
1665                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1666                {
1667                    trace!(%old_priv_read, "found a previous private receipt");
1668                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1669                        &old_priv_read,
1670                        event_id,
1671                        state.items.all_remote_events(),
1672                    ) {
1673                        trace!(
1674                            "event referred to new receipt is {relative_pos:?} the previous receipt"
1675                        );
1676                        return relative_pos == RelativePosition::After;
1677                    }
1678                }
1679            }
1680
1681            SendReceiptType::FullyRead => {
1682                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1683                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1684                        &prev_event_id,
1685                        event_id,
1686                        state.items.all_remote_events(),
1687                    )
1688                {
1689                    return relative_pos == RelativePosition::After;
1690                }
1691            }
1692
1693            _ => {}
1694        }
1695
1696        // Let the server handle unknown receipts.
1697        true
1698    }
1699
1700    /// Returns the latest event identifier, even if it's not visible, or if
1701    /// it's folded into another timeline item.
1702    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1703        let state = self.state.read().await;
1704        let filter_out_thread_events = match self.focus() {
1705            TimelineFocusKind::Thread { .. } => false,
1706            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
1707            TimelineFocusKind::Event { .. } => {
1708                // For event-focused timelines, filtering is handled in the event cache layer.
1709                false
1710            }
1711            TimelineFocusKind::PinnedEvents => true,
1712        };
1713
1714        state
1715            .items
1716            .all_remote_events()
1717            .iter()
1718            .rev()
1719            .filter_map(|event_meta| {
1720                if !filter_out_thread_events {
1721                    // For an unthreaded timeline, the last event is always the latest event.
1722                    Some(event_meta.event_id.clone())
1723                } else if event_meta.thread_root_id.is_none() {
1724                    // For the main-thread timeline, only non-threaded events are valid candidates
1725                    // for the latest event.
1726                    //
1727                    // But! An event could be an aggregation that relate to an in-thread
1728                    // event. In this case, it's not a valid latest event.
1729                    if let Some(TimelineEventItemId::EventId(target_event_id)) =
1730                        state.meta.aggregations.is_aggregation_of(&TimelineEventItemId::EventId(
1731                            event_meta.event_id.clone(),
1732                        ))
1733                        && let Some(target_meta) =
1734                            state.items.all_remote_events().get_by_event_id(target_event_id)
1735                        && target_meta.thread_root_id.is_some()
1736                    {
1737                        // This event is an aggregation of an in-thread event, so skip it.
1738                        None
1739                    } else {
1740                        // Not in a thread, and not the aggregation of an in-thread event, so it's
1741                        // a valid candidate for the latest event.
1742                        Some(event_meta.event_id.clone())
1743                    }
1744                } else {
1745                    // An in-thread event, when we're filtering out threaded events, is never a
1746                    // valid candidate for the latest event.
1747                    None
1748                }
1749            })
1750            .next()
1751    }
1752
1753    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1754    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1755        let (utds, decrypted) = self.compute_redecryption_candidates().await;
1756
1757        let request = DecryptionRetryRequest {
1758            room_id: self.room().room_id().to_owned(),
1759            utd_session_ids: utds,
1760            refresh_info_session_ids: decrypted,
1761        };
1762
1763        self.room().client().event_cache().request_decryption(request);
1764    }
1765
1766    /// Combine the global (event cache) pagination status with the local state
1767    /// of the timeline.
1768    ///
1769    /// This only changes the global pagination status of this room, in one
1770    /// case: if the timeline has a skip count greater than 0, it will
1771    /// ensure that the pagination status says that we haven't reached the
1772    /// timeline start yet.
1773    pub(super) async fn map_pagination_status(&self, status: PaginationStatus) -> PaginationStatus {
1774        match status {
1775            PaginationStatus::Idle { hit_timeline_start } => {
1776                if hit_timeline_start {
1777                    let state = self.state.read().await;
1778                    // If the skip count is greater than 0, it means that a subsequent pagination
1779                    // could return more items, so pretend we didn't get the information that the
1780                    // timeline start was hit.
1781                    if state.meta.subscriber_skip_count.get() > 0 {
1782                        return PaginationStatus::Idle { hit_timeline_start: false };
1783                    }
1784                }
1785            }
1786            PaginationStatus::Paginating => {}
1787        }
1788
1789        // You're perfect, just the way you are.
1790        status
1791    }
1792}
1793
1794impl<P: RoomDataProvider> TimelineController<P> {
1795    /// Returns the timeline focus of the [`TimelineController`].
1796    pub(super) fn focus(&self) -> &TimelineFocusKind {
1797        &self.focus
1798    }
1799}
1800
1801#[allow(clippy::too_many_arguments)]
1802async fn fetch_replied_to_event<P: RoomDataProvider>(
1803    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1804    state_lock: &RwLock<TimelineState<P>>,
1805    index: usize,
1806    item: &EventTimelineItem,
1807    internal_id: TimelineUniqueId,
1808    msglike: &MsgLikeContent,
1809    in_reply_to: &EventId,
1810    room: &Room,
1811) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1812    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1813        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1814        trace!("Found replied-to event locally");
1815        return Ok(details);
1816    }
1817
1818    // Replace the item with a new timeline item that has the fetching status of the
1819    // replied-to event to pending.
1820    trace!("Setting in-reply-to details to pending");
1821    let in_reply_to_details =
1822        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1823
1824    let event_item = item
1825        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1826
1827    let new_timeline_item = TimelineItem::new(event_item, internal_id);
1828    state_guard.items.replace(index, new_timeline_item);
1829
1830    // Don't hold the state lock while the network request is made.
1831    drop(state_guard);
1832
1833    trace!("Fetching replied-to event");
1834    let res = match room.load_or_fetch_event(in_reply_to, None).await {
1835        Ok(timeline_event) => {
1836            let state = state_lock.read().await;
1837
1838            let replied_to_item =
1839                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1840
1841            if let Some(item) = replied_to_item {
1842                TimelineDetails::Ready(Box::new(item))
1843            } else {
1844                // The replied-to item is an aggregation, not a standalone item.
1845                return Err(Error::UnsupportedEvent);
1846            }
1847        }
1848
1849        Err(e) => TimelineDetails::Error(Arc::new(e)),
1850    };
1851
1852    Ok(res)
1853}