matrix_sdk_ui/timeline/controller/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use eyeball_im::{VectorDiff, VectorSubscriberStream};
19use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
20use futures_core::Stream;
21use imbl::Vector;
22#[cfg(test)]
23use matrix_sdk::Result;
24use matrix_sdk::{
25    deserialized_responses::TimelineEvent,
26    event_cache::{DecryptionRetryRequest, RoomEventCache, RoomPaginationStatus},
27    paginators::{PaginationResult, PaginationToken, Paginator},
28    send_queue::{
29        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
30    },
31};
32#[cfg(test)]
33use ruma::events::receipt::ReceiptEventContent;
34use ruma::{
35    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
36    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
37    events::{
38        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
39        AnySyncTimelineEvent, MessageLikeEventType,
40        poll::unstable_start::UnstablePollStartEventContent,
41        reaction::ReactionEventContent,
42        receipt::{Receipt, ReceiptThread, ReceiptType},
43        relation::Annotation,
44        room::message::{MessageType, Relation},
45    },
46    room_version_rules::RoomVersionRules,
47    serde::Raw,
48};
49use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
50use tracing::{debug, error, field::debug, info, instrument, trace, warn};
51
52pub(super) use self::{
53    metadata::{RelativePosition, TimelineMetadata},
54    observable_items::{
55        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
56        ObservableItemsTransactionEntry,
57    },
58    state::TimelineState,
59    state_transaction::TimelineStateTransaction,
60};
61use super::{
62    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
63    MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
64    TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
65    TimelineReadReceiptTracking, VirtualTimelineItem,
66    algorithms::{rfind_event_by_id, rfind_event_item},
67    event_item::{ReactionStatus, RemoteEventOrigin},
68    item::TimelineUniqueId,
69    subscriber::TimelineSubscriber,
70    traits::RoomDataProvider,
71};
72use crate::{
73    timeline::{
74        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn,
75        algorithms::rfind_event_by_item_id,
76        controller::decryption_retry_task::compute_redecryption_candidates,
77        date_dividers::DateDividerAdjuster,
78        event_item::TimelineItemHandle,
79        pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
80    },
81    unable_to_decrypt_hook::UtdHookManager,
82};
83
84pub(in crate::timeline) mod aggregations;
85mod decryption_retry_task;
86mod metadata;
87mod observable_items;
88mod read_receipts;
89mod state;
90mod state_transaction;
91
92pub(super) use aggregations::*;
93pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
94use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
95use matrix_sdk_common::serde_helpers::extract_thread_root;
96
97/// Data associated to the current timeline focus.
98///
99/// This is the private counterpart of [`TimelineFocus`], and it is an augmented
100/// version of it, including extra state that makes it useful over the lifetime
101/// of a timeline.
102#[derive(Debug)]
103pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
104    /// The timeline receives live events from the sync.
105    Live {
106        /// Whether to hide in-thread events from the timeline.
107        hide_threaded_events: bool,
108    },
109
110    /// The timeline is focused on a single event, and it can expand in one
111    /// direction or another.
112    Event {
113        /// The paginator instance.
114        paginator: OnceCell<AnyPaginator<P>>,
115    },
116
117    /// A live timeline for a thread.
118    Thread {
119        /// The root event for the current thread.
120        root_event_id: OwnedEventId,
121    },
122
123    PinnedEvents {
124        loader: PinnedEventsLoader,
125    },
126}
127
128#[derive(Debug)]
129pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
130    Unthreaded {
131        /// The actual event paginator.
132        paginator: Paginator<P>,
133        /// Whether to hide in-thread events from the timeline.
134        hide_threaded_events: bool,
135    },
136    Threaded(ThreadedEventsLoader<P>),
137}
138
139impl<P: RoomDataProvider> AnyPaginator<P> {
140    /// Runs a backward pagination (requesting `num_events` to the server), from
141    /// the current state of the object.
142    ///
143    /// Will return immediately if we have already hit the start of the
144    /// timeline.
145    ///
146    /// May return an error if it's already paginating, or if the call to
147    /// the homeserver endpoints failed.
148    pub async fn paginate_backwards(
149        &self,
150        num_events: u16,
151    ) -> Result<PaginationResult, PaginatorError> {
152        match self {
153            Self::Unthreaded { paginator, .. } => {
154                paginator.paginate_backward(num_events.into()).await
155            }
156            Self::Threaded(threaded_paginator) => {
157                threaded_paginator.paginate_backwards(num_events.into()).await
158            }
159        }
160    }
161
162    /// Runs a forward pagination (requesting `num_events` to the server), from
163    /// the current state of the object.
164    ///
165    /// Will return immediately if we have already hit the end of the timeline.
166    ///
167    /// May return an error if it's already paginating, or if the call to
168    /// the homeserver endpoints failed.
169    pub async fn paginate_forwards(
170        &self,
171        num_events: u16,
172    ) -> Result<PaginationResult, PaginatorError> {
173        match self {
174            Self::Unthreaded { paginator, .. } => {
175                paginator.paginate_forward(num_events.into()).await
176            }
177            Self::Threaded(threaded_paginator) => {
178                threaded_paginator.paginate_forwards(num_events.into()).await
179            }
180        }
181    }
182
183    /// Whether to hide in-thread events from the timeline.
184    pub fn hide_threaded_events(&self) -> bool {
185        match self {
186            Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
187            Self::Threaded(_) => false,
188        }
189    }
190
191    /// Returns the root event id of the thread, if the paginator is
192    /// [`AnyPaginator::Threaded`].
193    pub fn thread_root(&self) -> Option<&EventId> {
194        match self {
195            Self::Unthreaded { .. } => None,
196            Self::Threaded(thread_events_loader) => {
197                Some(thread_events_loader.thread_root_event_id())
198            }
199        }
200    }
201}
202
203impl<P: RoomDataProvider> TimelineFocusKind<P> {
204    /// Returns the [`ReceiptThread`] that should be used for the current
205    /// timeline focus.
206    ///
207    /// Live and event timelines will use the unthreaded read receipt type in
208    /// general, unless they hide in-thread events, in which case they will
209    /// use the main thread.
210    pub(super) fn receipt_thread(&self) -> ReceiptThread {
211        if let Some(thread_root) = self.thread_root() {
212            ReceiptThread::Thread(thread_root.to_owned())
213        } else if self.hide_threaded_events() {
214            ReceiptThread::Main
215        } else {
216            ReceiptThread::Unthreaded
217        }
218    }
219
220    /// Whether to hide in-thread events from the timeline.
221    fn hide_threaded_events(&self) -> bool {
222        match self {
223            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
224            TimelineFocusKind::Event { paginator } => {
225                paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
226            }
227            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
228        }
229    }
230
231    /// Whether the focus is on a thread (from a live thread or a thread
232    /// permalink).
233    fn is_thread(&self) -> bool {
234        self.thread_root().is_some()
235    }
236
237    /// If the focus is a thread, returns its root event ID.
238    fn thread_root(&self) -> Option<&EventId> {
239        match self {
240            TimelineFocusKind::Event { paginator, .. } => {
241                paginator.get().and_then(|paginator| paginator.thread_root())
242            }
243            TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None,
244            TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
245        }
246    }
247}
248
249#[derive(Clone, Debug)]
250pub(super) struct TimelineController<P: RoomDataProvider = Room> {
251    /// Inner mutable state.
252    state: Arc<RwLock<TimelineState<P>>>,
253
254    /// Focus data.
255    focus: Arc<TimelineFocusKind<P>>,
256
257    /// A [`RoomDataProvider`] implementation, providing data.
258    ///
259    /// The type is a `RoomDataProvider` to allow testing. In the real world,
260    /// this would normally be a [`Room`].
261    pub(crate) room_data_provider: P,
262
263    /// Settings applied to this timeline.
264    pub(super) settings: TimelineSettings,
265}
266
267#[derive(Clone)]
268pub(super) struct TimelineSettings {
269    /// Should the read receipts and read markers be handled and on which event
270    /// types?
271    pub(super) track_read_receipts: TimelineReadReceiptTracking,
272
273    /// Event filter that controls what's rendered as a timeline item (and thus
274    /// what can carry read receipts).
275    pub(super) event_filter: Arc<TimelineEventFilterFn>,
276
277    /// Are unparsable events added as timeline items of their own kind?
278    pub(super) add_failed_to_parse: bool,
279
280    /// Should the timeline items be grouped by day or month?
281    pub(super) date_divider_mode: DateDividerMode,
282}
283
284#[cfg(not(tarpaulin_include))]
285impl fmt::Debug for TimelineSettings {
286    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
287        f.debug_struct("TimelineSettings")
288            .field("track_read_receipts", &self.track_read_receipts)
289            .field("add_failed_to_parse", &self.add_failed_to_parse)
290            .finish_non_exhaustive()
291    }
292}
293
294impl Default for TimelineSettings {
295    fn default() -> Self {
296        Self {
297            track_read_receipts: TimelineReadReceiptTracking::Disabled,
298            event_filter: Arc::new(default_event_filter),
299            add_failed_to_parse: true,
300            date_divider_mode: DateDividerMode::Daily,
301        }
302    }
303}
304
305/// The default event filter for
306/// [`crate::timeline::TimelineBuilder::event_filter`].
307///
308/// It filters out events that are not rendered by the timeline, including but
309/// not limited to: reactions, edits, redactions on existing messages.
310///
311/// If you have a custom filter, it may be best to chain yours with this one if
312/// you do not want to run into situations where a read receipt is not visible
313/// because it's living on an event that doesn't have a matching timeline item.
314pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
315    match event {
316        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
317            if ev.redacts(&rules.redaction).is_some() {
318                // This is a redaction of an existing message, we'll only update the previous
319                // message and not render a new entry.
320                false
321            } else {
322                // This is a redacted entry, that we'll show only if the redacted entity wasn't
323                // a reaction.
324                ev.event_type() != MessageLikeEventType::Reaction
325            }
326        }
327
328        AnySyncTimelineEvent::MessageLike(msg) => {
329            match msg.original_content() {
330                None => {
331                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
332                    // a reaction.
333                    msg.event_type() != MessageLikeEventType::Reaction
334                }
335
336                Some(original_content) => {
337                    match original_content {
338                        AnyMessageLikeEventContent::RoomMessage(content) => {
339                            if content
340                                .relates_to
341                                .as_ref()
342                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
343                            {
344                                // Edits aren't visible by default.
345                                return false;
346                            }
347
348                            match content.msgtype {
349                                MessageType::Audio(_)
350                                | MessageType::Emote(_)
351                                | MessageType::File(_)
352                                | MessageType::Image(_)
353                                | MessageType::Location(_)
354                                | MessageType::Notice(_)
355                                | MessageType::ServerNotice(_)
356                                | MessageType::Text(_)
357                                | MessageType::Video(_)
358                                | MessageType::VerificationRequest(_) => true,
359                                #[cfg(feature = "unstable-msc4274")]
360                                MessageType::Gallery(_) => true,
361                                _ => false,
362                            }
363                        }
364
365                        AnyMessageLikeEventContent::Sticker(_)
366                        | AnyMessageLikeEventContent::UnstablePollStart(
367                            UnstablePollStartEventContent::New(_),
368                        )
369                        | AnyMessageLikeEventContent::CallInvite(_)
370                        | AnyMessageLikeEventContent::RtcNotification(_)
371                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
372
373                        _ => false,
374                    }
375                }
376            }
377        }
378
379        AnySyncTimelineEvent::State(_) => {
380            // All the state events may get displayed by default.
381            true
382        }
383    }
384}
385
386impl<P: RoomDataProvider> TimelineController<P> {
387    pub(super) fn new(
388        room_data_provider: P,
389        focus: TimelineFocus,
390        internal_id_prefix: Option<String>,
391        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
392        is_room_encrypted: bool,
393        settings: TimelineSettings,
394    ) -> Self {
395        let focus = match focus {
396            TimelineFocus::Live { hide_threaded_events } => {
397                TimelineFocusKind::Live { hide_threaded_events }
398            }
399
400            TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },
401
402            TimelineFocus::Thread { root_event_id, .. } => {
403                TimelineFocusKind::Thread { root_event_id }
404            }
405
406            TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
407                TimelineFocusKind::PinnedEvents {
408                    loader: PinnedEventsLoader::new(
409                        Arc::new(room_data_provider.clone()),
410                        max_events_to_load as usize,
411                        max_concurrent_requests as usize,
412                    ),
413                }
414            }
415        };
416
417        let focus = Arc::new(focus);
418        let state = Arc::new(RwLock::new(TimelineState::new(
419            focus.clone(),
420            room_data_provider.own_user_id().to_owned(),
421            room_data_provider.room_version_rules(),
422            internal_id_prefix,
423            unable_to_decrypt_hook,
424            is_room_encrypted,
425        )));
426
427        Self { state, focus, room_data_provider, settings }
428    }
429
430    /// Initializes the configured focus with appropriate data.
431    ///
432    /// Should be called only once after creation of the [`TimelineInner`], with
433    /// all its fields set.
434    ///
435    /// Returns whether there were any events added to the timeline.
436    pub(super) async fn init_focus(
437        &self,
438        focus: &TimelineFocus,
439        room_event_cache: &RoomEventCache,
440    ) -> Result<bool, Error> {
441        match focus {
442            TimelineFocus::Live { .. } => {
443                // Retrieve the cached events, and add them to the timeline.
444                let events = room_event_cache.events().await?;
445
446                let has_events = !events.is_empty();
447
448                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
449
450                match room_event_cache.pagination().status().get() {
451                    RoomPaginationStatus::Idle { hit_timeline_start } => {
452                        if hit_timeline_start {
453                            // Eagerly insert the timeline start item, since pagination claims
454                            // we've already hit the timeline start.
455                            self.insert_timeline_start_if_missing().await;
456                        }
457                    }
458                    RoomPaginationStatus::Paginating => {}
459                }
460
461                Ok(has_events)
462            }
463
464            TimelineFocus::Event { target: event_id, num_context_events, hide_threaded_events } => {
465                let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
466                    // Note: this is sync'd with code in the ctor.
467                    unreachable!();
468                };
469
470                let event_paginator = Paginator::new(self.room_data_provider.clone());
471
472                // Start a /context request so we can know if the event is in a thread or not,
473                // and know which kind of pagination we'll be using then.
474                let start_from_result = event_paginator
475                    .start_from(event_id, (*num_context_events).into())
476                    .await
477                    .map_err(PaginationError::Paginator)?;
478
479                // Find the target event, and see if it's part of a thread.
480                let thread_root_event_id = start_from_result
481                    .events
482                    .iter()
483                    .find(
484                        |event| {
485                            if let Some(id) = event.event_id() { id == *event_id } else { false }
486                        },
487                    )
488                    .and_then(|event| extract_thread_root(event.raw()));
489
490                let _ = paginator.set(match thread_root_event_id {
491                    Some(root_id) => {
492                        let mut tokens = event_paginator.tokens();
493
494                        // Look if the thread root event is part of the /context response. This
495                        // allows us to spare some backwards pagination with
496                        // /relations.
497                        let includes_root_event = start_from_result.events.iter().any(|event| {
498                            if let Some(id) = event.event_id() { id == root_id } else { false }
499                        });
500
501                        if includes_root_event {
502                            // If we have the root event, there's no need to do back-paginations
503                            // with /relations, since we are at the start of the thread.
504                            tokens.previous = PaginationToken::HitEnd;
505                        }
506
507                        AnyPaginator::Threaded(ThreadedEventsLoader::new(
508                            self.room_data_provider.clone(),
509                            root_id,
510                            tokens,
511                        ))
512                    }
513
514                    None => AnyPaginator::Unthreaded {
515                        paginator: event_paginator,
516                        hide_threaded_events: *hide_threaded_events,
517                    },
518                });
519
520                let has_events = !start_from_result.events.is_empty();
521                let events = start_from_result.events;
522
523                match paginator.get().expect("Paginator was not instantiated") {
524                    AnyPaginator::Unthreaded { .. } => {
525                        self.replace_with_initial_remote_events(
526                            events,
527                            RemoteEventOrigin::Pagination,
528                        )
529                        .await;
530                    }
531
532                    AnyPaginator::Threaded(threaded_events_loader) => {
533                        // We filter only events that are part of the thread (including the root),
534                        // since /context will return adjacent events without filters.
535                        let thread_root = threaded_events_loader.thread_root_event_id();
536                        let events_in_thread = events.into_iter().filter(|event| {
537                            extract_thread_root(event.raw())
538                                .is_some_and(|event_thread_root| event_thread_root == thread_root)
539                                || event.event_id().as_deref() == Some(thread_root)
540                        });
541
542                        self.replace_with_initial_remote_events(
543                            events_in_thread,
544                            RemoteEventOrigin::Pagination,
545                        )
546                        .await;
547                    }
548                }
549
550                Ok(has_events)
551            }
552
553            TimelineFocus::Thread { root_event_id, .. } => {
554                let (events, _) =
555                    room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
556                let has_events = !events.is_empty();
557
558                // For each event, we also need to find the related events, as they don't
559                // include the thread relationship, they won't be included in
560                // the initial list of events.
561                let mut related_events = Vector::new();
562                for event_id in events.iter().filter_map(|event| event.event_id()) {
563                    if let Some((_original, related)) =
564                        room_event_cache.find_event_with_relations(&event_id, None).await?
565                    {
566                        related_events.extend(related);
567                    }
568                }
569
570                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
571
572                // Now that we've inserted the thread events, add the aggregations too.
573                if !related_events.is_empty() {
574                    self.handle_remote_aggregations(
575                        vec![VectorDiff::Append { values: related_events }],
576                        RemoteEventOrigin::Cache,
577                    )
578                    .await;
579                }
580
581                Ok(has_events)
582            }
583
584            TimelineFocus::PinnedEvents { .. } => {
585                let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
586                    // Note: this is sync'd with code in the ctor.
587                    unreachable!();
588                };
589
590                let Some(loaded_events) =
591                    loader.load_events().await.map_err(Error::PinnedEventsError)?
592                else {
593                    // There wasn't any events.
594                    return Ok(false);
595                };
596
597                let has_events = !loaded_events.is_empty();
598
599                self.replace_with_initial_remote_events(
600                    loaded_events,
601                    RemoteEventOrigin::Pagination,
602                )
603                .await;
604
605                Ok(has_events)
606            }
607        }
608    }
609
610    /// Listens to encryption state changes for the room in
611    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
612    /// existing timeline items. This will then cause a refresh of those
613    /// timeline items.
614    pub async fn handle_encryption_state_changes(&self) {
615        let mut room_info = self.room_data_provider.room_info();
616
617        // Small function helper to help mark as encrypted.
618        let mark_encrypted = || async {
619            let mut state = self.state.write().await;
620            state.meta.is_room_encrypted = true;
621            state.mark_all_events_as_encrypted();
622        };
623
624        if room_info.get().encryption_state().is_encrypted() {
625            // If the room was already encrypted, it won't toggle to unencrypted, so we can
626            // shut down this task early.
627            mark_encrypted().await;
628            return;
629        }
630
631        while let Some(info) = room_info.next().await {
632            if info.encryption_state().is_encrypted() {
633                mark_encrypted().await;
634                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
635                // here is done.
636                break;
637            }
638        }
639    }
640
641    pub(crate) async fn reload_pinned_events(
642        &self,
643    ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
644        if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
645            loader.load_events().await
646        } else {
647            Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
648        }
649    }
650
651    /// Run a lazy backwards pagination (in live mode).
652    ///
653    /// It adjusts the `count` value of the `Skip` higher-order stream so that
654    /// more items are pushed front in the timeline.
655    ///
656    /// If no more items are available (i.e. if the `count` is zero), this
657    /// method returns `Some(needs)` where `needs` is the number of events that
658    /// must be unlazily backwards paginated.
659    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
660        let state = self.state.read().await;
661
662        let (count, needs) = state
663            .meta
664            .subscriber_skip_count
665            .compute_next_when_paginating_backwards(num_events.into());
666
667        // This always happens on a live timeline.
668        let is_live_timeline = true;
669        state.meta.subscriber_skip_count.update(count, is_live_timeline);
670
671        needs
672    }
673
674    /// Run a backwards pagination (in focused mode) and append the results to
675    /// the timeline.
676    ///
677    /// Returns whether we hit the start of the timeline.
678    pub(super) async fn focused_paginate_backwards(
679        &self,
680        num_events: u16,
681    ) -> Result<bool, PaginationError> {
682        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
683            TimelineFocusKind::Live { .. }
684            | TimelineFocusKind::PinnedEvents { .. }
685            | TimelineFocusKind::Thread { .. } => {
686                return Err(PaginationError::NotSupported);
687            }
688            TimelineFocusKind::Event { paginator, .. } => paginator
689                .get()
690                .expect("Paginator was not instantiated")
691                .paginate_backwards(num_events)
692                .await
693                .map_err(PaginationError::Paginator)?,
694        };
695
696        // Events are in reverse topological order.
697        // We can push front each event individually.
698        self.handle_remote_events_with_diffs(
699            events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
700            RemoteEventOrigin::Pagination,
701        )
702        .await;
703
704        Ok(hit_end_of_timeline)
705    }
706
707    /// Run a forwards pagination (in focused mode) and append the results to
708    /// the timeline.
709    ///
710    /// Returns whether we hit the end of the timeline.
711    pub(super) async fn focused_paginate_forwards(
712        &self,
713        num_events: u16,
714    ) -> Result<bool, PaginationError> {
715        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
716            TimelineFocusKind::Live { .. }
717            | TimelineFocusKind::PinnedEvents { .. }
718            | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
719
720            TimelineFocusKind::Event { paginator, .. } => paginator
721                .get()
722                .expect("Paginator was not instantiated")
723                .paginate_forwards(num_events)
724                .await
725                .map_err(PaginationError::Paginator)?,
726        };
727
728        // Events are in topological order.
729        // We can append all events with no transformation.
730        self.handle_remote_events_with_diffs(
731            vec![VectorDiff::Append { values: events.into() }],
732            RemoteEventOrigin::Pagination,
733        )
734        .await;
735
736        Ok(hit_end_of_timeline)
737    }
738
739    /// Is this timeline receiving events from sync (aka has a live focus)?
740    pub(super) fn is_live(&self) -> bool {
741        matches!(&*self.focus, TimelineFocusKind::Live { .. })
742    }
743
744    /// Is this timeline focused on a thread?
745    pub(super) fn is_threaded(&self) -> bool {
746        self.focus.is_thread()
747    }
748
749    /// The root of the current thread, for a live thread timeline or a
750    /// permalink to a thread message.
751    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
752        self.focus.thread_root().map(ToOwned::to_owned)
753    }
754
755    /// Get a copy of the current items in the list.
756    ///
757    /// Cheap because `im::Vector` is cheap to clone.
758    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
759        self.state.read().await.items.clone_items()
760    }
761
762    #[cfg(test)]
763    pub(super) async fn subscribe_raw(
764        &self,
765    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
766        self.state.read().await.items.subscribe().into_values_and_stream()
767    }
768
769    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
770        let state = self.state.read().await;
771
772        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
773    }
774
775    pub(super) async fn subscribe_filter_map<U, F>(
776        &self,
777        f: F,
778    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
779    where
780        U: Clone,
781        F: Fn(Arc<TimelineItem>) -> Option<U>,
782    {
783        self.state.read().await.items.subscribe().filter_map(f)
784    }
785
786    /// Toggle a reaction locally.
787    ///
788    /// Returns true if the reaction was added, false if it was removed.
789    #[instrument(skip_all)]
790    pub(super) async fn toggle_reaction_local(
791        &self,
792        item_id: &TimelineEventItemId,
793        key: &str,
794    ) -> Result<bool, Error> {
795        let mut state = self.state.write().await;
796
797        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
798            warn!("Timeline item not found, can't add reaction");
799            return Err(Error::FailedToToggleReaction);
800        };
801
802        let user_id = self.room_data_provider.own_user_id();
803        let prev_status = item
804            .content()
805            .reactions()
806            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
807
808        let Some(prev_status) = prev_status else {
809            // Adding the new reaction.
810            match item.handle() {
811                TimelineItemHandle::Local(send_handle) => {
812                    if send_handle
813                        .react(key.to_owned())
814                        .await
815                        .map_err(|err| Error::SendQueueError(err.into()))?
816                        .is_some()
817                    {
818                        trace!("adding a reaction to a local echo");
819                        return Ok(true);
820                    }
821
822                    warn!("couldn't toggle reaction for local echo");
823                    return Ok(false);
824                }
825
826                TimelineItemHandle::Remote(event_id) => {
827                    // Add a reaction through the room data provider.
828                    // No need to reflect the effect locally, since the local echo handling will
829                    // take care of it.
830                    trace!("adding a reaction to a remote echo");
831                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
832                    self.room_data_provider
833                        .send(ReactionEventContent::from(annotation).into())
834                        .await?;
835                    return Ok(true);
836                }
837            }
838        };
839
840        trace!("removing a previous reaction");
841        match prev_status {
842            ReactionStatus::LocalToLocal(send_reaction_handle) => {
843                if let Some(handle) = send_reaction_handle {
844                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
845                        // Impossible state: the reaction has moved from local to echo under our
846                        // feet, but the timeline was supposed to be locked!
847                        warn!("unexpectedly unable to abort sending of local reaction");
848                    }
849                } else {
850                    warn!("no send reaction handle (this should only happen in testing contexts)");
851                }
852            }
853
854            ReactionStatus::LocalToRemote(send_handle) => {
855                // No need to reflect the change ourselves, since handling the discard of the
856                // local echo will take care of it.
857                trace!("aborting send of the previous reaction that was a local echo");
858                if let Some(handle) = send_handle {
859                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
860                        // Impossible state: the reaction has moved from local to echo under our
861                        // feet, but the timeline was supposed to be locked!
862                        warn!("unexpectedly unable to abort sending of local reaction");
863                    }
864                } else {
865                    warn!("no send handle (this should only happen in testing contexts)");
866                }
867            }
868
869            ReactionStatus::RemoteToRemote(event_id) => {
870                // Assume the redaction will work; we'll re-add the reaction if it didn't.
871                let Some(annotated_event_id) =
872                    item.as_remote().map(|event_item| event_item.event_id.clone())
873                else {
874                    warn!("remote reaction to remote event, but the associated item isn't remote");
875                    return Ok(false);
876                };
877
878                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
879                let reaction_info = reactions.remove_reaction(user_id, key);
880
881                if reaction_info.is_some() {
882                    let new_item = item.with_reactions(reactions);
883                    state.items.replace(item_pos, new_item);
884                } else {
885                    warn!(
886                        "reaction is missing on the item, not removing it locally, \
887                         but sending redaction."
888                    );
889                }
890
891                // Release the lock before running the request.
892                drop(state);
893
894                trace!("sending redact for a previous reaction");
895                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
896                    if let Some(reaction_info) = reaction_info {
897                        debug!("sending redact failed, adding the reaction back to the list");
898
899                        let mut state = self.state.write().await;
900                        if let Some((item_pos, item)) =
901                            rfind_event_by_id(&state.items, &annotated_event_id)
902                        {
903                            // Re-add the reaction to the mapping.
904                            let mut reactions =
905                                item.content().reactions().cloned().unwrap_or_default();
906                            reactions
907                                .entry(key.to_owned())
908                                .or_default()
909                                .insert(user_id.to_owned(), reaction_info);
910                            let new_item = item.with_reactions(reactions);
911                            state.items.replace(item_pos, new_item);
912                        } else {
913                            warn!(
914                                "couldn't find item to re-add reaction anymore; \
915                                 maybe it's been redacted?"
916                            );
917                        }
918                    }
919
920                    return Err(err);
921                }
922            }
923        }
924
925        Ok(false)
926    }
927
928    /// Handle updates on events as [`VectorDiff`]s.
929    pub(super) async fn handle_remote_events_with_diffs(
930        &self,
931        diffs: Vec<VectorDiff<TimelineEvent>>,
932        origin: RemoteEventOrigin,
933    ) {
934        if diffs.is_empty() {
935            return;
936        }
937
938        let mut state = self.state.write().await;
939        state
940            .handle_remote_events_with_diffs(
941                diffs,
942                origin,
943                &self.room_data_provider,
944                &self.settings,
945            )
946            .await
947    }
948
949    /// Only handle aggregations received as [`VectorDiff`]s.
950    pub(super) async fn handle_remote_aggregations(
951        &self,
952        diffs: Vec<VectorDiff<TimelineEvent>>,
953        origin: RemoteEventOrigin,
954    ) {
955        if diffs.is_empty() {
956            return;
957        }
958
959        let mut state = self.state.write().await;
960        state
961            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
962            .await
963    }
964
965    pub(super) async fn clear(&self) {
966        self.state.write().await.clear();
967    }
968
969    /// Replaces the content of the current timeline with initial events.
970    ///
971    /// Also sets up read receipts and the read marker for a live timeline of a
972    /// room.
973    ///
974    /// This is all done with a single lock guard, since we don't want the state
975    /// to be modified between the clear and re-insertion of new events.
976    pub(super) async fn replace_with_initial_remote_events<Events>(
977        &self,
978        events: Events,
979        origin: RemoteEventOrigin,
980    ) where
981        Events: IntoIterator,
982        <Events as IntoIterator>::Item: Into<TimelineEvent>,
983    {
984        let mut state = self.state.write().await;
985
986        let track_read_markers = &self.settings.track_read_receipts;
987        if track_read_markers.is_enabled() {
988            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
989            state
990                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
991                .await;
992        }
993
994        // Replace the events if either the current event list or the new one aren't
995        // empty.
996        // Previously we just had to check the new one wasn't empty because
997        // we did a clear operation before so the current one would always be empty, but
998        // now we may want to replace a populated timeline with an empty one.
999        let mut events = events.into_iter().peekable();
1000        if !state.items.is_empty() || events.peek().is_some() {
1001            state
1002                .replace_with_remote_events(
1003                    events,
1004                    origin,
1005                    &self.room_data_provider,
1006                    &self.settings,
1007                )
1008                .await;
1009        }
1010
1011        if track_read_markers.is_enabled() {
1012            if let Some(fully_read_event_id) =
1013                self.room_data_provider.load_fully_read_marker().await
1014            {
1015                state.handle_fully_read_marker(fully_read_event_id);
1016            } else if let Some(latest_receipt_event_id) = state
1017                .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
1018            {
1019                // Fall back to read receipt if no fully read marker exists.
1020                debug!("no `m.fully_read` marker found, falling back to read receipt");
1021                state.handle_fully_read_marker(latest_receipt_event_id);
1022            }
1023        }
1024    }
1025
1026    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
1027        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
1028    }
1029
1030    pub(super) async fn handle_ephemeral_events(
1031        &self,
1032        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1033    ) {
1034        let mut state = self.state.write().await;
1035        state.handle_ephemeral_events(events, &self.room_data_provider).await;
1036    }
1037
1038    /// Creates the local echo for an event we're sending.
1039    #[instrument(skip_all)]
1040    pub(super) async fn handle_local_event(
1041        &self,
1042        txn_id: OwnedTransactionId,
1043        content: AnyMessageLikeEventContent,
1044        send_handle: Option<SendHandle>,
1045    ) {
1046        let sender = self.room_data_provider.own_user_id().to_owned();
1047        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
1048
1049        let date_divider_mode = self.settings.date_divider_mode.clone();
1050
1051        let mut state = self.state.write().await;
1052        state
1053            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
1054            .await;
1055    }
1056
1057    /// Update the send state of a local event represented by a transaction ID.
1058    ///
1059    /// If the corresponding local timeline item is missing, a warning is
1060    /// raised.
1061    #[instrument(skip(self))]
1062    pub(super) async fn update_event_send_state(
1063        &self,
1064        txn_id: &TransactionId,
1065        send_state: EventSendState,
1066    ) {
1067        let mut state = self.state.write().await;
1068        let mut txn = state.transaction();
1069
1070        let new_event_id: Option<&EventId> =
1071            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
1072
1073        // The local echoes are always at the end of the timeline, we must first make
1074        // sure the remote echo hasn't showed up yet.
1075        if rfind_event_item(&txn.items, |it| {
1076            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
1077        })
1078        .is_some()
1079        {
1080            // Remote echo already received. This is very unlikely.
1081            trace!("Remote echo received before send-event response");
1082
1083            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
1084
1085            // If there's both the remote echo and a local echo, that means the
1086            // remote echo was received before the response *and* contained no
1087            // transaction ID (and thus duplicated the local echo).
1088            if let Some((idx, _)) = local_echo {
1089                warn!("Message echo got duplicated, removing the local one");
1090                txn.items.remove(idx);
1091
1092                // Adjust the date dividers, if needs be.
1093                let mut adjuster =
1094                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1095                adjuster.run(&mut txn.items, &mut txn.meta);
1096            }
1097
1098            txn.commit();
1099            return;
1100        }
1101
1102        // Look for the local event by the transaction ID or event ID.
1103        let result = rfind_event_item(&txn.items, |it| {
1104            it.transaction_id() == Some(txn_id)
1105                || new_event_id.is_some()
1106                    && it.event_id() == new_event_id
1107                    && it.as_local().is_some()
1108        });
1109
1110        let Some((idx, item)) = result else {
1111            // Event wasn't found as a standalone item.
1112            //
1113            // If it was just sent, try to find if it matches a corresponding aggregation,
1114            // and mark it as sent in that case.
1115            if let Some(new_event_id) = new_event_id {
1116                if txn.meta.aggregations.mark_aggregation_as_sent(
1117                    txn_id.to_owned(),
1118                    new_event_id.to_owned(),
1119                    &mut txn.items,
1120                    &txn.meta.room_version_rules,
1121                ) {
1122                    trace!("Aggregation marked as sent");
1123                    txn.commit();
1124                    return;
1125                }
1126
1127                trace!("Sent aggregation was not found");
1128            }
1129
1130            warn!("Timeline item not found, can't update send state");
1131            return;
1132        };
1133
1134        let Some(local_item) = item.as_local() else {
1135            warn!("We looked for a local item, but it transitioned to remote.");
1136            return;
1137        };
1138
1139        // The event was already marked as sent, that's a broken state, let's
1140        // emit an error but also override to the given sent state.
1141        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
1142            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
1143        }
1144
1145        // If the event has just been marked as sent, update the aggregations mapping to
1146        // take that into account.
1147        if let Some(new_event_id) = new_event_id {
1148            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
1149        }
1150
1151        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
1152        txn.items.replace(idx, new_item);
1153
1154        txn.commit();
1155    }
1156
1157    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
1158        let mut state = self.state.write().await;
1159
1160        if let Some((idx, _)) =
1161            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
1162        {
1163            let mut txn = state.transaction();
1164
1165            txn.items.remove(idx);
1166
1167            // A read marker or a date divider may have been inserted before the local echo.
1168            // Ensure both are up to date.
1169            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1170            adjuster.run(&mut txn.items, &mut txn.meta);
1171
1172            txn.meta.update_read_marker(&mut txn.items);
1173
1174            txn.commit();
1175
1176            debug!("discarded local echo");
1177            return true;
1178        }
1179
1180        // Avoid multiple mutable and immutable borrows of the lock guard by explicitly
1181        // dereferencing it once.
1182        let mut txn = state.transaction();
1183
1184        // Look if this was a local aggregation.
1185        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1186            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1187            &mut txn.items,
1188        ) {
1189            Ok(val) => val,
1190            Err(err) => {
1191                warn!("error when discarding local echo for an aggregation: {err}");
1192                // The aggregation has been found, it's just that we couldn't discard it.
1193                true
1194            }
1195        };
1196
1197        if found_aggregation {
1198            txn.commit();
1199        }
1200
1201        found_aggregation
1202    }
1203
1204    pub(super) async fn replace_local_echo(
1205        &self,
1206        txn_id: &TransactionId,
1207        content: AnyMessageLikeEventContent,
1208    ) -> bool {
1209        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1210            // Ideally, we'd support replacing local echoes for a reaction, etc., but
1211            // handling RoomMessage should be sufficient in most cases. Worst
1212            // case, the local echo will be sent Soonâ„¢ and we'll get another chance at
1213            // editing the event then.
1214            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1215            return false;
1216        };
1217
1218        let mut state = self.state.write().await;
1219        let mut txn = state.transaction();
1220
1221        let Some((idx, prev_item)) =
1222            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1223        else {
1224            debug!("Can't find local echo to replace");
1225            return false;
1226        };
1227
1228        // Reuse the previous local echo's state, but reset the send state to not sent
1229        // (per API contract).
1230        let ti_kind = {
1231            let Some(prev_local_item) = prev_item.as_local() else {
1232                warn!("We looked for a local item, but it transitioned as remote??");
1233                return false;
1234            };
1235            // If the local echo had an upload progress, retain it.
1236            let progress = as_variant!(&prev_local_item.send_state,
1237                EventSendState::NotSentYet { progress } => progress.clone())
1238            .flatten();
1239            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1240        };
1241
1242        // Replace the local-related state (kind) and the content state.
1243        let new_item = TimelineItem::new(
1244            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1245                content.msgtype,
1246                content.mentions,
1247                prev_item.content().reactions().cloned().unwrap_or_default(),
1248                prev_item.content().thread_root(),
1249                prev_item.content().in_reply_to(),
1250                prev_item.content().thread_summary(),
1251            )),
1252            prev_item.internal_id.to_owned(),
1253        );
1254
1255        txn.items.replace(idx, new_item);
1256
1257        // This doesn't change the original sending time, so there's no need to adjust
1258        // date dividers.
1259
1260        txn.commit();
1261
1262        debug!("Replaced local echo");
1263        true
1264    }
1265
1266    pub(super) async fn compute_redecryption_candidates(
1267        &self,
1268    ) -> (BTreeSet<String>, BTreeSet<String>) {
1269        let state = self.state.read().await;
1270        compute_redecryption_candidates(&state.items)
1271    }
1272
1273    pub(super) async fn set_sender_profiles_pending(&self) {
1274        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1275    }
1276
1277    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1278        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1279    }
1280
1281    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1282        self.state.write().await.items.for_each(|mut entry| {
1283            let Some(event_item) = entry.as_event() else { return };
1284            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1285                let new_item = entry.with_kind(TimelineItemKind::Event(
1286                    event_item.with_sender_profile(profile_state.clone()),
1287                ));
1288                ObservableItemsEntry::replace(&mut entry, new_item);
1289            }
1290        });
1291    }
1292
1293    pub(super) async fn update_missing_sender_profiles(&self) {
1294        trace!("Updating missing sender profiles");
1295
1296        let mut state = self.state.write().await;
1297        let mut entries = state.items.entries();
1298        while let Some(mut entry) = entries.next() {
1299            let Some(event_item) = entry.as_event() else { continue };
1300            let event_id = event_item.event_id().map(debug);
1301            let transaction_id = event_item.transaction_id().map(debug);
1302
1303            if event_item.sender_profile().is_ready() {
1304                trace!(event_id, transaction_id, "Profile already set");
1305                continue;
1306            }
1307
1308            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1309                Some(profile) => {
1310                    trace!(event_id, transaction_id, "Adding profile");
1311                    let updated_item =
1312                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
1313                    let new_item = entry.with_kind(updated_item);
1314                    ObservableItemsEntry::replace(&mut entry, new_item);
1315                }
1316                None => {
1317                    if !event_item.sender_profile().is_unavailable() {
1318                        trace!(event_id, transaction_id, "Marking profile unavailable");
1319                        let updated_item =
1320                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1321                        let new_item = entry.with_kind(updated_item);
1322                        ObservableItemsEntry::replace(&mut entry, new_item);
1323                    } else {
1324                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1325                    }
1326                }
1327            }
1328        }
1329
1330        trace!("Done updating missing sender profiles");
1331    }
1332
1333    /// Update the profiles of the given senders, even if they are ready.
1334    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1335        trace!("Forcing update of sender profiles: {sender_ids:?}");
1336
1337        let mut state = self.state.write().await;
1338        let mut entries = state.items.entries();
1339        while let Some(mut entry) = entries.next() {
1340            let Some(event_item) = entry.as_event() else { continue };
1341            if !sender_ids.contains(event_item.sender()) {
1342                continue;
1343            }
1344
1345            let event_id = event_item.event_id().map(debug);
1346            let transaction_id = event_item.transaction_id().map(debug);
1347
1348            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1349                Some(profile) => {
1350                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1351                    {
1352                        debug!(event_id, transaction_id, "Profile already up-to-date");
1353                    } else {
1354                        trace!(event_id, transaction_id, "Updating profile");
1355                        let updated_item =
1356                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
1357                        let new_item = entry.with_kind(updated_item);
1358                        ObservableItemsEntry::replace(&mut entry, new_item);
1359                    }
1360                }
1361                None => {
1362                    if !event_item.sender_profile().is_unavailable() {
1363                        trace!(event_id, transaction_id, "Marking profile unavailable");
1364                        let updated_item =
1365                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1366                        let new_item = entry.with_kind(updated_item);
1367                        ObservableItemsEntry::replace(&mut entry, new_item);
1368                    } else {
1369                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1370                    }
1371                }
1372            }
1373        }
1374
1375        trace!("Done forcing update of sender profiles");
1376    }
1377
1378    #[cfg(test)]
1379    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1380        let own_user_id = self.room_data_provider.own_user_id();
1381        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1382    }
1383
1384    /// Get the latest read receipt for the given user.
1385    ///
1386    /// Useful to get the latest read receipt, whether it's private or public.
1387    pub(super) async fn latest_user_read_receipt(
1388        &self,
1389        user_id: &UserId,
1390    ) -> Option<(OwnedEventId, Receipt)> {
1391        let receipt_thread = self.focus.receipt_thread();
1392
1393        self.state
1394            .read()
1395            .await
1396            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1397            .await
1398    }
1399
1400    /// Get the ID of the timeline event with the latest read receipt for the
1401    /// given user.
1402    pub(super) async fn latest_user_read_receipt_timeline_event_id(
1403        &self,
1404        user_id: &UserId,
1405    ) -> Option<OwnedEventId> {
1406        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1407    }
1408
1409    /// Subscribe to changes in the read receipts of our own user.
1410    pub async fn subscribe_own_user_read_receipts_changed(
1411        &self,
1412    ) -> impl Stream<Item = ()> + use<P> {
1413        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1414    }
1415
1416    /// Handle a room send update that's a new local echo.
1417    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1418        match echo.content {
1419            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1420                let content = match serialized_event.deserialize() {
1421                    Ok(d) => d,
1422                    Err(err) => {
1423                        warn!("error deserializing local echo: {err}");
1424                        return;
1425                    }
1426                };
1427
1428                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1429                    .await;
1430
1431                if let Some(send_error) = send_error {
1432                    self.update_event_send_state(
1433                        &echo.transaction_id,
1434                        EventSendState::SendingFailed {
1435                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1436                                send_error,
1437                            ))),
1438                            is_recoverable: false,
1439                        },
1440                    )
1441                    .await;
1442                }
1443            }
1444
1445            LocalEchoContent::React { key, send_handle, applies_to } => {
1446                self.handle_local_reaction(key, send_handle, applies_to).await;
1447            }
1448        }
1449    }
1450
1451    /// Adds a reaction (local echo) to a local echo.
1452    #[instrument(skip(self, send_handle))]
1453    async fn handle_local_reaction(
1454        &self,
1455        reaction_key: String,
1456        send_handle: SendReactionHandle,
1457        applies_to: OwnedTransactionId,
1458    ) {
1459        let mut state = self.state.write().await;
1460        let mut tr = state.transaction();
1461
1462        let target = TimelineEventItemId::TransactionId(applies_to);
1463
1464        let reaction_txn_id = send_handle.transaction_id().to_owned();
1465        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1466        let aggregation = Aggregation::new(
1467            TimelineEventItemId::TransactionId(reaction_txn_id),
1468            AggregationKind::Reaction {
1469                key: reaction_key.clone(),
1470                sender: self.room_data_provider.own_user_id().to_owned(),
1471                timestamp: MilliSecondsSinceUnixEpoch::now(),
1472                reaction_status,
1473            },
1474        );
1475
1476        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1477        find_item_and_apply_aggregation(
1478            &tr.meta.aggregations,
1479            &mut tr.items,
1480            &target,
1481            aggregation,
1482            &tr.meta.room_version_rules,
1483        );
1484
1485        tr.commit();
1486    }
1487
1488    /// Handle a single room send queue update.
1489    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1490        match update {
1491            RoomSendQueueUpdate::NewLocalEvent(echo) => {
1492                self.handle_local_echo(echo).await;
1493            }
1494
1495            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1496                if !self.discard_local_echo(&transaction_id).await {
1497                    warn!("couldn't find the local echo to discard");
1498                }
1499            }
1500
1501            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1502                let content = match new_content.deserialize() {
1503                    Ok(d) => d,
1504                    Err(err) => {
1505                        warn!("error deserializing local echo (upon edit): {err}");
1506                        return;
1507                    }
1508                };
1509
1510                if !self.replace_local_echo(&transaction_id, content).await {
1511                    warn!("couldn't find the local echo to replace");
1512                }
1513            }
1514
1515            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1516                self.update_event_send_state(
1517                    &transaction_id,
1518                    EventSendState::SendingFailed { error, is_recoverable },
1519                )
1520                .await;
1521            }
1522
1523            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1524                self.update_event_send_state(
1525                    &transaction_id,
1526                    EventSendState::NotSentYet { progress: None },
1527                )
1528                .await;
1529            }
1530
1531            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1532                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1533                    .await;
1534            }
1535
1536            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1537                self.update_event_send_state(
1538                    &related_to,
1539                    EventSendState::NotSentYet {
1540                        progress: Some(MediaUploadProgress { index, progress }),
1541                    },
1542                )
1543                .await;
1544            }
1545        }
1546    }
1547
1548    /// Insert a timeline start item at the beginning of the room, if it's
1549    /// missing.
1550    pub async fn insert_timeline_start_if_missing(&self) {
1551        let mut state = self.state.write().await;
1552        let mut txn = state.transaction();
1553        txn.items.push_timeline_start_if_missing(
1554            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1555        );
1556        txn.commit();
1557    }
1558
1559    /// Create a [`EmbeddedEvent`] from an arbitrary event, be it in the
1560    /// timeline or not.
1561    ///
1562    /// Can be `None` if the event cannot be represented as a standalone item,
1563    /// because it's an aggregation.
1564    pub(super) async fn make_replied_to(
1565        &self,
1566        event: TimelineEvent,
1567    ) -> Result<Option<EmbeddedEvent>, Error> {
1568        let state = self.state.read().await;
1569        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1570    }
1571}
1572
1573impl TimelineController {
1574    pub(super) fn room(&self) -> &Room {
1575        &self.room_data_provider
1576    }
1577
1578    /// Given an event identifier, will fetch the details for the event it's
1579    /// replying to, if applicable.
1580    #[instrument(skip(self))]
1581    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1582        let state_guard = self.state.write().await;
1583        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1584            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1585        let remote_item = item
1586            .as_remote()
1587            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1588            .clone();
1589
1590        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1591            debug!("Event is not a message");
1592            return Ok(());
1593        };
1594        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1595            debug!("Event is not a reply");
1596            return Ok(());
1597        };
1598        if let TimelineDetails::Pending = &in_reply_to.event {
1599            debug!("Replied-to event is already being fetched");
1600            return Ok(());
1601        }
1602        if let TimelineDetails::Ready(_) = &in_reply_to.event {
1603            debug!("Replied-to event has already been fetched");
1604            return Ok(());
1605        }
1606
1607        let internal_id = item.internal_id.to_owned();
1608        let item = item.clone();
1609        let event = fetch_replied_to_event(
1610            state_guard,
1611            &self.state,
1612            index,
1613            &item,
1614            internal_id,
1615            &msglike,
1616            &in_reply_to.event_id,
1617            self.room(),
1618        )
1619        .await?;
1620
1621        // We need to be sure to have the latest position of the event as it might have
1622        // changed while waiting for the request.
1623        let mut state = self.state.write().await;
1624        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1625            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1626
1627        // Check the state of the event again, it might have been redacted while
1628        // the request was in-flight.
1629        let TimelineItemContent::MsgLike(MsgLikeContent {
1630            kind: MsgLikeKind::Message(message),
1631            reactions,
1632            thread_root,
1633            in_reply_to,
1634            thread_summary,
1635        }) = item.content().clone()
1636        else {
1637            info!("Event is no longer a message (redacted?)");
1638            return Ok(());
1639        };
1640        let Some(in_reply_to) = in_reply_to else {
1641            warn!("Event no longer has a reply (bug?)");
1642            return Ok(());
1643        };
1644
1645        // Now that we've received the content of the replied-to event, replace the
1646        // replied-to content in the item with it.
1647        trace!("Updating in-reply-to details");
1648        let internal_id = item.internal_id.to_owned();
1649        let mut item = item.clone();
1650        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1651            kind: MsgLikeKind::Message(message),
1652            reactions,
1653            thread_root,
1654            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1655            thread_summary,
1656        }));
1657        state.items.replace(index, TimelineItem::new(item, internal_id));
1658
1659        Ok(())
1660    }
1661
1662    /// Returns the thread that should be used for a read receipt based on the
1663    /// current focus of the timeline and the receipt type.
1664    ///
1665    /// A `SendReceiptType::FullyRead` will always use
1666    /// `ReceiptThread::Unthreaded`
1667    pub(super) fn infer_thread_for_read_receipt(
1668        &self,
1669        receipt_type: &SendReceiptType,
1670    ) -> ReceiptThread {
1671        if matches!(receipt_type, SendReceiptType::FullyRead) {
1672            ReceiptThread::Unthreaded
1673        } else {
1674            self.focus.receipt_thread()
1675        }
1676    }
1677
1678    /// Check whether the given receipt should be sent.
1679    ///
1680    /// Returns `false` if the given receipt is older than the current one.
1681    pub(super) async fn should_send_receipt(
1682        &self,
1683        receipt_type: &SendReceiptType,
1684        receipt_thread: &ReceiptThread,
1685        event_id: &EventId,
1686    ) -> bool {
1687        let own_user_id = self.room().own_user_id();
1688        let state = self.state.read().await;
1689        let room = self.room();
1690
1691        match receipt_type {
1692            SendReceiptType::Read => {
1693                if let Some((old_pub_read, _)) = state
1694                    .meta
1695                    .user_receipt(
1696                        own_user_id,
1697                        ReceiptType::Read,
1698                        receipt_thread.clone(),
1699                        room,
1700                        state.items.all_remote_events(),
1701                    )
1702                    .await
1703                {
1704                    trace!(%old_pub_read, "found a previous public receipt");
1705                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1706                        &old_pub_read,
1707                        event_id,
1708                        state.items.all_remote_events(),
1709                    ) {
1710                        trace!(
1711                            "event referred to new receipt is {relative_pos:?} the previous receipt"
1712                        );
1713                        return relative_pos == RelativePosition::After;
1714                    }
1715                }
1716            }
1717
1718            // Implicit read receipts are saved as public read receipts, so get the latest. It also
1719            // doesn't make sense to have a private read receipt behind a public one.
1720            SendReceiptType::ReadPrivate => {
1721                if let Some((old_priv_read, _)) =
1722                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1723                {
1724                    trace!(%old_priv_read, "found a previous private receipt");
1725                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1726                        &old_priv_read,
1727                        event_id,
1728                        state.items.all_remote_events(),
1729                    ) {
1730                        trace!(
1731                            "event referred to new receipt is {relative_pos:?} the previous receipt"
1732                        );
1733                        return relative_pos == RelativePosition::After;
1734                    }
1735                }
1736            }
1737
1738            SendReceiptType::FullyRead => {
1739                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1740                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1741                        &prev_event_id,
1742                        event_id,
1743                        state.items.all_remote_events(),
1744                    )
1745                {
1746                    return relative_pos == RelativePosition::After;
1747                }
1748            }
1749
1750            _ => {}
1751        }
1752
1753        // Let the server handle unknown receipts.
1754        true
1755    }
1756
1757    /// Returns the latest event identifier, even if it's not visible, or if
1758    /// it's folded into another timeline item.
1759    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1760        let state = self.state.read().await;
1761        let filter_out_thread_events = match self.focus() {
1762            TimelineFocusKind::Thread { .. } => false,
1763            TimelineFocusKind::Live { hide_threaded_events } => hide_threaded_events.to_owned(),
1764            TimelineFocusKind::Event { paginator } => {
1765                paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
1766            }
1767            _ => true,
1768        };
1769
1770        // In some timelines, threaded events are added to the `AllRemoteEvents`
1771        // collection since they need to be taken into account to calculate read
1772        // receipts, but we don't want to actually take them into account for returning
1773        // the latest event id since they're not visibly in the timeline
1774        state
1775            .items
1776            .all_remote_events()
1777            .iter()
1778            .rev()
1779            .filter_map(|item| {
1780                if !filter_out_thread_events || item.thread_root_id.is_none() {
1781                    Some(item.event_id.clone())
1782                } else {
1783                    None
1784                }
1785            })
1786            .next()
1787    }
1788
1789    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1790    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1791        let (utds, decrypted) = self.compute_redecryption_candidates().await;
1792
1793        let request = DecryptionRetryRequest {
1794            room_id: self.room().room_id().to_owned(),
1795            utd_session_ids: utds,
1796            refresh_info_session_ids: decrypted,
1797        };
1798
1799        self.room().client().event_cache().request_decryption(request);
1800    }
1801
1802    /// Combine the global (event cache) pagination status with the local state
1803    /// of the timeline.
1804    ///
1805    /// This only changes the global pagination status of this room, in one
1806    /// case: if the timeline has a skip count greater than 0, it will
1807    /// ensure that the pagination status says that we haven't reached the
1808    /// timeline start yet.
1809    pub(super) async fn map_pagination_status(
1810        &self,
1811        status: RoomPaginationStatus,
1812    ) -> RoomPaginationStatus {
1813        match status {
1814            RoomPaginationStatus::Idle { hit_timeline_start } => {
1815                if hit_timeline_start {
1816                    let state = self.state.read().await;
1817                    // If the skip count is greater than 0, it means that a subsequent pagination
1818                    // could return more items, so pretend we didn't get the information that the
1819                    // timeline start was hit.
1820                    if state.meta.subscriber_skip_count.get() > 0 {
1821                        return RoomPaginationStatus::Idle { hit_timeline_start: false };
1822                    }
1823                }
1824            }
1825            RoomPaginationStatus::Paginating => {}
1826        }
1827
1828        // You're perfect, just the way you are.
1829        status
1830    }
1831}
1832
1833impl<P: RoomDataProvider> TimelineController<P> {
1834    /// Returns the timeline focus of the [`TimelineController`].
1835    pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
1836        &self.focus
1837    }
1838}
1839
1840#[allow(clippy::too_many_arguments)]
1841async fn fetch_replied_to_event<P: RoomDataProvider>(
1842    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1843    state_lock: &RwLock<TimelineState<P>>,
1844    index: usize,
1845    item: &EventTimelineItem,
1846    internal_id: TimelineUniqueId,
1847    msglike: &MsgLikeContent,
1848    in_reply_to: &EventId,
1849    room: &Room,
1850) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1851    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1852        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1853        trace!("Found replied-to event locally");
1854        return Ok(details);
1855    }
1856
1857    // Replace the item with a new timeline item that has the fetching status of the
1858    // replied-to event to pending.
1859    trace!("Setting in-reply-to details to pending");
1860    let in_reply_to_details =
1861        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1862
1863    let event_item = item
1864        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1865
1866    let new_timeline_item = TimelineItem::new(event_item, internal_id);
1867    state_guard.items.replace(index, new_timeline_item);
1868
1869    // Don't hold the state lock while the network request is made.
1870    drop(state_guard);
1871
1872    trace!("Fetching replied-to event");
1873    let res = match room.load_or_fetch_event(in_reply_to, None).await {
1874        Ok(timeline_event) => {
1875            let state = state_lock.read().await;
1876
1877            let replied_to_item =
1878                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1879
1880            if let Some(item) = replied_to_item {
1881                TimelineDetails::Ready(Box::new(item))
1882            } else {
1883                // The replied-to item is an aggregation, not a standalone item.
1884                return Err(Error::UnsupportedEvent);
1885            }
1886        }
1887
1888        Err(e) => TimelineDetails::Error(Arc::new(e)),
1889    };
1890
1891    Ok(res)
1892}