Skip to main content

matrix_sdk_ui/timeline/controller/
state_transaction.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use eyeball_im::VectorDiff;
18use itertools::Itertools as _;
19use matrix_sdk::deserialized_responses::{
20    ThreadSummary as SdkThreadSummary, ThreadSummaryStatus, TimelineEvent, TimelineEventKind,
21    UnsignedEventLocation,
22};
23use ruma::{
24    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, UserId,
25    events::{
26        AnySyncTimelineEvent,
27        receipt::{ReceiptThread, ReceiptType},
28    },
29    push::Action,
30    serde::Raw,
31};
32use tracing::{debug, instrument, trace, warn};
33
34use super::{
35    super::{
36        controller::ObservableItemsTransactionEntry,
37        date_dividers::DateDividerAdjuster,
38        event_handler::{Flow, TimelineEventContext, TimelineEventHandler, TimelineItemPosition},
39        event_item::RemoteEventOrigin,
40        traits::RoomDataProvider,
41    },
42    ObservableItems, ObservableItemsTransaction, TimelineMetadata, TimelineReadReceiptTracking,
43    TimelineSettings,
44    metadata::EventMeta,
45};
46use crate::timeline::{
47    EmbeddedEvent, Profile, ThreadSummary, TimelineDetails, TimelineUniqueId, VirtualTimelineItem,
48    controller::TimelineFocusKind,
49    event_handler::{FailedToParseEvent, RemovedItem, TimelineAction},
50};
51
52pub(in crate::timeline) struct TimelineStateTransaction<'a, P: RoomDataProvider> {
53    /// A vector transaction over the items themselves. Holds temporary state
54    /// until committed.
55    pub items: ObservableItemsTransaction<'a>,
56
57    /// Number of items when the transaction has been created/has started.
58    number_of_items_when_transaction_started: usize,
59
60    /// A clone of the previous meta, that we're operating on during the
61    /// transaction, and that will be committed to the previous meta location in
62    /// [`Self::commit`].
63    pub meta: TimelineMetadata,
64
65    /// Pointer to the previous meta, only used during [`Self::commit`].
66    previous_meta: &'a mut TimelineMetadata,
67
68    /// The kind of focus of this timeline.
69    pub focus: &'a TimelineFocusKind,
70
71    /// Phantom data for type parameter.
72    _phantom: std::marker::PhantomData<P>,
73}
74
75impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> {
76    /// Create a new [`TimelineStateTransaction`].
77    pub(super) fn new(
78        items: &'a mut ObservableItems,
79        meta: &'a mut TimelineMetadata,
80        focus: &'a TimelineFocusKind,
81    ) -> Self {
82        let previous_meta = meta;
83        let meta = previous_meta.clone();
84        let items = items.transaction();
85
86        Self {
87            number_of_items_when_transaction_started: items.len(),
88            items,
89            previous_meta,
90            meta,
91            focus,
92            _phantom: std::marker::PhantomData,
93        }
94    }
95
96    /// Handle updates on events as [`VectorDiff`]s.
97    pub(super) async fn handle_remote_events_with_diffs(
98        &mut self,
99        diffs: Vec<VectorDiff<TimelineEvent>>,
100        origin: RemoteEventOrigin,
101        room_data_provider: &P,
102        settings: &TimelineSettings,
103    ) {
104        let mut date_divider_adjuster =
105            DateDividerAdjuster::new(settings.date_divider_mode.clone());
106
107        let mut cached_profiles: HashMap<OwnedUserId, Option<Profile>> = HashMap::new();
108
109        let mut recycled_timeline_ids = HashMap::new();
110
111        for diff in diffs {
112            match diff {
113                VectorDiff::Append { values: events } => {
114                    for event in events {
115                        let recycled_timeline_id = event
116                            .event_id()
117                            .and_then(|event_id| recycled_timeline_ids.remove(&event_id));
118                        self.handle_remote_event(
119                            event,
120                            TimelineItemPosition::End { origin },
121                            room_data_provider,
122                            settings,
123                            &mut date_divider_adjuster,
124                            &mut cached_profiles,
125                            recycled_timeline_id,
126                        )
127                        .await;
128                    }
129                }
130
131                VectorDiff::PushFront { value: event } => {
132                    let recycled_timeline_id = event
133                        .event_id()
134                        .and_then(|event_id| recycled_timeline_ids.remove(&event_id));
135                    self.handle_remote_event(
136                        event,
137                        TimelineItemPosition::Start { origin },
138                        room_data_provider,
139                        settings,
140                        &mut date_divider_adjuster,
141                        &mut cached_profiles,
142                        recycled_timeline_id,
143                    )
144                    .await;
145                }
146
147                VectorDiff::PushBack { value: event } => {
148                    let recycled_timeline_id = event
149                        .event_id()
150                        .and_then(|event_id| recycled_timeline_ids.remove(&event_id));
151                    self.handle_remote_event(
152                        event,
153                        TimelineItemPosition::End { origin },
154                        room_data_provider,
155                        settings,
156                        &mut date_divider_adjuster,
157                        &mut cached_profiles,
158                        recycled_timeline_id,
159                    )
160                    .await;
161                }
162
163                VectorDiff::Insert { index: event_index, value: event } => {
164                    let recycled_timeline_id = event
165                        .event_id()
166                        .and_then(|event_id| recycled_timeline_ids.remove(&event_id));
167                    self.handle_remote_event(
168                        event,
169                        TimelineItemPosition::At { event_index, origin },
170                        room_data_provider,
171                        settings,
172                        &mut date_divider_adjuster,
173                        &mut cached_profiles,
174                        recycled_timeline_id,
175                    )
176                    .await;
177                }
178
179                VectorDiff::Set { index: event_index, value: event } => {
180                    if let Some(timeline_item_index) = self
181                        .items
182                        .all_remote_events()
183                        .get(event_index)
184                        .and_then(|meta| meta.timeline_item_index)
185                    {
186                        self.handle_remote_event(
187                            event,
188                            TimelineItemPosition::UpdateAt { timeline_item_index },
189                            room_data_provider,
190                            settings,
191                            &mut date_divider_adjuster,
192                            &mut cached_profiles,
193                            None,
194                        )
195                        .await;
196                    } else {
197                        warn!(
198                            event_index,
199                            "Set update dropped because there wasn't any attached timeline item index."
200                        );
201                    }
202                }
203
204                VectorDiff::Remove { index: event_index } => {
205                    if let Some((timeline_id, event_id)) =
206                        self.remove_timeline_item(event_index, &mut date_divider_adjuster)
207                    {
208                        recycled_timeline_ids.insert(event_id, timeline_id);
209                    }
210                }
211
212                VectorDiff::Clear => {
213                    self.clear();
214                }
215
216                v => unimplemented!("{v:?}"),
217            }
218        }
219
220        self.adjust_date_dividers(date_divider_adjuster);
221        self.check_invariants();
222    }
223
224    async fn handle_remote_aggregation(
225        &mut self,
226        event: TimelineEvent,
227        position: TimelineItemPosition,
228        room_data_provider: &P,
229        date_divider_adjuster: &mut DateDividerAdjuster,
230    ) {
231        let raw_event = event.raw();
232
233        let deserialized = match raw_event.deserialize() {
234            Ok(deserialized) => deserialized,
235            Err(err) => {
236                warn!("Failed to deserialize timeline event: {err}");
237                return;
238            }
239        };
240
241        let sender = deserialized.sender().to_owned();
242        let timestamp = deserialized.origin_server_ts();
243        let event_id = deserialized.event_id().to_owned();
244        let txn_id = deserialized.transaction_id().map(ToOwned::to_owned);
245
246        let timeline_action = TimelineAction::from_event(
247            deserialized,
248            raw_event,
249            room_data_provider,
250            None,
251            None,
252            None,
253            None,
254        )
255        .await;
256
257        match timeline_action {
258            Some(action @ TimelineAction::AddItem { .. })
259            | Some(action @ TimelineAction::HandleAggregation { .. }) => {
260                let encryption_info = event.kind.encryption_info().cloned();
261                let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
262
263                let (forwarder, forwarder_profile) =
264                    get_forwarder_info(&event, room_data_provider).await;
265
266                let mut ctx = TimelineEventContext {
267                    sender,
268                    sender_profile,
269                    forwarder,
270                    forwarder_profile,
271                    timestamp,
272                    // These are not used when handling an aggregation.
273                    read_receipts: Default::default(),
274                    is_highlighted: false,
275                    flow: Flow::Remote {
276                        event_id: event_id.clone(),
277                        raw_event: event.raw().clone(),
278                        encryption_info,
279                        txn_id,
280                        position,
281                    },
282                    // This field is not used when handling an aggregation.
283                    should_add_new_items: false,
284                };
285
286                // FIXME: Continuation of the hackjob to get UTDs for focused timelines
287                // working from `handle_remote_aggregations()`.
288                if let TimelineAction::AddItem { .. } = action
289                    && let TimelineItemPosition::UpdateAt { timeline_item_index } = position
290                    && let Some(event) = self.items.get(timeline_item_index)
291                    && event
292                        .as_event()
293                        .map(|e| {
294                            e.content().is_unable_to_decrypt() && e.event_id() == Some(&event_id)
295                        })
296                        .unwrap_or_default()
297                {
298                    // Except when this is an UTD transitioning into a decrypted event.
299                    ctx.should_add_new_items = true;
300                }
301
302                TimelineEventHandler::new(self, ctx)
303                    .handle_event(date_divider_adjuster, action, None)
304                    .await;
305            }
306            None => {}
307        }
308    }
309
310    /// Handle a set of live remote aggregations on events as [`VectorDiff`]s.
311    ///
312    /// This is like `handle_remote_events`, with two key differences:
313    /// - it only applies to aggregated events, not all the sync events.
314    /// - it will also not add the events to the `all_remote_events` array
315    ///   itself.
316    pub(super) async fn handle_remote_aggregations(
317        &mut self,
318        diffs: Vec<VectorDiff<TimelineEvent>>,
319        origin: RemoteEventOrigin,
320        room_data_provider: &P,
321        settings: &TimelineSettings,
322    ) {
323        let mut date_divider_adjuster =
324            DateDividerAdjuster::new(settings.date_divider_mode.clone());
325
326        for diff in diffs {
327            match diff {
328                VectorDiff::Append { values: events } => {
329                    for event in events {
330                        self.handle_remote_aggregation(
331                            event,
332                            TimelineItemPosition::End { origin },
333                            room_data_provider,
334                            &mut date_divider_adjuster,
335                        )
336                        .await;
337                    }
338                }
339
340                VectorDiff::PushFront { value: event } => {
341                    self.handle_remote_aggregation(
342                        event,
343                        TimelineItemPosition::Start { origin },
344                        room_data_provider,
345                        &mut date_divider_adjuster,
346                    )
347                    .await;
348                }
349
350                VectorDiff::PushBack { value: event } => {
351                    self.handle_remote_aggregation(
352                        event,
353                        TimelineItemPosition::End { origin },
354                        room_data_provider,
355                        &mut date_divider_adjuster,
356                    )
357                    .await;
358                }
359
360                VectorDiff::Insert { index: event_index, value: event } => {
361                    self.handle_remote_aggregation(
362                        event,
363                        TimelineItemPosition::At { event_index, origin },
364                        room_data_provider,
365                        &mut date_divider_adjuster,
366                    )
367                    .await;
368                }
369
370                VectorDiff::Set { index: event_index, value: event } => {
371                    if let Some(timeline_item_index) = self
372                        .items
373                        .all_remote_events()
374                        .get(event_index)
375                        .and_then(|meta| meta.timeline_item_index)
376                    {
377                        self.handle_remote_aggregation(
378                            event,
379                            TimelineItemPosition::UpdateAt { timeline_item_index },
380                            room_data_provider,
381                            &mut date_divider_adjuster,
382                        )
383                        .await;
384                    } else if let Some(event_id) = event.event_id()
385                        && let Some(meta) =
386                            self.items.all_remote_events().get_by_event_id(&event_id)
387                        && let Some(timeline_item_index) = meta.timeline_item_index
388                    {
389                        // FIXME: This branch is a complete hackjob.
390                        //
391                        // The reason being is that this branch is here to handle UTD -> Decrypted
392                        // event remplacements for focused timelines. But this transition should
393                        // naturally happen the same way it happens for unfocused timelines.
394                        //
395                        // Why it doesn't work here? Because the event cache fires out a
396                        // VectorDiff::Set with an index that matches to the cache's view of the
397                        // timeline, which is unfiltered, while the focused timeline will only show
398                        // i.e. pinned events.
399                        //
400                        // The `test_pinned_events_are_decrypted_after_recovering` integration test
401                        // showcases this. The event cache fires out the `Set` with an index of 7,
402                        // but the timeline with the PinnedEvents focus has only 4 items.
403                        //
404                        // This hackjob continues in the `handle_remote_aggregation()` method as we
405                        // can't just handle any `TimelineAction::AddItem` due to:
406                        //  https://github.com/matrix-org/matrix-rust-sdk/pull/4645
407                        //
408                        // Doing so breaks the `test_new_pinned_events_are_not_added_on_sync` test.
409                        //
410                        // Relevant issue: https://github.com/matrix-org/matrix-rust-sdk/issues/5954.
411                        self.handle_remote_aggregation(
412                            event,
413                            TimelineItemPosition::UpdateAt { timeline_item_index },
414                            room_data_provider,
415                            &mut date_divider_adjuster,
416                        )
417                        .await;
418                    } else {
419                        warn!(
420                            event_index,
421                            "Set update dropped because there wasn't any attached timeline item index."
422                        );
423                    }
424                }
425
426                VectorDiff::Remove { .. } | VectorDiff::Clear => {
427                    // Do nothing. An aggregated redaction comes with a
428                    // redaction event, or as a redacted event in the first
429                    // place.
430                }
431
432                v => unimplemented!("{v:?}"),
433            }
434        }
435
436        self.adjust_date_dividers(date_divider_adjuster);
437        self.check_invariants();
438    }
439
440    fn check_invariants(&self) {
441        self.check_no_duplicate_read_receipts();
442        self.check_no_unused_unique_ids();
443    }
444
445    fn check_no_duplicate_read_receipts(&self) {
446        let mut by_user_id = HashMap::new();
447        let mut duplicates = HashSet::new();
448
449        for item in self.items.iter_remotes_region().filter_map(|(_, item)| item.as_event()) {
450            if let Some(event_id) = item.event_id() {
451                for (user_id, _read_receipt) in item.read_receipts() {
452                    if let Some(prev_event_id) = by_user_id.insert(user_id, event_id) {
453                        duplicates.insert((user_id.clone(), prev_event_id, event_id));
454                    }
455                }
456            }
457        }
458
459        if !duplicates.is_empty() {
460            #[cfg(any(debug_assertions, test))]
461            panic!("duplicate read receipts in this timeline: {duplicates:?}\n{:?}", self.items);
462
463            #[cfg(not(any(debug_assertions, test)))]
464            tracing::error!(
465                ?duplicates,
466                items = ?self.items,
467                "duplicate read receipts in this timeline",
468            );
469        }
470    }
471
472    fn check_no_unused_unique_ids(&self) {
473        let duplicates = self
474            .items
475            .iter_all_regions()
476            .duplicates_by(|(_nth, item)| item.unique_id())
477            .map(|(_nth, item)| item.unique_id())
478            .collect::<Vec<_>>();
479
480        if !duplicates.is_empty() {
481            #[cfg(any(debug_assertions, test))]
482            panic!("duplicate unique ids in this timeline: {duplicates:?}\n{:?}", self.items);
483
484            #[cfg(not(any(debug_assertions, test)))]
485            tracing::error!(
486                ?duplicates,
487                items = ?self.items,
488                "duplicate unique ids in this timeline",
489            );
490        }
491    }
492
493    /// Whether the event should be added to the timeline as a new item.
494    fn should_add_event_item(
495        &self,
496        room_data_provider: &P,
497        settings: &TimelineSettings,
498        event: &AnySyncTimelineEvent,
499        thread_root: Option<&EventId>,
500        position: TimelineItemPosition,
501    ) -> bool {
502        let rules = room_data_provider.room_version_rules();
503
504        if !(settings.event_filter)(event, &rules) {
505            // The user filtered out the event.
506            return false;
507        }
508
509        match &self.focus {
510            TimelineFocusKind::PinnedEvents => {
511                // The pinned events timeline only receives updates for, well, pinned events.
512                true
513            }
514
515            TimelineFocusKind::Event { .. } => {
516                // For event-focused timelines, thread filtering is now handled in the
517                // event cache layer. We accept all events from pagination.
518
519                // Retrieve the origin of the event.
520                let origin = match position {
521                    TimelineItemPosition::End { origin }
522                    | TimelineItemPosition::Start { origin }
523                    | TimelineItemPosition::At { origin, .. } => origin,
524
525                    TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self
526                        .items
527                        .get(idx)
528                        .and_then(|item| item.as_event()?.as_remote())
529                        .map_or(RemoteEventOrigin::Unknown, |item| item.origin),
530                };
531
532                match origin {
533                    // Never add any item to a focused timeline when the item comes from sync.
534                    RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => false,
535                    RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => true,
536                }
537            }
538
539            TimelineFocusKind::Live { hide_threaded_events } => {
540                // If the timeline's filtering out in-thread events, don't add items for
541                // threaded events.
542                thread_root.is_none() || !hide_threaded_events
543            }
544
545            TimelineFocusKind::Thread { root_event_id, .. } => {
546                // Add new items only for the thread root and the thread replies.
547                event.event_id() == root_event_id
548                    || thread_root.as_ref().is_some_and(|r| r == root_event_id)
549            }
550        }
551    }
552
553    /// Whether this event can show read receipts, or if they should be moved
554    /// to the previous event.
555    fn can_show_read_receipts(
556        &self,
557        settings: &TimelineSettings,
558        event: &AnySyncTimelineEvent,
559    ) -> bool {
560        match event {
561            AnySyncTimelineEvent::State(_) => {
562                matches!(settings.track_read_receipts, TimelineReadReceiptTracking::AllEvents)
563            }
564            AnySyncTimelineEvent::MessageLike(_) => {
565                !matches!(settings.track_read_receipts, TimelineReadReceiptTracking::Disabled)
566            }
567        }
568    }
569
570    /// After a deserialization error, adds a failed-to-parse item to the
571    /// timeline if configured to do so, or logs the error (and optionally
572    /// save metadata) if not.
573    async fn maybe_add_error_item(
574        &mut self,
575        position: TimelineItemPosition,
576        room_data_provider: &P,
577        raw: &Raw<AnySyncTimelineEvent>,
578        deserialization_error: serde_json::Error,
579        settings: &TimelineSettings,
580    ) -> Option<(
581        OwnedEventId,
582        OwnedUserId,
583        MilliSecondsSinceUnixEpoch,
584        Option<OwnedTransactionId>,
585        Option<TimelineAction>,
586        Option<OwnedEventId>,
587        bool,
588        bool,
589    )> {
590        let state_key: Option<String> = raw.get_field("state_key").ok().flatten();
591
592        // A state event is an event that has a state key. Note that the two branches
593        // differ because the inferred return type for `get_field` is different
594        // in each case.
595        //
596        // If this was a state event but it didn't include a state_key, we'll assume it
597        // was a msg-like, because we can't do much more.
598        let event_type = if let Some(state_key) = state_key {
599            raw.get_field("type")
600                .ok()
601                .flatten()
602                .map(|event_type| FailedToParseEvent::State { event_type, state_key })
603        } else {
604            raw.get_field("type").ok().flatten().map(FailedToParseEvent::MsgLike)
605        };
606
607        let event_id: Option<OwnedEventId> = raw.get_field("event_id").ok().flatten();
608        let Some(event_id) = event_id else {
609            // If the event doesn't even have an event ID, we can't do anything with it.
610            warn!(
611                ?event_type,
612                "Failed to deserialize timeline event (with no ID): {deserialization_error}"
613            );
614            return None;
615        };
616
617        let sender: Option<OwnedUserId> = raw.get_field("sender").ok().flatten();
618        let origin_server_ts: Option<MilliSecondsSinceUnixEpoch> =
619            raw.get_field("origin_server_ts").ok().flatten();
620
621        match (sender, origin_server_ts, event_type) {
622            (Some(sender), Some(origin_server_ts), Some(event_type))
623                if settings.add_failed_to_parse =>
624            {
625                // We have sufficient information to show an item in the timeline, and we've
626                // been requested to show it, let's do it.
627                #[derive(serde::Deserialize)]
628                struct Unsigned {
629                    transaction_id: Option<OwnedTransactionId>,
630                }
631
632                let transaction_id: Option<OwnedTransactionId> = raw
633                    .get_field::<Unsigned>("unsigned")
634                    .ok()
635                    .flatten()
636                    .and_then(|unsigned| unsigned.transaction_id);
637
638                // The event can be partially deserialized, and it is allowed to be added to
639                // the timeline.
640                Some((
641                    event_id,
642                    sender,
643                    origin_server_ts,
644                    transaction_id,
645                    Some(TimelineAction::failed_to_parse(event_type, deserialization_error)),
646                    None,
647                    true,
648                    true,
649                ))
650            }
651
652            (sender, origin_server_ts, event_type) => {
653                // We either lack information for rendering an item, or we've been requested not
654                // to show it. Save it into the metadata and return.
655                warn!(
656                    ?event_type,
657                    ?event_id,
658                    "Failed to deserialize timeline event: {deserialization_error}"
659                );
660
661                // Remember the event before returning prematurely.
662                // See [`ObservableItems::all_remote_events`].
663                self.add_or_update_remote_event(
664                    EventMeta::new(event_id, false, false, None),
665                    sender.as_deref(),
666                    origin_server_ts,
667                    position,
668                    room_data_provider,
669                    settings,
670                )
671                .await;
672                None
673            }
674        }
675    }
676
677    // Attempt to load a thread's latest reply as an embedded timeline item, either
678    // using the event cache or the storage.
679    #[instrument(skip(self, room_data_provider))]
680    async fn fetch_latest_thread_reply(
681        &mut self,
682        event_id: &EventId,
683        room_data_provider: &P,
684    ) -> Option<Box<EmbeddedEvent>> {
685        let event = RoomDataProvider::load_event(room_data_provider, event_id)
686            .await
687            .inspect_err(|err| {
688                warn!("Failed to load thread latest event: {err}");
689            })
690            .ok()?;
691
692        EmbeddedEvent::try_from_timeline_event(event, room_data_provider, &self.meta)
693            .await
694            .inspect_err(|err| {
695                warn!("Failed to extract thread latest event into a timeline item content: {err}");
696            })
697            .ok()
698            .flatten()
699            .map(Box::new)
700    }
701
702    /// Compute the thread public and private receipts, for the sake of the
703    /// [`ThreadSummary`] of an event.
704    async fn compute_summary_thread_receipts(
705        &self,
706        event: &TimelineEvent,
707        summary: &SdkThreadSummary,
708        room_data_provider: &P,
709        settings: &TimelineSettings,
710    ) -> (Option<OwnedEventId>, Option<OwnedEventId>) {
711        if !settings.track_read_receipts.is_enabled() {
712            return (None, None);
713        }
714
715        // Load the public and private read receipts for the user, in the thread. In the
716        // future, we might move this code in the event cache, so that read
717        // receipt handling happens there instead.
718
719        // As an exception to handle the latest "implicit" read receipt (which is the
720        // latest event sent by the user): if the latest event has been sent by
721        // the current user, then we consider that as a read receipt.
722        #[allow(clippy::collapsible_if)] // clippy has poor taste
723        if let Some(ref latest_reply) = summary.latest_reply {
724            if let Ok(event) = RoomDataProvider::load_event(room_data_provider, latest_reply)
725                .await
726                .inspect_err(|err| {
727                    warn!("Failed to load thread latest event: {err}");
728                })
729            {
730                // Parse the sender.
731                if let Some(sender) = event.sender()
732                    && sender == self.meta.own_user_id
733                {
734                    let latest = Some(latest_reply.clone());
735                    return (latest.clone(), latest);
736                }
737            }
738        }
739
740        // Otherwise, resort to trying to load receipts from the database.
741        let own_thread_public_receipt = if let Some(event_id) = event.event_id() {
742            room_data_provider
743                .load_user_receipt(
744                    ReceiptType::Read,
745                    ReceiptThread::Thread(event_id),
746                    &self.meta.own_user_id,
747                )
748                .await
749                .map(|(event_id, _receipt)| event_id)
750        } else {
751            None
752        };
753
754        let own_thread_private_receipt = if let Some(event_id) = event.event_id() {
755            room_data_provider
756                .load_user_receipt(
757                    ReceiptType::ReadPrivate,
758                    ReceiptThread::Thread(event_id),
759                    &self.meta.own_user_id,
760                )
761                .await
762                .map(|(event_id, _receipt)| event_id)
763        } else {
764            None
765        };
766
767        (own_thread_public_receipt, own_thread_private_receipt)
768    }
769
770    /// Handle a remote event.
771    ///
772    /// Returns whether an item has been removed from the timeline.
773    #[allow(clippy::too_many_arguments)]
774    pub(super) async fn handle_remote_event(
775        &mut self,
776        event: TimelineEvent,
777        position: TimelineItemPosition,
778        room_data_provider: &P,
779        settings: &TimelineSettings,
780        date_divider_adjuster: &mut DateDividerAdjuster,
781        profiles: &mut HashMap<OwnedUserId, Option<Profile>>,
782        recycled_timeline_id: Option<TimelineUniqueId>,
783    ) -> RemovedItem {
784        let is_highlighted =
785            event.push_actions().is_some_and(|actions| actions.iter().any(Action::is_highlight));
786
787        let thread_summary = if let ThreadSummaryStatus::Some(ref summary) = event.thread_summary {
788            let latest_reply_item = if let Some(ref latest_reply) = summary.latest_reply {
789                self.fetch_latest_thread_reply(latest_reply, room_data_provider).await
790            } else {
791                None
792            };
793
794            let (own_thread_public_receipt, own_thread_private_receipt) = self
795                .compute_summary_thread_receipts(&event, summary, room_data_provider, settings)
796                .await;
797
798            Some(ThreadSummary {
799                latest_event: TimelineDetails::from_initial_value(latest_reply_item),
800                num_replies: summary.num_replies,
801                public_read_receipt_event_id: own_thread_public_receipt,
802                private_read_receipt_event_id: own_thread_private_receipt,
803            })
804        } else {
805            None
806        };
807
808        let encryption_info = event.kind.encryption_info().cloned();
809
810        let bundled_edit_encryption_info = event.kind.unsigned_encryption_map().and_then(|map| {
811            map.get(&UnsignedEventLocation::RelationsReplace)?.encryption_info().cloned()
812        });
813
814        let (forwarder, forwarder_profile) = get_forwarder_info(&event, room_data_provider).await;
815
816        let (raw, utd_info) = match event.kind {
817            TimelineEventKind::UnableToDecrypt { utd_info, event } => (event, Some(utd_info)),
818            _ => (event.kind.into_raw(), None),
819        };
820
821        let (
822            event_id,
823            sender,
824            timestamp,
825            txn_id,
826            timeline_action,
827            thread_root,
828            should_add,
829            can_show_read_receipts,
830        ) = match raw.deserialize() {
831            // Classical path: the event is valid, can be deserialized, everything is alright.
832            Ok(event) => {
833                let (in_reply_to, thread_root) = self.meta.process_event_relations(
834                    &event,
835                    &raw,
836                    bundled_edit_encryption_info,
837                    &self.items,
838                    self.focus.is_thread(),
839                );
840
841                let should_add = self.should_add_event_item(
842                    room_data_provider,
843                    settings,
844                    &event,
845                    thread_root.as_deref(),
846                    position,
847                );
848
849                let can_show_read_receipts = self.can_show_read_receipts(settings, &event);
850
851                (
852                    event.event_id().to_owned(),
853                    event.sender().to_owned(),
854                    event.origin_server_ts(),
855                    event.transaction_id().map(ToOwned::to_owned),
856                    TimelineAction::from_event(
857                        event,
858                        &raw,
859                        room_data_provider,
860                        utd_info
861                            .map(|utd_info| (utd_info, self.meta.unable_to_decrypt_hook.as_ref())),
862                        in_reply_to,
863                        thread_root.clone(),
864                        thread_summary,
865                    )
866                    .await,
867                    thread_root,
868                    should_add,
869                    can_show_read_receipts,
870                )
871            }
872
873            // The event seems invalid…
874            Err(e) => {
875                if let Some(tuple) =
876                    self.maybe_add_error_item(position, room_data_provider, &raw, e, settings).await
877                {
878                    tuple
879                } else {
880                    return false;
881                }
882            }
883        };
884
885        // Remember the event.
886        // See [`ObservableItems::all_remote_events`].
887        self.add_or_update_remote_event(
888            EventMeta::new(event_id.clone(), should_add, can_show_read_receipts, thread_root),
889            Some(&sender),
890            Some(timestamp),
891            position,
892            room_data_provider,
893            settings,
894        )
895        .await;
896
897        // Handle the event to create or update a timeline item.
898        let item_added = if let Some(timeline_action) = timeline_action {
899            let sender_profile = if let Some(profile) = profiles.get(&sender) {
900                profile.clone()
901            } else {
902                let profile = room_data_provider.profile_from_user_id(&sender).await;
903                profiles.insert(sender.clone(), profile.clone());
904                profile
905            };
906
907            let ctx = TimelineEventContext {
908                sender,
909                sender_profile,
910                forwarder,
911                forwarder_profile,
912                timestamp,
913                read_receipts: if settings.track_read_receipts.is_enabled()
914                    && should_add
915                    && can_show_read_receipts
916                {
917                    self.meta.read_receipts.compute_event_receipts(
918                        &event_id,
919                        &mut self.items,
920                        matches!(position, TimelineItemPosition::End { .. }),
921                    )
922                } else {
923                    Default::default()
924                },
925                is_highlighted,
926                flow: Flow::Remote {
927                    event_id: event_id.clone(),
928                    raw_event: raw,
929                    encryption_info,
930                    txn_id,
931                    position,
932                },
933                should_add_new_items: should_add,
934            };
935
936            TimelineEventHandler::new(self, ctx)
937                .handle_event(date_divider_adjuster, timeline_action, recycled_timeline_id)
938                .await
939        } else {
940            // No item has been added to the timeline.
941            false
942        };
943
944        let mut item_removed = false;
945
946        if !item_added {
947            trace!("No new item added");
948
949            if let TimelineItemPosition::UpdateAt { timeline_item_index } = position {
950                // If add was not called, that means the UTD event is one that
951                // wouldn't normally be visible. Remove it.
952                trace!("Removing UTD that was successfully retried");
953                self.items.remove(timeline_item_index);
954                item_removed = true;
955            }
956        }
957
958        item_removed
959    }
960
961    /// Remove one timeline item by its `event_index`.
962    fn remove_timeline_item(
963        &mut self,
964        event_index: usize,
965        day_divider_adjuster: &mut DateDividerAdjuster,
966    ) -> Option<(TimelineUniqueId, OwnedEventId)> {
967        day_divider_adjuster.mark_used();
968
969        // We need to be careful here.
970        //
971        // We must first remove the timeline item, which will update the mapping between
972        // remote events and timeline items. Removing the timeline item will “unlink”
973        // this mapping as the remote event will be updated to map to nothing. Only
974        // after that, we can remove the remote event. Doing this in the other order
975        // will update the mapping twice, and will result in a corrupted state.
976
977        let mut recycled_timeline_id = None;
978
979        // Remove the timeline item first.
980        if let Some(event_meta) = self.items.all_remote_events().get(event_index) {
981            // Fetch the `timeline_item_index` associated to the remote event.
982            if let Some(timeline_item_index) = event_meta.timeline_item_index {
983                let event_id = event_meta.event_id.clone();
984                let timeline_item = self.items.remove(timeline_item_index);
985                recycled_timeline_id = Some((timeline_item.unique_id().clone(), event_id));
986            }
987
988            // Now we can remove the remote event.
989            self.items.remove_remote_event(event_index);
990        }
991
992        recycled_timeline_id
993    }
994
995    pub(super) fn clear(&mut self) {
996        // By first checking if there are any local echoes first, we do a bit
997        // more work in case some are found, but it should be worth it because
998        // there will often not be any, and only emitting a single
999        // `VectorDiff::Clear` should be much more efficient to process for
1000        // subscribers.
1001        if self.items.has_local() {
1002            // Remove all remote events and virtual items that aren't date dividers.
1003            self.items.for_each(|entry| {
1004                if entry.is_remote_event()
1005                    || entry.as_virtual().is_some_and(|vitem| match vitem {
1006                        VirtualTimelineItem::DateDivider(_) => false,
1007                        VirtualTimelineItem::ReadMarker | VirtualTimelineItem::TimelineStart => {
1008                            true
1009                        }
1010                    })
1011                {
1012                    ObservableItemsTransactionEntry::remove(entry);
1013                }
1014            });
1015
1016            // Remove stray date dividers
1017            let mut idx = 0;
1018            while idx < self.items.len() {
1019                if self.items[idx].is_date_divider()
1020                    && self.items.get(idx + 1).is_none_or(|item| item.is_date_divider())
1021                {
1022                    self.items.remove(idx);
1023                    // don't increment idx because all elements have shifted
1024                } else {
1025                    idx += 1;
1026                }
1027            }
1028        } else {
1029            self.items.clear();
1030        }
1031
1032        self.meta.clear();
1033
1034        debug!(remaining_items = self.items.len(), "Timeline cleared");
1035    }
1036
1037    #[instrument(skip_all)]
1038    pub(super) fn set_fully_read_event(&mut self, fully_read_event_id: OwnedEventId) {
1039        // A similar event has been handled already. We can ignore it.
1040        if self.meta.fully_read_event.as_ref().is_some_and(|id| *id == fully_read_event_id) {
1041            return;
1042        }
1043
1044        self.meta.fully_read_event = Some(fully_read_event_id);
1045        self.meta.update_read_marker(&mut self.items);
1046    }
1047
1048    pub(super) fn commit(self) {
1049        // Update the `subscriber_skip_count` value.
1050        let previous_number_of_items = self.number_of_items_when_transaction_started;
1051        let next_number_of_items = self.items.len();
1052
1053        if previous_number_of_items != next_number_of_items {
1054            let count = self
1055                .meta
1056                .subscriber_skip_count
1057                .compute_next(previous_number_of_items, next_number_of_items);
1058            self.meta
1059                .subscriber_skip_count
1060                .update(count, matches!(self.focus, TimelineFocusKind::Live { .. }));
1061        }
1062
1063        // Replace the pointer to the previous meta with the new one.
1064        *self.previous_meta = self.meta;
1065
1066        self.items.commit();
1067    }
1068
1069    /// Add or update a remote event in the
1070    /// [`ObservableItems::all_remote_events`] collection.
1071    ///
1072    /// This method also adjusts read receipt if needed.
1073    async fn add_or_update_remote_event(
1074        &mut self,
1075        event_meta: EventMeta,
1076        sender: Option<&UserId>,
1077        timestamp: Option<MilliSecondsSinceUnixEpoch>,
1078        position: TimelineItemPosition,
1079        room_data_provider: &P,
1080        settings: &TimelineSettings,
1081    ) {
1082        let event_id = event_meta.event_id.clone();
1083
1084        match position {
1085            TimelineItemPosition::Start { .. } => self.items.push_front_remote_event(event_meta),
1086
1087            TimelineItemPosition::End { .. } => {
1088                self.items.push_back_remote_event(event_meta);
1089            }
1090
1091            TimelineItemPosition::At { event_index, .. } => {
1092                self.items.insert_remote_event(event_index, event_meta);
1093            }
1094
1095            TimelineItemPosition::UpdateAt { .. } => {
1096                if let Some(event) =
1097                    self.items.get_remote_event_by_event_id_mut(&event_meta.event_id)
1098                    && (event.visible != event_meta.visible
1099                        || event.can_show_read_receipts != event_meta.can_show_read_receipts)
1100                {
1101                    event.visible = event_meta.visible;
1102                    event.can_show_read_receipts = event_meta.can_show_read_receipts;
1103
1104                    if settings.track_read_receipts.is_enabled() {
1105                        // Since the event's visibility changed, we need to update the read
1106                        // receipts of the previous visible event.
1107                        self.maybe_update_read_receipts_of_prev_event(&event_meta.event_id);
1108                    }
1109                }
1110            }
1111        }
1112
1113        if settings.track_read_receipts.is_enabled()
1114            && matches!(
1115                position,
1116                TimelineItemPosition::Start { .. }
1117                    | TimelineItemPosition::End { .. }
1118                    | TimelineItemPosition::At { .. }
1119            )
1120        {
1121            self.load_read_receipts_for_event(&event_id, room_data_provider).await;
1122
1123            self.maybe_add_implicit_read_receipt(&event_id, sender, timestamp);
1124        }
1125    }
1126
1127    pub(super) fn adjust_date_dividers(&mut self, mut adjuster: DateDividerAdjuster) {
1128        adjuster.run(&mut self.items, &mut self.meta);
1129    }
1130
1131    /// This method replaces the `is_room_encrypted` value for all timeline
1132    /// items to its updated version and creates a `VectorDiff::Set` operation
1133    /// for each item which will be added to this transaction.
1134    pub(super) fn mark_all_events_as_encrypted(&mut self) {
1135        for idx in 0..self.items.len() {
1136            let item = &self.items[idx];
1137
1138            if let Some(event) = item.as_event() {
1139                if event.is_room_encrypted {
1140                    continue;
1141                }
1142
1143                let mut cloned_event = event.clone();
1144                cloned_event.is_room_encrypted = true;
1145
1146                // Replace the existing item with a new version with the right encryption flag
1147                let item = item.with_kind(cloned_event);
1148                self.items.replace(idx, item);
1149            }
1150        }
1151    }
1152}
1153
1154/// Retrieves the forwarder information for a given timeline event.
1155///
1156/// # Parameters
1157///
1158/// - `event`: The timeline event to extract forwarder information from.
1159/// - `room_data_provider`: A reference to the room data provider.
1160///
1161/// # Returns
1162///
1163/// A tuple containing:
1164/// - `Option<OwnedUserId>`: The user ID of the forwarder, if available.
1165/// - `Option<Profile>`: The profile of the forwarder, if available.
1166async fn get_forwarder_info<P: RoomDataProvider>(
1167    event: &TimelineEvent,
1168    room_data_provider: &P,
1169) -> (Option<OwnedUserId>, Option<Profile>) {
1170    let forwarder = event
1171        .kind
1172        .encryption_info()
1173        .and_then(|info| info.forwarder.as_ref())
1174        .map(|info| info.user_id.clone());
1175
1176    let forwarder_profile = if let Some(ref forwarder_id) = forwarder {
1177        Some(room_data_provider.profile_from_user_id(forwarder_id).await)
1178    } else {
1179        None
1180    };
1181
1182    (forwarder, forwarder_profile.flatten())
1183}