matrix_sdk_ui/timeline/controller/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use decryption_retry_task::DecryptionRetryTask;
19use eyeball_im::VectorDiff;
20use eyeball_im_util::vector::VectorObserverExt;
21use futures_core::Stream;
22use imbl::Vector;
23#[cfg(test)]
24use matrix_sdk::{crypto::OlmMachine, SendOutsideWasm};
25use matrix_sdk::{
26    deserialized_responses::TimelineEvent,
27    event_cache::{
28        paginator::{PaginationResult, Paginator},
29        RoomEventCache, RoomPaginationStatus,
30    },
31    send_queue::{
32        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
33    },
34    Result, Room,
35};
36use ruma::{
37    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38    events::{
39        poll::unstable_start::UnstablePollStartEventContent,
40        reaction::ReactionEventContent,
41        receipt::{Receipt, ReceiptThread, ReceiptType},
42        relation::Annotation,
43        room::message::{MessageType, Relation},
44        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
45        AnySyncTimelineEvent, MessageLikeEventType,
46    },
47    serde::Raw,
48    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId,
49    TransactionId, UserId,
50};
51#[cfg(test)]
52use ruma::{events::receipt::ReceiptEventContent, OwnedRoomId, RoomId};
53use tokio::sync::{RwLock, RwLockWriteGuard};
54use tracing::{debug, error, field::debug, info, instrument, trace, warn};
55
56pub(super) use self::{
57    metadata::{RelativePosition, TimelineMetadata},
58    observable_items::{
59        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
60        ObservableItemsTransactionEntry,
61    },
62    state::TimelineState,
63    state_transaction::TimelineStateTransaction,
64};
65use super::{
66    algorithms::{rfind_event_by_id, rfind_event_item},
67    event_item::{ReactionStatus, RemoteEventOrigin},
68    item::TimelineUniqueId,
69    subscriber::TimelineSubscriber,
70    threaded_events_loader::ThreadedEventsLoader,
71    traits::{Decryptor, RoomDataProvider},
72    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
73    PaginationError, Profile, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem,
74    TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
75};
76use crate::{
77    timeline::{
78        algorithms::rfind_event_by_item_id,
79        date_dividers::DateDividerAdjuster,
80        event_item::EventTimelineItemKind,
81        pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
82        MsgLikeContent, MsgLikeKind, TimelineEventFilterFn,
83    },
84    unable_to_decrypt_hook::UtdHookManager,
85};
86
87pub(in crate::timeline) mod aggregations;
88mod decryption_retry_task;
89mod metadata;
90mod observable_items;
91mod read_receipts;
92mod state;
93mod state_transaction;
94
95pub(super) use aggregations::*;
96
97/// Data associated to the current timeline focus.
98#[derive(Debug)]
99enum TimelineFocusData<P: RoomDataProvider> {
100    /// The timeline receives live events from the sync.
101    Live,
102
103    /// The timeline is focused on a single event, and it can expand in one
104    /// direction or another.
105    Event {
106        /// The event id we've started to focus on.
107        event_id: OwnedEventId,
108        /// The paginator instance.
109        paginator: Paginator<P>,
110        /// Number of context events to request for the first request.
111        num_context_events: u16,
112    },
113
114    Thread {
115        loader: ThreadedEventsLoader<P>,
116
117        /// Number of relations events to requests for the first request
118        num_events: u16,
119    },
120
121    PinnedEvents {
122        loader: PinnedEventsLoader,
123    },
124}
125
126#[derive(Clone, Debug)]
127pub(super) struct TimelineController<P: RoomDataProvider = Room, D: Decryptor = Room> {
128    /// Inner mutable state.
129    state: Arc<RwLock<TimelineState>>,
130
131    /// Inner mutable focus state.
132    focus: Arc<RwLock<TimelineFocusData<P>>>,
133
134    /// A [`RoomDataProvider`] implementation, providing data.
135    ///
136    /// Useful for testing only; in the real world, it's just a [`Room`].
137    pub(crate) room_data_provider: P,
138
139    /// Settings applied to this timeline.
140    pub(super) settings: TimelineSettings,
141
142    /// Long-running task used to retry decryption of timeline items without
143    /// blocking main processing.
144    decryption_retry_task: DecryptionRetryTask<D>,
145}
146
147#[derive(Clone)]
148pub(super) struct TimelineSettings {
149    /// Should the read receipts and read markers be handled?
150    pub(super) track_read_receipts: bool,
151
152    /// Event filter that controls what's rendered as a timeline item (and thus
153    /// what can carry read receipts).
154    pub(super) event_filter: Arc<TimelineEventFilterFn>,
155
156    /// Are unparsable events added as timeline items of their own kind?
157    pub(super) add_failed_to_parse: bool,
158
159    /// Should the timeline items be grouped by day or month?
160    pub(super) date_divider_mode: DateDividerMode,
161}
162
163#[cfg(not(tarpaulin_include))]
164impl fmt::Debug for TimelineSettings {
165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166        f.debug_struct("TimelineSettings")
167            .field("track_read_receipts", &self.track_read_receipts)
168            .field("add_failed_to_parse", &self.add_failed_to_parse)
169            .finish_non_exhaustive()
170    }
171}
172
173impl Default for TimelineSettings {
174    fn default() -> Self {
175        Self {
176            track_read_receipts: false,
177            event_filter: Arc::new(default_event_filter),
178            add_failed_to_parse: true,
179            date_divider_mode: DateDividerMode::Daily,
180        }
181    }
182}
183
184#[derive(Debug, Clone)]
185pub(super) enum TimelineFocusKind {
186    Live { hide_threaded_events: bool },
187    Event { hide_threaded_events: bool },
188    Thread { root_event_id: OwnedEventId },
189    PinnedEvents,
190}
191
192/// The default event filter for
193/// [`crate::timeline::TimelineBuilder::event_filter`].
194///
195/// It filters out events that are not rendered by the timeline, including but
196/// not limited to: reactions, edits, redactions on existing messages.
197///
198/// If you have a custom filter, it may be best to chain yours with this one if
199/// you do not want to run into situations where a read receipt is not visible
200/// because it's living on an event that doesn't have a matching timeline item.
201pub fn default_event_filter(event: &AnySyncTimelineEvent, room_version: &RoomVersionId) -> bool {
202    match event {
203        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
204            if ev.redacts(room_version).is_some() {
205                // This is a redaction of an existing message, we'll only update the previous
206                // message and not render a new entry.
207                false
208            } else {
209                // This is a redacted entry, that we'll show only if the redacted entity wasn't
210                // a reaction.
211                ev.event_type() != MessageLikeEventType::Reaction
212            }
213        }
214
215        AnySyncTimelineEvent::MessageLike(msg) => {
216            match msg.original_content() {
217                None => {
218                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
219                    // a reaction.
220                    msg.event_type() != MessageLikeEventType::Reaction
221                }
222
223                Some(original_content) => {
224                    match original_content {
225                        AnyMessageLikeEventContent::RoomMessage(content) => {
226                            if content
227                                .relates_to
228                                .as_ref()
229                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
230                            {
231                                // Edits aren't visible by default.
232                                return false;
233                            }
234
235                            match content.msgtype {
236                                MessageType::Audio(_)
237                                | MessageType::Emote(_)
238                                | MessageType::File(_)
239                                | MessageType::Image(_)
240                                | MessageType::Location(_)
241                                | MessageType::Notice(_)
242                                | MessageType::ServerNotice(_)
243                                | MessageType::Text(_)
244                                | MessageType::Video(_)
245                                | MessageType::VerificationRequest(_) => true,
246                                #[cfg(feature = "unstable-msc4274")]
247                                MessageType::Gallery(_) => true,
248                                _ => false,
249                            }
250                        }
251
252                        AnyMessageLikeEventContent::Sticker(_)
253                        | AnyMessageLikeEventContent::UnstablePollStart(
254                            UnstablePollStartEventContent::New(_),
255                        )
256                        | AnyMessageLikeEventContent::CallInvite(_)
257                        | AnyMessageLikeEventContent::CallNotify(_)
258                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
259
260                        _ => false,
261                    }
262                }
263            }
264        }
265
266        AnySyncTimelineEvent::State(_) => {
267            // All the state events may get displayed by default.
268            true
269        }
270    }
271}
272
273impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
274    pub(super) fn new(
275        room_data_provider: P,
276        focus: TimelineFocus,
277        internal_id_prefix: Option<String>,
278        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
279        is_room_encrypted: bool,
280    ) -> Self {
281        let (focus_data, focus_kind) = match focus {
282            TimelineFocus::Live { hide_threaded_events } => {
283                (TimelineFocusData::Live, TimelineFocusKind::Live { hide_threaded_events })
284            }
285
286            TimelineFocus::Event { target, num_context_events, hide_threaded_events } => {
287                let paginator = Paginator::new(room_data_provider.clone());
288                (
289                    TimelineFocusData::Event { paginator, event_id: target, num_context_events },
290                    TimelineFocusKind::Event { hide_threaded_events },
291                )
292            }
293
294            TimelineFocus::Thread { root_event_id, num_events } => (
295                TimelineFocusData::Thread {
296                    loader: ThreadedEventsLoader::new(
297                        room_data_provider.clone(),
298                        root_event_id.clone(),
299                    ),
300                    num_events,
301                },
302                TimelineFocusKind::Thread { root_event_id },
303            ),
304
305            TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => (
306                TimelineFocusData::PinnedEvents {
307                    loader: PinnedEventsLoader::new(
308                        Arc::new(room_data_provider.clone()),
309                        max_events_to_load as usize,
310                        max_concurrent_requests as usize,
311                    ),
312                },
313                TimelineFocusKind::PinnedEvents,
314            ),
315        };
316
317        let state = Arc::new(RwLock::new(TimelineState::new(
318            focus_kind,
319            room_data_provider.own_user_id().to_owned(),
320            room_data_provider.room_version(),
321            internal_id_prefix,
322            unable_to_decrypt_hook,
323            is_room_encrypted,
324        )));
325
326        let settings = TimelineSettings::default();
327
328        let decryption_retry_task =
329            DecryptionRetryTask::new(state.clone(), room_data_provider.clone());
330
331        Self {
332            state,
333            focus: Arc::new(RwLock::new(focus_data)),
334            room_data_provider,
335            settings,
336            decryption_retry_task,
337        }
338    }
339
340    /// Initializes the configured focus with appropriate data.
341    ///
342    /// Should be called only once after creation of the [`TimelineInner`], with
343    /// all its fields set.
344    ///
345    /// Returns whether there were any events added to the timeline.
346    pub(super) async fn init_focus(
347        &self,
348        room_event_cache: &RoomEventCache,
349    ) -> Result<bool, Error> {
350        let focus_guard = self.focus.read().await;
351
352        match &*focus_guard {
353            TimelineFocusData::Live => {
354                // Retrieve the cached events, and add them to the timeline.
355                let events = room_event_cache.events().await;
356
357                let has_events = !events.is_empty();
358
359                self.replace_with_initial_remote_events(
360                    events.into_iter(),
361                    RemoteEventOrigin::Cache,
362                )
363                .await;
364
365                match room_event_cache.pagination().status().get() {
366                    RoomPaginationStatus::Idle { hit_timeline_start } => {
367                        if hit_timeline_start {
368                            // Eagerly insert the timeline start item, since pagination claims
369                            // we've already hit the timeline start.
370                            self.insert_timeline_start_if_missing().await;
371                        }
372                    }
373                    RoomPaginationStatus::Paginating => {}
374                }
375
376                Ok(has_events)
377            }
378
379            TimelineFocusData::Event { event_id, paginator, num_context_events } => {
380                // Start a /context request, and append the results (in order) to the timeline.
381                let start_from_result = paginator
382                    .start_from(event_id, (*num_context_events).into())
383                    .await
384                    .map_err(PaginationError::Paginator)?;
385
386                drop(focus_guard);
387
388                let has_events = !start_from_result.events.is_empty();
389
390                self.replace_with_initial_remote_events(
391                    start_from_result.events.into_iter(),
392                    RemoteEventOrigin::Pagination,
393                )
394                .await;
395
396                Ok(has_events)
397            }
398
399            TimelineFocusData::Thread { loader, num_events } => {
400                let result = loader
401                    .paginate_backwards((*num_events).into())
402                    .await
403                    .map_err(PaginationError::Paginator)?;
404
405                drop(focus_guard);
406
407                // Events are in reverse topological order.
408                self.replace_with_initial_remote_events(
409                    result.events.into_iter().rev(),
410                    RemoteEventOrigin::Pagination,
411                )
412                .await;
413
414                Ok(true)
415            }
416
417            TimelineFocusData::PinnedEvents { loader } => {
418                let Some(loaded_events) =
419                    loader.load_events().await.map_err(Error::PinnedEventsError)?
420                else {
421                    // There wasn't any events.
422                    return Ok(false);
423                };
424
425                drop(focus_guard);
426
427                let has_events = !loaded_events.is_empty();
428
429                self.replace_with_initial_remote_events(
430                    loaded_events.into_iter(),
431                    RemoteEventOrigin::Pagination,
432                )
433                .await;
434
435                Ok(has_events)
436            }
437        }
438    }
439
440    /// Listens to encryption state changes for the room in
441    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
442    /// existing timeline items. This will then cause a refresh of those
443    /// timeline items.
444    pub async fn handle_encryption_state_changes(&self) {
445        let mut room_info = self.room_data_provider.room_info();
446
447        // Small function helper to help mark as encrypted.
448        let mark_encrypted = || async {
449            let mut state = self.state.write().await;
450            state.meta.is_room_encrypted = true;
451            state.mark_all_events_as_encrypted();
452        };
453
454        if room_info.get().encryption_state().is_encrypted() {
455            // If the room was already encrypted, it won't toggle to unencrypted, so we can
456            // shut down this task early.
457            mark_encrypted().await;
458            return;
459        }
460
461        while let Some(info) = room_info.next().await {
462            if info.encryption_state().is_encrypted() {
463                mark_encrypted().await;
464                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
465                // here is done.
466                break;
467            }
468        }
469    }
470
471    pub(crate) async fn reload_pinned_events(
472        &self,
473    ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
474        let focus_guard = self.focus.read().await;
475
476        if let TimelineFocusData::PinnedEvents { loader } = &*focus_guard {
477            loader.load_events().await
478        } else {
479            Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
480        }
481    }
482
483    /// Run a lazy backwards pagination (in live mode).
484    ///
485    /// It adjusts the `count` value of the `Skip` higher-order stream so that
486    /// more items are pushed front in the timeline.
487    ///
488    /// If no more items are available (i.e. if the `count` is zero), this
489    /// method returns `Some(needs)` where `needs` is the number of events that
490    /// must be unlazily backwards paginated.
491    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
492        let state = self.state.read().await;
493
494        let (count, needs) = state
495            .meta
496            .subscriber_skip_count
497            .compute_next_when_paginating_backwards(num_events.into());
498
499        state.meta.subscriber_skip_count.update(count, &state.timeline_focus);
500
501        needs
502    }
503
504    /// Run a backwards pagination (in focused mode) and append the results to
505    /// the timeline.
506    ///
507    /// Returns whether we hit the start of the timeline.
508    pub(super) async fn focused_paginate_backwards(
509        &self,
510        num_events: u16,
511    ) -> Result<bool, PaginationError> {
512        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
513            TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
514                return Err(PaginationError::NotSupported)
515            }
516            TimelineFocusData::Event { paginator, .. } => paginator
517                .paginate_backward(num_events.into())
518                .await
519                .map_err(PaginationError::Paginator)?,
520            TimelineFocusData::Thread { loader, num_events } => loader
521                .paginate_backwards((*num_events).into())
522                .await
523                .map_err(PaginationError::Paginator)?,
524        };
525
526        // Events are in reverse topological order.
527        // We can push front each event individually.
528        self.handle_remote_events_with_diffs(
529            events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
530            RemoteEventOrigin::Pagination,
531        )
532        .await;
533
534        Ok(hit_end_of_timeline)
535    }
536
537    /// Run a forwards pagination (in focused mode) and append the results to
538    /// the timeline.
539    ///
540    /// Returns whether we hit the end of the timeline.
541    pub(super) async fn focused_paginate_forwards(
542        &self,
543        num_events: u16,
544    ) -> Result<bool, PaginationError> {
545        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
546            TimelineFocusData::Live
547            | TimelineFocusData::PinnedEvents { .. }
548            | TimelineFocusData::Thread { .. } => return Err(PaginationError::NotSupported),
549
550            TimelineFocusData::Event { paginator, .. } => paginator
551                .paginate_forward(num_events.into())
552                .await
553                .map_err(PaginationError::Paginator)?,
554        };
555
556        // Events are in topological order.
557        // We can append all events with no transformation.
558        self.handle_remote_events_with_diffs(
559            vec![VectorDiff::Append { values: events.into() }],
560            RemoteEventOrigin::Pagination,
561        )
562        .await;
563
564        Ok(hit_end_of_timeline)
565    }
566
567    /// Is this timeline receiving events from sync (aka has a live focus)?
568    pub(super) async fn is_live(&self) -> bool {
569        matches!(&*self.focus.read().await, TimelineFocusData::Live)
570    }
571
572    pub(super) fn with_settings(mut self, settings: TimelineSettings) -> Self {
573        self.settings = settings;
574        self
575    }
576
577    /// Get a copy of the current items in the list.
578    ///
579    /// Cheap because `im::Vector` is cheap to clone.
580    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
581        self.state.read().await.items.clone_items()
582    }
583
584    #[cfg(test)]
585    pub(super) async fn subscribe_raw(
586        &self,
587    ) -> (
588        Vector<Arc<TimelineItem>>,
589        impl Stream<Item = VectorDiff<Arc<TimelineItem>>> + SendOutsideWasm,
590    ) {
591        let state = self.state.read().await;
592
593        state.items.subscribe().into_values_and_stream()
594    }
595
596    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
597        let state = self.state.read().await;
598
599        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
600    }
601
602    pub(super) async fn subscribe_filter_map<U, F>(
603        &self,
604        f: F,
605    ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>)
606    where
607        U: Clone,
608        F: Fn(Arc<TimelineItem>) -> Option<U>,
609    {
610        self.state.read().await.items.subscribe().filter_map(f)
611    }
612
613    /// Toggle a reaction locally.
614    ///
615    /// Returns true if the reaction was added, false if it was removed.
616    #[instrument(skip_all)]
617    pub(super) async fn toggle_reaction_local(
618        &self,
619        item_id: &TimelineEventItemId,
620        key: &str,
621    ) -> Result<bool, Error> {
622        let mut state = self.state.write().await;
623
624        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
625            warn!("Timeline item not found, can't add reaction");
626            return Err(Error::FailedToToggleReaction);
627        };
628
629        let user_id = self.room_data_provider.own_user_id();
630        let prev_status = item
631            .content()
632            .reactions()
633            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
634
635        let Some(prev_status) = prev_status else {
636            match &item.kind {
637                EventTimelineItemKind::Local(local) => {
638                    if let Some(send_handle) = &local.send_handle {
639                        if send_handle
640                            .react(key.to_owned())
641                            .await
642                            .map_err(|err| Error::SendQueueError(err.into()))?
643                            .is_some()
644                        {
645                            trace!("adding a reaction to a local echo");
646                            return Ok(true);
647                        }
648
649                        warn!("couldn't toggle reaction for local echo");
650                        return Ok(false);
651                    }
652
653                    warn!("missing send handle for local echo; is this a test?");
654                    return Ok(false);
655                }
656
657                EventTimelineItemKind::Remote(remote) => {
658                    // Add a reaction through the room data provider.
659                    // No need to reflect the effect locally, since the local echo handling will
660                    // take care of it.
661                    trace!("adding a reaction to a remote echo");
662                    let annotation = Annotation::new(remote.event_id.to_owned(), key.to_owned());
663                    self.room_data_provider
664                        .send(ReactionEventContent::from(annotation).into())
665                        .await?;
666                    return Ok(true);
667                }
668            }
669        };
670
671        trace!("removing a previous reaction");
672        match prev_status {
673            ReactionStatus::LocalToLocal(send_reaction_handle) => {
674                if let Some(handle) = send_reaction_handle {
675                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
676                        // Impossible state: the reaction has moved from local to echo under our
677                        // feet, but the timeline was supposed to be locked!
678                        warn!("unexpectedly unable to abort sending of local reaction");
679                    }
680                } else {
681                    warn!("no send reaction handle (this should only happen in testing contexts)");
682                }
683            }
684
685            ReactionStatus::LocalToRemote(send_handle) => {
686                // No need to reflect the change ourselves, since handling the discard of the
687                // local echo will take care of it.
688                trace!("aborting send of the previous reaction that was a local echo");
689                if let Some(handle) = send_handle {
690                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
691                        // Impossible state: the reaction has moved from local to echo under our
692                        // feet, but the timeline was supposed to be locked!
693                        warn!("unexpectedly unable to abort sending of local reaction");
694                    }
695                } else {
696                    warn!("no send handle (this should only happen in testing contexts)");
697                }
698            }
699
700            ReactionStatus::RemoteToRemote(event_id) => {
701                // Assume the redaction will work; we'll re-add the reaction if it didn't.
702                let Some(annotated_event_id) =
703                    item.as_remote().map(|event_item| event_item.event_id.clone())
704                else {
705                    warn!("remote reaction to remote event, but the associated item isn't remote");
706                    return Ok(false);
707                };
708
709                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
710                let reaction_info = reactions.remove_reaction(user_id, key);
711
712                if reaction_info.is_some() {
713                    let new_item = item.with_reactions(reactions);
714                    state.items.replace(item_pos, new_item);
715                } else {
716                    warn!("reaction is missing on the item, not removing it locally, but sending redaction.");
717                }
718
719                // Release the lock before running the request.
720                drop(state);
721
722                trace!("sending redact for a previous reaction");
723                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
724                    if let Some(reaction_info) = reaction_info {
725                        debug!("sending redact failed, adding the reaction back to the list");
726
727                        let mut state = self.state.write().await;
728                        if let Some((item_pos, item)) =
729                            rfind_event_by_id(&state.items, &annotated_event_id)
730                        {
731                            // Re-add the reaction to the mapping.
732                            let mut reactions =
733                                item.content().reactions().cloned().unwrap_or_default();
734                            reactions
735                                .entry(key.to_owned())
736                                .or_default()
737                                .insert(user_id.to_owned(), reaction_info);
738                            let new_item = item.with_reactions(reactions);
739                            state.items.replace(item_pos, new_item);
740                        } else {
741                            warn!("couldn't find item to re-add reaction anymore; maybe it's been redacted?");
742                        }
743                    }
744
745                    return Err(err);
746                }
747            }
748        }
749
750        Ok(false)
751    }
752
753    /// Handle updates on events as [`VectorDiff`]s.
754    pub(super) async fn handle_remote_events_with_diffs(
755        &self,
756        diffs: Vec<VectorDiff<TimelineEvent>>,
757        origin: RemoteEventOrigin,
758    ) {
759        if diffs.is_empty() {
760            return;
761        }
762
763        let mut state = self.state.write().await;
764        state
765            .handle_remote_events_with_diffs(
766                diffs,
767                origin,
768                &self.room_data_provider,
769                &self.settings,
770            )
771            .await
772    }
773
774    /// Only handle aggregations received as [`VectorDiff`]s.
775    pub(super) async fn handle_remote_aggregations(
776        &self,
777        diffs: Vec<VectorDiff<TimelineEvent>>,
778        origin: RemoteEventOrigin,
779    ) {
780        if diffs.is_empty() {
781            return;
782        }
783
784        let mut state = self.state.write().await;
785        state
786            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
787            .await
788    }
789
790    pub(super) async fn clear(&self) {
791        self.state.write().await.clear();
792    }
793
794    /// Replaces the content of the current timeline with initial events.
795    ///
796    /// Also sets up read receipts and the read marker for a live timeline of a
797    /// room.
798    ///
799    /// This is all done with a single lock guard, since we don't want the state
800    /// to be modified between the clear and re-insertion of new events.
801    pub(super) async fn replace_with_initial_remote_events<Events>(
802        &self,
803        events: Events,
804        origin: RemoteEventOrigin,
805    ) where
806        Events: IntoIterator + ExactSizeIterator,
807        <Events as IntoIterator>::Item: Into<TimelineEvent>,
808    {
809        let mut state = self.state.write().await;
810
811        let track_read_markers = self.settings.track_read_receipts;
812        if track_read_markers {
813            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
814            state
815                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
816                .await;
817        }
818
819        // Replace the events if either the current event list or the new one aren't
820        // empty.
821        // Previously we just had to check the new one wasn't empty because
822        // we did a clear operation before so the current one would always be empty, but
823        // now we may want to replace a populated timeline with an empty one.
824        if !state.items.is_empty() || events.len() > 0 {
825            state
826                .replace_with_remote_events(
827                    events,
828                    origin,
829                    &self.room_data_provider,
830                    &self.settings,
831                )
832                .await;
833        }
834
835        if track_read_markers {
836            if let Some(fully_read_event_id) =
837                self.room_data_provider.load_fully_read_marker().await
838            {
839                state.handle_fully_read_marker(fully_read_event_id);
840            }
841        }
842    }
843
844    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
845        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
846    }
847
848    pub(super) async fn handle_ephemeral_events(
849        &self,
850        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
851    ) {
852        let mut state = self.state.write().await;
853        state.handle_ephemeral_events(events, &self.room_data_provider).await;
854    }
855
856    /// Creates the local echo for an event we're sending.
857    #[instrument(skip_all)]
858    pub(super) async fn handle_local_event(
859        &self,
860        txn_id: OwnedTransactionId,
861        content: AnyMessageLikeEventContent,
862        send_handle: Option<SendHandle>,
863    ) {
864        let sender = self.room_data_provider.own_user_id().to_owned();
865        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
866
867        let date_divider_mode = self.settings.date_divider_mode.clone();
868
869        let mut state = self.state.write().await;
870        state
871            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
872            .await;
873    }
874
875    /// Update the send state of a local event represented by a transaction ID.
876    ///
877    /// If the corresponding local timeline item is missing, a warning is
878    /// raised.
879    #[instrument(skip(self))]
880    pub(super) async fn update_event_send_state(
881        &self,
882        txn_id: &TransactionId,
883        send_state: EventSendState,
884    ) {
885        let mut state = self.state.write().await;
886        let mut txn = state.transaction();
887
888        let new_event_id: Option<&EventId> =
889            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
890
891        // The local echoes are always at the end of the timeline, we must first make
892        // sure the remote echo hasn't showed up yet.
893        if rfind_event_item(&txn.items, |it| {
894            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
895        })
896        .is_some()
897        {
898            // Remote echo already received. This is very unlikely.
899            trace!("Remote echo received before send-event response");
900
901            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
902
903            // If there's both the remote echo and a local echo, that means the
904            // remote echo was received before the response *and* contained no
905            // transaction ID (and thus duplicated the local echo).
906            if let Some((idx, _)) = local_echo {
907                warn!("Message echo got duplicated, removing the local one");
908                txn.items.remove(idx);
909
910                // Adjust the date dividers, if needs be.
911                let mut adjuster =
912                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
913                adjuster.run(&mut txn.items, &mut txn.meta);
914            }
915
916            txn.commit();
917            return;
918        }
919
920        // Look for the local event by the transaction ID or event ID.
921        let result = rfind_event_item(&txn.items, |it| {
922            it.transaction_id() == Some(txn_id)
923                || new_event_id.is_some()
924                    && it.event_id() == new_event_id
925                    && it.as_local().is_some()
926        });
927
928        let Some((idx, item)) = result else {
929            // Event wasn't found as a standalone item.
930            //
931            // If it was just sent, try to find if it matches a corresponding aggregation,
932            // and mark it as sent in that case.
933            if let Some(new_event_id) = new_event_id {
934                if txn.meta.aggregations.mark_aggregation_as_sent(
935                    txn_id.to_owned(),
936                    new_event_id.to_owned(),
937                    &mut txn.items,
938                    &txn.meta.room_version,
939                ) {
940                    trace!("Aggregation marked as sent");
941                    txn.commit();
942                    return;
943                }
944
945                trace!("Sent aggregation was not found");
946            }
947
948            warn!("Timeline item not found, can't update send state");
949            return;
950        };
951
952        let Some(local_item) = item.as_local() else {
953            warn!("We looked for a local item, but it transitioned to remote.");
954            return;
955        };
956
957        // The event was already marked as sent, that's a broken state, let's
958        // emit an error but also override to the given sent state.
959        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
960            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
961        }
962
963        // If the event has just been marked as sent, update the aggregations mapping to
964        // take that into account.
965        if let Some(new_event_id) = new_event_id {
966            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
967        }
968
969        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
970        txn.items.replace(idx, new_item);
971
972        txn.commit();
973    }
974
975    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
976        let mut state = self.state.write().await;
977
978        if let Some((idx, _)) =
979            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
980        {
981            let mut txn = state.transaction();
982
983            txn.items.remove(idx);
984
985            // A read marker or a date divider may have been inserted before the local echo.
986            // Ensure both are up to date.
987            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
988            adjuster.run(&mut txn.items, &mut txn.meta);
989
990            txn.meta.update_read_marker(&mut txn.items);
991
992            txn.commit();
993
994            debug!("discarded local echo");
995            return true;
996        }
997
998        // Avoid multiple mutable and immutable borrows of the lock guard by explicitly
999        // dereferencing it once.
1000        let mut txn = state.transaction();
1001
1002        // Look if this was a local aggregation.
1003        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1004            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1005            &mut txn.items,
1006        ) {
1007            Ok(val) => val,
1008            Err(err) => {
1009                warn!("error when discarding local echo for an aggregation: {err}");
1010                // The aggregation has been found, it's just that we couldn't discard it.
1011                true
1012            }
1013        };
1014
1015        if found_aggregation {
1016            txn.commit();
1017        }
1018
1019        found_aggregation
1020    }
1021
1022    pub(super) async fn replace_local_echo(
1023        &self,
1024        txn_id: &TransactionId,
1025        content: AnyMessageLikeEventContent,
1026    ) -> bool {
1027        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1028            // Ideally, we'd support replacing local echoes for a reaction, etc., but
1029            // handling RoomMessage should be sufficient in most cases. Worst
1030            // case, the local echo will be sent Soonâ„¢ and we'll get another chance at
1031            // editing the event then.
1032            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1033            return false;
1034        };
1035
1036        let mut state = self.state.write().await;
1037        let mut txn = state.transaction();
1038
1039        let Some((idx, prev_item)) =
1040            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1041        else {
1042            debug!("Can't find local echo to replace");
1043            return false;
1044        };
1045
1046        // Reuse the previous local echo's state, but reset the send state to not sent
1047        // (per API contract).
1048        let ti_kind = {
1049            let Some(prev_local_item) = prev_item.as_local() else {
1050                warn!("We looked for a local item, but it transitioned as remote??");
1051                return false;
1052            };
1053            prev_local_item.with_send_state(EventSendState::NotSentYet)
1054        };
1055
1056        // Replace the local-related state (kind) and the content state.
1057        let new_item = TimelineItem::new(
1058            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1059                content.msgtype,
1060                content.mentions,
1061                prev_item.content().reactions().cloned().unwrap_or_default(),
1062                prev_item.content().thread_root(),
1063                prev_item.content().in_reply_to(),
1064                prev_item.content().thread_summary(),
1065            )),
1066            prev_item.internal_id.to_owned(),
1067        );
1068
1069        txn.items.replace(idx, new_item);
1070
1071        // This doesn't change the original sending time, so there's no need to adjust
1072        // date dividers.
1073
1074        txn.commit();
1075
1076        debug!("Replaced local echo");
1077        true
1078    }
1079
1080    async fn retry_event_decryption_inner(
1081        &self,
1082        decryptor: D,
1083        session_ids: Option<BTreeSet<String>>,
1084    ) {
1085        self.decryption_retry_task.decrypt(decryptor, session_ids, self.settings.clone()).await;
1086    }
1087
1088    pub(super) async fn set_sender_profiles_pending(&self) {
1089        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1090    }
1091
1092    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1093        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1094    }
1095
1096    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1097        self.state.write().await.items.for_each(|mut entry| {
1098            let Some(event_item) = entry.as_event() else { return };
1099            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1100                let new_item = entry.with_kind(TimelineItemKind::Event(
1101                    event_item.with_sender_profile(profile_state.clone()),
1102                ));
1103                ObservableItemsEntry::replace(&mut entry, new_item);
1104            }
1105        });
1106    }
1107
1108    pub(super) async fn update_missing_sender_profiles(&self) {
1109        trace!("Updating missing sender profiles");
1110
1111        let mut state = self.state.write().await;
1112        let mut entries = state.items.entries();
1113        while let Some(mut entry) = entries.next() {
1114            let Some(event_item) = entry.as_event() else { continue };
1115            let event_id = event_item.event_id().map(debug);
1116            let transaction_id = event_item.transaction_id().map(debug);
1117
1118            if event_item.sender_profile().is_ready() {
1119                trace!(event_id, transaction_id, "Profile already set");
1120                continue;
1121            }
1122
1123            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1124                Some(profile) => {
1125                    trace!(event_id, transaction_id, "Adding profile");
1126                    let updated_item =
1127                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
1128                    let new_item = entry.with_kind(updated_item);
1129                    ObservableItemsEntry::replace(&mut entry, new_item);
1130                }
1131                None => {
1132                    if !event_item.sender_profile().is_unavailable() {
1133                        trace!(event_id, transaction_id, "Marking profile unavailable");
1134                        let updated_item =
1135                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1136                        let new_item = entry.with_kind(updated_item);
1137                        ObservableItemsEntry::replace(&mut entry, new_item);
1138                    } else {
1139                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1140                    }
1141                }
1142            }
1143        }
1144
1145        trace!("Done updating missing sender profiles");
1146    }
1147
1148    /// Update the profiles of the given senders, even if they are ready.
1149    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1150        trace!("Forcing update of sender profiles: {sender_ids:?}");
1151
1152        let mut state = self.state.write().await;
1153        let mut entries = state.items.entries();
1154        while let Some(mut entry) = entries.next() {
1155            let Some(event_item) = entry.as_event() else { continue };
1156            if !sender_ids.contains(event_item.sender()) {
1157                continue;
1158            }
1159
1160            let event_id = event_item.event_id().map(debug);
1161            let transaction_id = event_item.transaction_id().map(debug);
1162
1163            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1164                Some(profile) => {
1165                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1166                    {
1167                        debug!(event_id, transaction_id, "Profile already up-to-date");
1168                    } else {
1169                        trace!(event_id, transaction_id, "Updating profile");
1170                        let updated_item =
1171                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
1172                        let new_item = entry.with_kind(updated_item);
1173                        ObservableItemsEntry::replace(&mut entry, new_item);
1174                    }
1175                }
1176                None => {
1177                    if !event_item.sender_profile().is_unavailable() {
1178                        trace!(event_id, transaction_id, "Marking profile unavailable");
1179                        let updated_item =
1180                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1181                        let new_item = entry.with_kind(updated_item);
1182                        ObservableItemsEntry::replace(&mut entry, new_item);
1183                    } else {
1184                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1185                    }
1186                }
1187            }
1188        }
1189
1190        trace!("Done forcing update of sender profiles");
1191    }
1192
1193    #[cfg(test)]
1194    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1195        let own_user_id = self.room_data_provider.own_user_id();
1196        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1197    }
1198
1199    /// Get the latest read receipt for the given user.
1200    ///
1201    /// Useful to get the latest read receipt, whether it's private or public.
1202    pub(super) async fn latest_user_read_receipt(
1203        &self,
1204        user_id: &UserId,
1205    ) -> Option<(OwnedEventId, Receipt)> {
1206        self.state.read().await.latest_user_read_receipt(user_id, &self.room_data_provider).await
1207    }
1208
1209    /// Get the ID of the timeline event with the latest read receipt for the
1210    /// given user.
1211    pub(super) async fn latest_user_read_receipt_timeline_event_id(
1212        &self,
1213        user_id: &UserId,
1214    ) -> Option<OwnedEventId> {
1215        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1216    }
1217
1218    /// Subscribe to changes in the read receipts of our own user.
1219    pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
1220        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1221    }
1222
1223    /// Handle a room send update that's a new local echo.
1224    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1225        match echo.content {
1226            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1227                let content = match serialized_event.deserialize() {
1228                    Ok(d) => d,
1229                    Err(err) => {
1230                        warn!("error deserializing local echo: {err}");
1231                        return;
1232                    }
1233                };
1234
1235                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1236                    .await;
1237
1238                if let Some(send_error) = send_error {
1239                    self.update_event_send_state(
1240                        &echo.transaction_id,
1241                        EventSendState::SendingFailed {
1242                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1243                                send_error,
1244                            ))),
1245                            is_recoverable: false,
1246                        },
1247                    )
1248                    .await;
1249                }
1250            }
1251
1252            LocalEchoContent::React { key, send_handle, applies_to } => {
1253                self.handle_local_reaction(key, send_handle, applies_to).await;
1254            }
1255        }
1256    }
1257
1258    /// Adds a reaction (local echo) to a local echo.
1259    #[instrument(skip(self, send_handle))]
1260    async fn handle_local_reaction(
1261        &self,
1262        reaction_key: String,
1263        send_handle: SendReactionHandle,
1264        applies_to: OwnedTransactionId,
1265    ) {
1266        let mut state = self.state.write().await;
1267        let mut tr = state.transaction();
1268
1269        let target = TimelineEventItemId::TransactionId(applies_to);
1270
1271        let reaction_txn_id = send_handle.transaction_id().to_owned();
1272        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1273        let aggregation = Aggregation::new(
1274            TimelineEventItemId::TransactionId(reaction_txn_id),
1275            AggregationKind::Reaction {
1276                key: reaction_key.clone(),
1277                sender: self.room_data_provider.own_user_id().to_owned(),
1278                timestamp: MilliSecondsSinceUnixEpoch::now(),
1279                reaction_status,
1280            },
1281        );
1282
1283        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1284        find_item_and_apply_aggregation(
1285            &tr.meta.aggregations,
1286            &mut tr.items,
1287            &target,
1288            aggregation,
1289            &tr.meta.room_version,
1290        );
1291
1292        tr.commit();
1293    }
1294
1295    /// Handle a single room send queue update.
1296    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1297        match update {
1298            RoomSendQueueUpdate::NewLocalEvent(echo) => {
1299                self.handle_local_echo(echo).await;
1300            }
1301
1302            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1303                if !self.discard_local_echo(&transaction_id).await {
1304                    warn!("couldn't find the local echo to discard");
1305                }
1306            }
1307
1308            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1309                let content = match new_content.deserialize() {
1310                    Ok(d) => d,
1311                    Err(err) => {
1312                        warn!("error deserializing local echo (upon edit): {err}");
1313                        return;
1314                    }
1315                };
1316
1317                if !self.replace_local_echo(&transaction_id, content).await {
1318                    warn!("couldn't find the local echo to replace");
1319                }
1320            }
1321
1322            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1323                self.update_event_send_state(
1324                    &transaction_id,
1325                    EventSendState::SendingFailed { error, is_recoverable },
1326                )
1327                .await;
1328            }
1329
1330            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1331                self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await;
1332            }
1333
1334            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1335                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1336                    .await;
1337            }
1338
1339            RoomSendQueueUpdate::UploadedMedia { related_to, .. } => {
1340                // TODO(bnjbvr): Do something else?
1341                info!(txn_id = %related_to, "some media for a media event has been uploaded");
1342            }
1343        }
1344    }
1345
1346    /// Insert a timeline start item at the beginning of the room, if it's
1347    /// missing.
1348    pub async fn insert_timeline_start_if_missing(&self) {
1349        let mut state = self.state.write().await;
1350        let mut txn = state.transaction();
1351        txn.items.push_timeline_start_if_missing(
1352            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1353        );
1354        txn.commit();
1355    }
1356
1357    /// Create a [`EmbeddedEvent`] from an arbitrary event, be it in the
1358    /// timeline or not.
1359    ///
1360    /// Can be `None` if the event cannot be represented as a standalone item,
1361    /// because it's an aggregation.
1362    pub(super) async fn make_replied_to(
1363        &self,
1364        event: TimelineEvent,
1365    ) -> Result<Option<EmbeddedEvent>, Error> {
1366        let state = self.state.read().await;
1367        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1368    }
1369}
1370
1371impl TimelineController {
1372    pub(super) fn room(&self) -> &Room {
1373        &self.room_data_provider
1374    }
1375
1376    /// Given an event identifier, will fetch the details for the event it's
1377    /// replying to, if applicable.
1378    #[instrument(skip(self))]
1379    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1380        let state_guard = self.state.write().await;
1381        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1382            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1383        let remote_item = item
1384            .as_remote()
1385            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1386            .clone();
1387
1388        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1389            debug!("Event is not a message");
1390            return Ok(());
1391        };
1392        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1393            debug!("Event is not a reply");
1394            return Ok(());
1395        };
1396        if let TimelineDetails::Pending = &in_reply_to.event {
1397            debug!("Replied-to event is already being fetched");
1398            return Ok(());
1399        }
1400        if let TimelineDetails::Ready(_) = &in_reply_to.event {
1401            debug!("Replied-to event has already been fetched");
1402            return Ok(());
1403        }
1404
1405        let internal_id = item.internal_id.to_owned();
1406        let item = item.clone();
1407        let event = fetch_replied_to_event(
1408            state_guard,
1409            &self.state,
1410            index,
1411            &item,
1412            internal_id,
1413            &msglike,
1414            &in_reply_to.event_id,
1415            self.room(),
1416        )
1417        .await?;
1418
1419        // We need to be sure to have the latest position of the event as it might have
1420        // changed while waiting for the request.
1421        let mut state = self.state.write().await;
1422        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1423            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1424
1425        // Check the state of the event again, it might have been redacted while
1426        // the request was in-flight.
1427        let TimelineItemContent::MsgLike(MsgLikeContent {
1428            kind: MsgLikeKind::Message(message),
1429            reactions,
1430            thread_root,
1431            in_reply_to,
1432            thread_summary,
1433        }) = item.content().clone()
1434        else {
1435            info!("Event is no longer a message (redacted?)");
1436            return Ok(());
1437        };
1438        let Some(in_reply_to) = in_reply_to else {
1439            warn!("Event no longer has a reply (bug?)");
1440            return Ok(());
1441        };
1442
1443        // Now that we've received the content of the replied-to event, replace the
1444        // replied-to content in the item with it.
1445        trace!("Updating in-reply-to details");
1446        let internal_id = item.internal_id.to_owned();
1447        let mut item = item.clone();
1448        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1449            kind: MsgLikeKind::Message(message),
1450            reactions,
1451            thread_root,
1452            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1453            thread_summary,
1454        }));
1455        state.items.replace(index, TimelineItem::new(item, internal_id));
1456
1457        Ok(())
1458    }
1459
1460    /// Check whether the given receipt should be sent.
1461    ///
1462    /// Returns `false` if the given receipt is older than the current one.
1463    pub(super) async fn should_send_receipt(
1464        &self,
1465        receipt_type: &SendReceiptType,
1466        thread: &ReceiptThread,
1467        event_id: &EventId,
1468    ) -> bool {
1469        // We don't support threaded receipts yet.
1470        if *thread != ReceiptThread::Unthreaded {
1471            return true;
1472        }
1473
1474        let own_user_id = self.room().own_user_id();
1475        let state = self.state.read().await;
1476        let room = self.room();
1477
1478        match receipt_type {
1479            SendReceiptType::Read => {
1480                if let Some((old_pub_read, _)) = state
1481                    .meta
1482                    .user_receipt(
1483                        own_user_id,
1484                        ReceiptType::Read,
1485                        room,
1486                        state.items.all_remote_events(),
1487                    )
1488                    .await
1489                {
1490                    trace!(%old_pub_read, "found a previous public receipt");
1491                    if let Some(relative_pos) = state.meta.compare_events_positions(
1492                        &old_pub_read,
1493                        event_id,
1494                        state.items.all_remote_events(),
1495                    ) {
1496                        trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1497                        return relative_pos == RelativePosition::After;
1498                    }
1499                }
1500            }
1501            // Implicit read receipts are saved as public read receipts, so get the latest. It also
1502            // doesn't make sense to have a private read receipt behind a public one.
1503            SendReceiptType::ReadPrivate => {
1504                if let Some((old_priv_read, _)) =
1505                    state.latest_user_read_receipt(own_user_id, room).await
1506                {
1507                    trace!(%old_priv_read, "found a previous private receipt");
1508                    if let Some(relative_pos) = state.meta.compare_events_positions(
1509                        &old_priv_read,
1510                        event_id,
1511                        state.items.all_remote_events(),
1512                    ) {
1513                        trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1514                        return relative_pos == RelativePosition::After;
1515                    }
1516                }
1517            }
1518            SendReceiptType::FullyRead => {
1519                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1520                {
1521                    if let Some(relative_pos) = state.meta.compare_events_positions(
1522                        &prev_event_id,
1523                        event_id,
1524                        state.items.all_remote_events(),
1525                    ) {
1526                        return relative_pos == RelativePosition::After;
1527                    }
1528                }
1529            }
1530            _ => {}
1531        }
1532
1533        // Let the server handle unknown receipts.
1534        true
1535    }
1536
1537    /// Returns the latest event identifier, even if it's not visible, or if
1538    /// it's folded into another timeline item.
1539    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1540        let state = self.state.read().await;
1541        state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1542    }
1543
1544    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1545    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1546        self.retry_event_decryption_inner(self.room().clone(), session_ids).await
1547    }
1548}
1549
1550#[cfg(test)]
1551impl<P: RoomDataProvider> TimelineController<P, (OlmMachine, OwnedRoomId)> {
1552    pub(super) async fn retry_event_decryption_test(
1553        &self,
1554        room_id: &RoomId,
1555        olm_machine: OlmMachine,
1556        session_ids: Option<BTreeSet<String>>,
1557    ) {
1558        self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
1559    }
1560}
1561
1562#[allow(clippy::too_many_arguments)]
1563async fn fetch_replied_to_event(
1564    mut state_guard: RwLockWriteGuard<'_, TimelineState>,
1565    state_lock: &RwLock<TimelineState>,
1566    index: usize,
1567    item: &EventTimelineItem,
1568    internal_id: TimelineUniqueId,
1569    msglike: &MsgLikeContent,
1570    in_reply_to: &EventId,
1571    room: &Room,
1572) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1573    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1574        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1575        trace!("Found replied-to event locally");
1576        return Ok(details);
1577    };
1578
1579    // Replace the item with a new timeline item that has the fetching status of the
1580    // replied-to event to pending.
1581    trace!("Setting in-reply-to details to pending");
1582    let in_reply_to_details =
1583        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1584
1585    let event_item = item
1586        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1587
1588    let new_timeline_item = TimelineItem::new(event_item, internal_id);
1589    state_guard.items.replace(index, new_timeline_item);
1590
1591    // Don't hold the state lock while the network request is made.
1592    drop(state_guard);
1593
1594    trace!("Fetching replied-to event");
1595    let res = match room.load_or_fetch_event(in_reply_to, None).await {
1596        Ok(timeline_event) => {
1597            let state = state_lock.read().await;
1598
1599            let replied_to_item =
1600                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1601
1602            if let Some(item) = replied_to_item {
1603                TimelineDetails::Ready(Box::new(item))
1604            } else {
1605                // The replied-to item is an aggregation, not a standalone item.
1606                return Err(Error::UnsupportedEvent);
1607            }
1608        }
1609
1610        Err(e) => TimelineDetails::Error(Arc::new(e)),
1611    };
1612
1613    Ok(res)
1614}