matrix_sdk_ui/timeline/controller/
state_transaction.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use eyeball_im::VectorDiff;
18use itertools::Itertools as _;
19use matrix_sdk::deserialized_responses::{
20    ThreadSummaryStatus, TimelineEvent, TimelineEventKind, UnsignedEventLocation,
21};
22use ruma::{
23    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, UserId,
24    events::AnySyncTimelineEvent, push::Action, serde::Raw,
25};
26use tracing::{debug, instrument, trace, warn};
27
28use super::{
29    super::{
30        controller::ObservableItemsTransactionEntry,
31        date_dividers::DateDividerAdjuster,
32        event_handler::{Flow, TimelineEventContext, TimelineEventHandler, TimelineItemPosition},
33        event_item::RemoteEventOrigin,
34        traits::RoomDataProvider,
35    },
36    ObservableItems, ObservableItemsTransaction, TimelineMetadata, TimelineSettings,
37    metadata::EventMeta,
38};
39use crate::timeline::{
40    EmbeddedEvent, ThreadSummary, TimelineDetails, VirtualTimelineItem,
41    controller::TimelineFocusKind,
42    event_handler::{FailedToParseEvent, RemovedItem, TimelineAction},
43};
44
/// A transaction over the state of a timeline: its items and its metadata.
///
/// The transaction operates on a transaction over the items and on a clone of
/// the metadata. The metadata clone is written back to its original location,
/// and the items transaction is committed, only when [`Self::commit`] is
/// called.
pub(in crate::timeline) struct TimelineStateTransaction<'a, P: RoomDataProvider> {
    /// A vector transaction over the items themselves. Holds temporary state
    /// until committed.
    pub items: ObservableItemsTransaction<'a>,

    /// Number of items when the transaction has been created/has started.
    ///
    /// Compared against the number of items at commit time, in
    /// [`Self::commit`], to update the subscriber skip count.
    number_of_items_when_transaction_started: usize,

    /// A clone of the previous meta, that we're operating on during the
    /// transaction, and that will be committed to the previous meta location in
    /// [`Self::commit`].
    pub meta: TimelineMetadata,

    /// Pointer to the previous meta, only used during [`Self::commit`].
    previous_meta: &'a mut TimelineMetadata,

    /// The kind of focus of this timeline.
    pub focus: &'a TimelineFocusKind<P>,
}
64
65impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> {
66    /// Create a new [`TimelineStateTransaction`].
67    pub(super) fn new(
68        items: &'a mut ObservableItems,
69        meta: &'a mut TimelineMetadata,
70        focus: &'a TimelineFocusKind<P>,
71    ) -> Self {
72        let previous_meta = meta;
73        let meta = previous_meta.clone();
74        let items = items.transaction();
75
76        Self {
77            number_of_items_when_transaction_started: items.len(),
78            items,
79            previous_meta,
80            meta,
81            focus,
82        }
83    }
84
85    /// Handle updates on events as [`VectorDiff`]s.
86    pub(super) async fn handle_remote_events_with_diffs(
87        &mut self,
88        diffs: Vec<VectorDiff<TimelineEvent>>,
89        origin: RemoteEventOrigin,
90        room_data_provider: &P,
91        settings: &TimelineSettings,
92    ) {
93        let mut date_divider_adjuster =
94            DateDividerAdjuster::new(settings.date_divider_mode.clone());
95
96        for diff in diffs {
97            match diff {
98                VectorDiff::Append { values: events } => {
99                    for event in events {
100                        self.handle_remote_event(
101                            event,
102                            TimelineItemPosition::End { origin },
103                            room_data_provider,
104                            settings,
105                            &mut date_divider_adjuster,
106                        )
107                        .await;
108                    }
109                }
110
111                VectorDiff::PushFront { value: event } => {
112                    self.handle_remote_event(
113                        event,
114                        TimelineItemPosition::Start { origin },
115                        room_data_provider,
116                        settings,
117                        &mut date_divider_adjuster,
118                    )
119                    .await;
120                }
121
122                VectorDiff::PushBack { value: event } => {
123                    self.handle_remote_event(
124                        event,
125                        TimelineItemPosition::End { origin },
126                        room_data_provider,
127                        settings,
128                        &mut date_divider_adjuster,
129                    )
130                    .await;
131                }
132
133                VectorDiff::Insert { index: event_index, value: event } => {
134                    self.handle_remote_event(
135                        event,
136                        TimelineItemPosition::At { event_index, origin },
137                        room_data_provider,
138                        settings,
139                        &mut date_divider_adjuster,
140                    )
141                    .await;
142                }
143
144                VectorDiff::Set { index: event_index, value: event } => {
145                    if let Some(timeline_item_index) = self
146                        .items
147                        .all_remote_events()
148                        .get(event_index)
149                        .and_then(|meta| meta.timeline_item_index)
150                    {
151                        self.handle_remote_event(
152                            event,
153                            TimelineItemPosition::UpdateAt { timeline_item_index },
154                            room_data_provider,
155                            settings,
156                            &mut date_divider_adjuster,
157                        )
158                        .await;
159                    } else {
160                        warn!(
161                            event_index,
162                            "Set update dropped because there wasn't any attached timeline item index."
163                        );
164                    }
165                }
166
167                VectorDiff::Remove { index: event_index } => {
168                    self.remove_timeline_item(event_index, &mut date_divider_adjuster);
169                }
170
171                VectorDiff::Clear => {
172                    self.clear();
173                }
174
175                v => unimplemented!("{v:?}"),
176            }
177        }
178
179        self.adjust_date_dividers(date_divider_adjuster);
180        self.check_invariants();
181    }
182
183    async fn handle_remote_aggregation(
184        &mut self,
185        event: TimelineEvent,
186        position: TimelineItemPosition,
187        room_data_provider: &P,
188        date_divider_adjuster: &mut DateDividerAdjuster,
189    ) {
190        let raw_event = event.raw();
191
192        let deserialized = match raw_event.deserialize() {
193            Ok(deserialized) => deserialized,
194            Err(err) => {
195                warn!("Failed to deserialize timeline event: {err}");
196                return;
197            }
198        };
199
200        let sender = deserialized.sender().to_owned();
201        let timestamp = deserialized.origin_server_ts();
202        let event_id = deserialized.event_id().to_owned();
203        let txn_id = deserialized.transaction_id().map(ToOwned::to_owned);
204
205        if let Some(action @ TimelineAction::HandleAggregation { .. }) = TimelineAction::from_event(
206            deserialized,
207            raw_event,
208            room_data_provider,
209            None,
210            None,
211            None,
212            None,
213        )
214        .await
215        {
216            let encryption_info = event.kind.encryption_info().cloned();
217
218            let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
219
220            let ctx = TimelineEventContext {
221                sender,
222                sender_profile,
223                timestamp,
224                // These are not used when handling an aggregation.
225                read_receipts: Default::default(),
226                is_highlighted: false,
227                flow: Flow::Remote {
228                    event_id: event_id.clone(),
229                    raw_event: event.raw().clone(),
230                    encryption_info,
231                    txn_id,
232                    position,
233                },
234                // This field is not used when handling an aggregation.
235                should_add_new_items: false,
236            };
237
238            TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, action).await;
239        }
240    }
241
242    /// Handle a set of live remote aggregations on events as [`VectorDiff`]s.
243    ///
244    /// This is like `handle_remote_events`, with two key differences:
245    /// - it only applies to aggregated events, not all the sync events.
246    /// - it will also not add the events to the `all_remote_events` array
247    ///   itself.
248    pub(super) async fn handle_remote_aggregations(
249        &mut self,
250        diffs: Vec<VectorDiff<TimelineEvent>>,
251        origin: RemoteEventOrigin,
252        room_data_provider: &P,
253        settings: &TimelineSettings,
254    ) {
255        let mut date_divider_adjuster =
256            DateDividerAdjuster::new(settings.date_divider_mode.clone());
257
258        for diff in diffs {
259            match diff {
260                VectorDiff::Append { values: events } => {
261                    for event in events {
262                        self.handle_remote_aggregation(
263                            event,
264                            TimelineItemPosition::End { origin },
265                            room_data_provider,
266                            &mut date_divider_adjuster,
267                        )
268                        .await;
269                    }
270                }
271
272                VectorDiff::PushFront { value: event } => {
273                    self.handle_remote_aggregation(
274                        event,
275                        TimelineItemPosition::Start { origin },
276                        room_data_provider,
277                        &mut date_divider_adjuster,
278                    )
279                    .await;
280                }
281
282                VectorDiff::PushBack { value: event } => {
283                    self.handle_remote_aggregation(
284                        event,
285                        TimelineItemPosition::End { origin },
286                        room_data_provider,
287                        &mut date_divider_adjuster,
288                    )
289                    .await;
290                }
291
292                VectorDiff::Insert { index: event_index, value: event } => {
293                    self.handle_remote_aggregation(
294                        event,
295                        TimelineItemPosition::At { event_index, origin },
296                        room_data_provider,
297                        &mut date_divider_adjuster,
298                    )
299                    .await;
300                }
301
302                VectorDiff::Set { index: event_index, value: event } => {
303                    if let Some(timeline_item_index) = self
304                        .items
305                        .all_remote_events()
306                        .get(event_index)
307                        .and_then(|meta| meta.timeline_item_index)
308                    {
309                        self.handle_remote_aggregation(
310                            event,
311                            TimelineItemPosition::UpdateAt { timeline_item_index },
312                            room_data_provider,
313                            &mut date_divider_adjuster,
314                        )
315                        .await;
316                    } else {
317                        warn!(
318                            event_index,
319                            "Set update dropped because there wasn't any attached timeline item index."
320                        );
321                    }
322                }
323
324                VectorDiff::Remove { .. } | VectorDiff::Clear => {
325                    // Do nothing. An aggregated redaction comes with a
326                    // redaction event, or as a redacted event in the first
327                    // place.
328                }
329
330                v => unimplemented!("{v:?}"),
331            }
332        }
333
334        self.adjust_date_dividers(date_divider_adjuster);
335        self.check_invariants();
336    }
337
    /// Run the sanity checks on the timeline items: first that there are no
    /// duplicate read receipts, then that there are no duplicate unique ids.
    fn check_invariants(&self) {
        self.check_no_duplicate_read_receipts();
        self.check_no_unused_unique_ids();
    }
342
343    fn check_no_duplicate_read_receipts(&self) {
344        let mut by_user_id = HashMap::new();
345        let mut duplicates = HashSet::new();
346
347        for item in self.items.iter_remotes_region().filter_map(|(_, item)| item.as_event()) {
348            if let Some(event_id) = item.event_id() {
349                for (user_id, _read_receipt) in item.read_receipts() {
350                    if let Some(prev_event_id) = by_user_id.insert(user_id, event_id) {
351                        duplicates.insert((user_id.clone(), prev_event_id, event_id));
352                    }
353                }
354            }
355        }
356
357        if !duplicates.is_empty() {
358            #[cfg(any(debug_assertions, test))]
359            panic!("duplicate read receipts in this timeline: {duplicates:?}\n{:?}", self.items);
360
361            #[cfg(not(any(debug_assertions, test)))]
362            tracing::error!(
363                ?duplicates,
364                items = ?self.items,
365                "duplicate read receipts in this timeline",
366            );
367        }
368    }
369
370    fn check_no_unused_unique_ids(&self) {
371        let duplicates = self
372            .items
373            .iter_all_regions()
374            .duplicates_by(|(_nth, item)| item.unique_id())
375            .map(|(_nth, item)| item.unique_id())
376            .collect::<Vec<_>>();
377
378        if !duplicates.is_empty() {
379            #[cfg(any(debug_assertions, test))]
380            panic!("duplicate unique ids in this timeline: {duplicates:?}\n{:?}", self.items);
381
382            #[cfg(not(any(debug_assertions, test)))]
383            tracing::error!(
384                ?duplicates,
385                items = ?self.items,
386                "duplicate unique ids in this timeline",
387            );
388        }
389    }
390
391    /// Whether the event should be added to the timeline as a new item.
392    fn should_add_event_item(
393        &self,
394        room_data_provider: &P,
395        settings: &TimelineSettings,
396        event: &AnySyncTimelineEvent,
397        thread_root: Option<&EventId>,
398        position: TimelineItemPosition,
399    ) -> bool {
400        let rules = room_data_provider.room_version_rules();
401
402        if !(settings.event_filter)(event, &rules) {
403            // The user filtered out the event.
404            return false;
405        }
406
407        match &self.focus {
408            TimelineFocusKind::PinnedEvents { .. } => {
409                // Only add pinned events for the pinned events timeline.
410                room_data_provider.is_pinned_event(event.event_id())
411            }
412
413            TimelineFocusKind::Event { hide_threaded_events, .. } => {
414                // If the timeline's filtering out in-thread events, don't add items for
415                // threaded events.
416                if thread_root.is_some() && *hide_threaded_events {
417                    return false;
418                }
419
420                // Retrieve the origin of the event.
421                let origin = match position {
422                    TimelineItemPosition::End { origin }
423                    | TimelineItemPosition::Start { origin }
424                    | TimelineItemPosition::At { origin, .. } => origin,
425
426                    TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self
427                        .items
428                        .get(idx)
429                        .and_then(|item| item.as_event()?.as_remote())
430                        .map_or(RemoteEventOrigin::Unknown, |item| item.origin),
431                };
432
433                match origin {
434                    // Never add any item to a focused timeline when the item comes from sync.
435                    RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => false,
436                    RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => true,
437                }
438            }
439
440            TimelineFocusKind::Live { hide_threaded_events } => {
441                // If the timeline's filtering out in-thread events, don't add items for
442                // threaded events.
443                thread_root.is_none() || !hide_threaded_events
444            }
445
446            TimelineFocusKind::Thread { root_event_id, .. } => {
447                // Add new items only for the thread root and the thread replies.
448                event.event_id() == root_event_id
449                    || thread_root.as_ref().is_some_and(|r| r == root_event_id)
450            }
451        }
452    }
453
454    /// After a deserialization error, adds a failed-to-parse item to the
455    /// timeline if configured to do so, or logs the error (and optionally
456    /// save metadata) if not.
457    async fn maybe_add_error_item(
458        &mut self,
459        position: TimelineItemPosition,
460        room_data_provider: &P,
461        raw: &Raw<AnySyncTimelineEvent>,
462        deserialization_error: serde_json::Error,
463        settings: &TimelineSettings,
464    ) -> Option<(
465        OwnedEventId,
466        OwnedUserId,
467        MilliSecondsSinceUnixEpoch,
468        Option<OwnedTransactionId>,
469        Option<TimelineAction>,
470        bool,
471    )> {
472        let state_key: Option<String> = raw.get_field("state_key").ok().flatten();
473
474        // A state event is an event that has a state key. Note that the two branches
475        // differ because the inferred return type for `get_field` is different
476        // in each case.
477        //
478        // If this was a state event but it didn't include a state_key, we'll assume it
479        // was a msg-like, because we can't do much more.
480        let event_type = if let Some(state_key) = state_key {
481            raw.get_field("type")
482                .ok()
483                .flatten()
484                .map(|event_type| FailedToParseEvent::State { event_type, state_key })
485        } else {
486            raw.get_field("type").ok().flatten().map(FailedToParseEvent::MsgLike)
487        };
488
489        let event_id: Option<OwnedEventId> = raw.get_field("event_id").ok().flatten();
490        let Some(event_id) = event_id else {
491            // If the event doesn't even have an event ID, we can't do anything with it.
492            warn!(
493                ?event_type,
494                "Failed to deserialize timeline event (with no ID): {deserialization_error}"
495            );
496            return None;
497        };
498
499        let sender: Option<OwnedUserId> = raw.get_field("sender").ok().flatten();
500        let origin_server_ts: Option<MilliSecondsSinceUnixEpoch> =
501            raw.get_field("origin_server_ts").ok().flatten();
502
503        match (sender, origin_server_ts, event_type) {
504            (Some(sender), Some(origin_server_ts), Some(event_type))
505                if settings.add_failed_to_parse =>
506            {
507                // We have sufficient information to show an item in the timeline, and we've
508                // been requested to show it, let's do it.
509                #[derive(serde::Deserialize)]
510                struct Unsigned {
511                    transaction_id: Option<OwnedTransactionId>,
512                }
513
514                let transaction_id: Option<OwnedTransactionId> = raw
515                    .get_field::<Unsigned>("unsigned")
516                    .ok()
517                    .flatten()
518                    .and_then(|unsigned| unsigned.transaction_id);
519
520                // The event can be partially deserialized, and it is allowed to be added to
521                // the timeline.
522                Some((
523                    event_id,
524                    sender,
525                    origin_server_ts,
526                    transaction_id,
527                    Some(TimelineAction::failed_to_parse(event_type, deserialization_error)),
528                    true,
529                ))
530            }
531
532            (sender, origin_server_ts, event_type) => {
533                // We either lack information for rendering an item, or we've been requested not
534                // to show it. Save it into the metadata and return.
535                warn!(
536                    ?event_type,
537                    ?event_id,
538                    "Failed to deserialize timeline event: {deserialization_error}"
539                );
540
541                // Remember the event before returning prematurely.
542                // See [`ObservableItems::all_remote_events`].
543                self.add_or_update_remote_event(
544                    EventMeta::new(event_id, false),
545                    sender.as_deref(),
546                    origin_server_ts,
547                    position,
548                    room_data_provider,
549                    settings,
550                )
551                .await;
552                None
553            }
554        }
555    }
556
557    // Attempt to load a thread's latest reply as an embedded timeline item, either
558    // using the event cache or the storage.
559    async fn fetch_latest_thread_reply(
560        &mut self,
561        event_id: &EventId,
562        room_data_provider: &P,
563    ) -> Option<Box<EmbeddedEvent>> {
564        let event = RoomDataProvider::load_event(room_data_provider, event_id)
565            .await
566            .inspect_err(|err| {
567                warn!("Failed to load thread latest event: {err}");
568            })
569            .ok()?;
570
571        EmbeddedEvent::try_from_timeline_event(event, room_data_provider, &self.meta)
572            .await
573            .inspect_err(|err| {
574                warn!("Failed to extract thread latest event into a timeline item content: {err}");
575            })
576            .ok()
577            .flatten()
578            .map(Box::new)
579    }
580
    /// Handle a remote event.
    ///
    /// The event is deserialized, recorded in the remote events metadata, and
    /// then handed over to a [`TimelineEventHandler`] which creates or updates
    /// a timeline item. An event that fails to deserialize goes through
    /// `maybe_add_error_item` instead.
    ///
    /// Returns whether an item has been removed from the timeline.
    pub(super) async fn handle_remote_event(
        &mut self,
        event: TimelineEvent,
        position: TimelineItemPosition,
        room_data_provider: &P,
        settings: &TimelineSettings,
        date_divider_adjuster: &mut DateDividerAdjuster,
    ) -> RemovedItem {
        // The event is highlighted if any of its push actions is a highlight.
        let is_highlighted =
            event.push_actions().is_some_and(|actions| actions.iter().any(Action::is_highlight));

        // If the event comes with a thread summary, build the timeline's own
        // thread summary, loading the latest reply (if any) as an embedded
        // item.
        let thread_summary = if let ThreadSummaryStatus::Some(summary) = event.thread_summary {
            let latest_reply_item = if let Some(latest_reply) = summary.latest_reply {
                self.fetch_latest_thread_reply(&latest_reply, room_data_provider).await
            } else {
                None
            };
            Some(ThreadSummary {
                latest_event: TimelineDetails::from_initial_value(latest_reply_item),
                num_replies: summary.num_replies,
            })
        } else {
            None
        };

        let encryption_info = event.kind.encryption_info().cloned();

        // Encryption info of the bundled replacement (edit), if the unsigned
        // encryption map has an entry for it.
        let bundled_edit_encryption_info = event.kind.unsigned_encryption_map().and_then(|map| {
            map.get(&UnsignedEventLocation::RelationsReplace)?.encryption_info().cloned()
        });

        // Extract the raw event, along with the UTD info when the event
        // couldn't be decrypted. This consumes `event.kind`, so everything
        // that borrows from it must have been read above.
        let (raw, utd_info) = match event.kind {
            TimelineEventKind::UnableToDecrypt { utd_info, event } => (event, Some(utd_info)),
            _ => (event.kind.into_raw(), None),
        };

        let (event_id, sender, timestamp, txn_id, timeline_action, should_add) = match raw
            .deserialize()
        {
            // Classical path: the event is valid, can be deserialized, everything is alright.
            Ok(event) => {
                let (in_reply_to, thread_root) = self.meta.process_event_relations(
                    &event,
                    &raw,
                    bundled_edit_encryption_info,
                    &self.items,
                    matches!(self.focus, TimelineFocusKind::Thread { .. }),
                );

                let should_add = self.should_add_event_item(
                    room_data_provider,
                    settings,
                    &event,
                    thread_root.as_deref(),
                    position,
                );

                (
                    event.event_id().to_owned(),
                    event.sender().to_owned(),
                    event.origin_server_ts(),
                    event.transaction_id().map(ToOwned::to_owned),
                    TimelineAction::from_event(
                        event,
                        &raw,
                        room_data_provider,
                        utd_info
                            .map(|utd_info| (utd_info, self.meta.unable_to_decrypt_hook.as_ref())),
                        in_reply_to,
                        thread_root,
                        thread_summary,
                    )
                    .await,
                    should_add,
                )
            }

            // The event seems invalid…
            Err(e) => {
                if let Some(tuple) =
                    self.maybe_add_error_item(position, room_data_provider, &raw, e, settings).await
                {
                    tuple
                } else {
                    // No failed-to-parse item will be shown for this event:
                    // nothing has been removed from the timeline.
                    return false;
                }
            }
        };

        // Remember the event.
        // See [`ObservableItems::all_remote_events`].
        self.add_or_update_remote_event(
            EventMeta::new(event_id.clone(), should_add),
            Some(&sender),
            Some(timestamp),
            position,
            room_data_provider,
            settings,
        )
        .await;

        // Handle the event to create or update a timeline item.
        let item_added = if let Some(timeline_action) = timeline_action {
            let sender_profile = room_data_provider.profile_from_user_id(&sender).await;

            let ctx = TimelineEventContext {
                sender,
                sender_profile,
                timestamp,
                // Read receipts are only computed when they are tracked, and
                // when the event is going to be added as an item.
                read_receipts: if settings.track_read_receipts && should_add {
                    self.meta.read_receipts.compute_event_receipts(
                        &event_id,
                        &mut self.items,
                        matches!(position, TimelineItemPosition::End { .. }),
                    )
                } else {
                    Default::default()
                },
                is_highlighted,
                flow: Flow::Remote {
                    event_id: event_id.clone(),
                    raw_event: raw,
                    encryption_info,
                    txn_id,
                    position,
                },
                should_add_new_items: should_add,
            };

            TimelineEventHandler::new(self, ctx)
                .handle_event(date_divider_adjuster, timeline_action)
                .await
        } else {
            // No item has been added to the timeline.
            false
        };

        let mut item_removed = false;

        if !item_added {
            trace!("No new item added");

            // The event was meant to update an existing item, but the handler
            // didn't add anything: the existing item has to go.
            if let TimelineItemPosition::UpdateAt { timeline_item_index } = position {
                // If add was not called, that means the UTD event is one that
                // wouldn't normally be visible. Remove it.
                trace!("Removing UTD that was successfully retried");
                self.items.remove(timeline_item_index);
                item_removed = true;
            }
        }

        item_removed
    }
737
738    /// Remove one timeline item by its `event_index`.
739    fn remove_timeline_item(
740        &mut self,
741        event_index: usize,
742        day_divider_adjuster: &mut DateDividerAdjuster,
743    ) {
744        day_divider_adjuster.mark_used();
745
746        // We need to be careful here.
747        //
748        // We must first remove the timeline item, which will update the mapping between
749        // remote events and timeline items. Removing the timeline item will “unlink”
750        // this mapping as the remote event will be updated to map to nothing. Only
751        // after that, we can remove the remote event. Doing this in the other order
752        // will update the mapping twice, and will result in a corrupted state.
753
754        // Remove the timeline item first.
755        if let Some(event_meta) = self.items.all_remote_events().get(event_index) {
756            // Fetch the `timeline_item_index` associated to the remote event.
757            if let Some(timeline_item_index) = event_meta.timeline_item_index {
758                let _ = self.items.remove(timeline_item_index);
759            }
760
761            // Now we can remove the remote event.
762            self.items.remove_remote_event(event_index);
763        }
764    }
765
766    pub(super) fn clear(&mut self) {
767        // By first checking if there are any local echoes first, we do a bit
768        // more work in case some are found, but it should be worth it because
769        // there will often not be any, and only emitting a single
770        // `VectorDiff::Clear` should be much more efficient to process for
771        // subscribers.
772        if self.items.has_local() {
773            // Remove all remote events and virtual items that aren't date dividers.
774            self.items.for_each(|entry| {
775                if entry.is_remote_event()
776                    || entry.as_virtual().is_some_and(|vitem| match vitem {
777                        VirtualTimelineItem::DateDivider(_) => false,
778                        VirtualTimelineItem::ReadMarker | VirtualTimelineItem::TimelineStart => {
779                            true
780                        }
781                    })
782                {
783                    ObservableItemsTransactionEntry::remove(entry);
784                }
785            });
786
787            // Remove stray date dividers
788            let mut idx = 0;
789            while idx < self.items.len() {
790                if self.items[idx].is_date_divider()
791                    && self.items.get(idx + 1).is_none_or(|item| item.is_date_divider())
792                {
793                    self.items.remove(idx);
794                    // don't increment idx because all elements have shifted
795                } else {
796                    idx += 1;
797                }
798            }
799        } else {
800            self.items.clear();
801        }
802
803        self.meta.clear();
804
805        debug!(remaining_items = self.items.len(), "Timeline cleared");
806    }
807
808    #[instrument(skip_all)]
809    pub(super) fn set_fully_read_event(&mut self, fully_read_event_id: OwnedEventId) {
810        // A similar event has been handled already. We can ignore it.
811        if self.meta.fully_read_event.as_ref().is_some_and(|id| *id == fully_read_event_id) {
812            return;
813        }
814
815        self.meta.fully_read_event = Some(fully_read_event_id);
816        self.meta.update_read_marker(&mut self.items);
817    }
818
819    pub(super) fn commit(self) {
820        // Update the `subscriber_skip_count` value.
821        let previous_number_of_items = self.number_of_items_when_transaction_started;
822        let next_number_of_items = self.items.len();
823
824        if previous_number_of_items != next_number_of_items {
825            let count = self
826                .meta
827                .subscriber_skip_count
828                .compute_next(previous_number_of_items, next_number_of_items);
829            self.meta
830                .subscriber_skip_count
831                .update(count, matches!(self.focus, TimelineFocusKind::Live { .. }));
832        }
833
834        // Replace the pointer to the previous meta with the new one.
835        *self.previous_meta = self.meta;
836
837        self.items.commit();
838    }
839
840    /// Add or update a remote event in the
841    /// [`ObservableItems::all_remote_events`] collection.
842    ///
843    /// This method also adjusts read receipt if needed.
844    async fn add_or_update_remote_event(
845        &mut self,
846        event_meta: EventMeta,
847        sender: Option<&UserId>,
848        timestamp: Option<MilliSecondsSinceUnixEpoch>,
849        position: TimelineItemPosition,
850        room_data_provider: &P,
851        settings: &TimelineSettings,
852    ) {
853        let event_id = event_meta.event_id.clone();
854
855        match position {
856            TimelineItemPosition::Start { .. } => self.items.push_front_remote_event(event_meta),
857
858            TimelineItemPosition::End { .. } => {
859                self.items.push_back_remote_event(event_meta);
860            }
861
862            TimelineItemPosition::At { event_index, .. } => {
863                self.items.insert_remote_event(event_index, event_meta);
864            }
865
866            TimelineItemPosition::UpdateAt { .. } => {
867                if let Some(event) =
868                    self.items.get_remote_event_by_event_id_mut(&event_meta.event_id)
869                    && event.visible != event_meta.visible
870                {
871                    event.visible = event_meta.visible;
872
873                    if settings.track_read_receipts {
874                        // Since the event's visibility changed, we need to update the read
875                        // receipts of the previous visible event.
876                        self.maybe_update_read_receipts_of_prev_event(&event_meta.event_id);
877                    }
878                }
879            }
880        }
881
882        if settings.track_read_receipts
883            && matches!(
884                position,
885                TimelineItemPosition::Start { .. }
886                    | TimelineItemPosition::End { .. }
887                    | TimelineItemPosition::At { .. }
888            )
889        {
890            self.load_read_receipts_for_event(&event_id, room_data_provider).await;
891
892            self.maybe_add_implicit_read_receipt(&event_id, sender, timestamp);
893        }
894    }
895
896    pub(super) fn adjust_date_dividers(&mut self, mut adjuster: DateDividerAdjuster) {
897        adjuster.run(&mut self.items, &mut self.meta);
898    }
899
900    /// This method replaces the `is_room_encrypted` value for all timeline
901    /// items to its updated version and creates a `VectorDiff::Set` operation
902    /// for each item which will be added to this transaction.
903    pub(super) fn mark_all_events_as_encrypted(&mut self) {
904        for idx in 0..self.items.len() {
905            let item = &self.items[idx];
906
907            if let Some(event) = item.as_event() {
908                if event.is_room_encrypted {
909                    continue;
910                }
911
912                let mut cloned_event = event.clone();
913                cloned_event.is_room_encrypted = true;
914
915                // Replace the existing item with a new version with the right encryption flag
916                let item = item.with_kind(cloned_event);
917                self.items.replace(idx, item);
918            }
919        }
920    }
921}