1use std::collections::{HashMap, HashSet};
16
17use eyeball_im::VectorDiff;
18use itertools::Itertools as _;
19use matrix_sdk::deserialized_responses::{
20 ThreadSummary as SdkThreadSummary, ThreadSummaryStatus, TimelineEvent, TimelineEventKind,
21 UnsignedEventLocation,
22};
23use ruma::{
24 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, UserId,
25 events::{
26 AnySyncTimelineEvent,
27 receipt::{ReceiptThread, ReceiptType},
28 },
29 push::Action,
30 serde::Raw,
31};
32use tracing::{debug, instrument, trace, warn};
33
34use super::{
35 super::{
36 controller::ObservableItemsTransactionEntry,
37 date_dividers::DateDividerAdjuster,
38 event_handler::{Flow, TimelineEventContext, TimelineEventHandler, TimelineItemPosition},
39 event_item::RemoteEventOrigin,
40 traits::RoomDataProvider,
41 },
42 ObservableItems, ObservableItemsTransaction, TimelineMetadata, TimelineReadReceiptTracking,
43 TimelineSettings,
44 metadata::EventMeta,
45};
46use crate::timeline::{
47 EmbeddedEvent, Profile, ThreadSummary, TimelineDetails, TimelineUniqueId, VirtualTimelineItem,
48 controller::TimelineFocusKind,
49 event_handler::{FailedToParseEvent, RemovedItem, TimelineAction},
50};
51
/// A transaction over the timeline state.
///
/// It pairs a transaction on the observable timeline items with a working
/// copy of the timeline metadata. Mutations are applied to `items` and `meta`;
/// `commit` writes the working metadata back into the original metadata and
/// commits the items transaction.
pub(in crate::timeline) struct TimelineStateTransaction<'a, P: RoomDataProvider> {
    /// The transaction on the timeline items, created from the
    /// `ObservableItems` passed to `new`.
    pub items: ObservableItemsTransaction<'a>,

    /// Number of items in `items` at the moment the transaction was created.
    ///
    /// Compared against the final number of items in `commit`.
    number_of_items_when_transaction_started: usize,

    /// Working copy of the metadata: a clone of `previous_meta` taken when the
    /// transaction was created.
    pub meta: TimelineMetadata,

    /// The metadata this transaction was created from; overwritten with
    /// `meta` in `commit`.
    previous_meta: &'a mut TimelineMetadata,

    /// The focus kind of the timeline this transaction operates on.
    pub focus: &'a TimelineFocusKind,

    /// Marker tying the transaction to a `RoomDataProvider` type without
    /// storing a value of that type.
    _phantom: std::marker::PhantomData<P>,
}
74
75impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> {
76 pub(super) fn new(
78 items: &'a mut ObservableItems,
79 meta: &'a mut TimelineMetadata,
80 focus: &'a TimelineFocusKind,
81 ) -> Self {
82 let previous_meta = meta;
83 let meta = previous_meta.clone();
84 let items = items.transaction();
85
86 Self {
87 number_of_items_when_transaction_started: items.len(),
88 items,
89 previous_meta,
90 meta,
91 focus,
92 _phantom: std::marker::PhantomData,
93 }
94 }
95
    /// Apply a list of `VectorDiff`s of remote events to the timeline.
    ///
    /// Each diff is translated into a `TimelineItemPosition` and forwarded to
    /// `handle_remote_event`, except for `Remove` (handled by
    /// `remove_timeline_item`) and `Clear` (handled by `clear`). Once all the
    /// diffs are processed, date dividers are adjusted and the timeline
    /// invariants are checked.
    ///
    /// # Panics
    ///
    /// Panics (via `unimplemented!`) on any `VectorDiff` variant that is not
    /// explicitly handled below.
    pub(super) async fn handle_remote_events_with_diffs(
        &mut self,
        diffs: Vec<VectorDiff<TimelineEvent>>,
        origin: RemoteEventOrigin,
        room_data_provider: &P,
        settings: &TimelineSettings,
    ) {
        let mut date_divider_adjuster =
            DateDividerAdjuster::new(settings.date_divider_mode.clone());

        // Sender profiles looked up while handling this batch, shared across
        // all the `handle_remote_event` calls below.
        let mut cached_profiles: HashMap<OwnedUserId, Option<Profile>> = HashMap::new();

        // Unique IDs of timeline items removed by a `Remove` diff in this
        // batch, keyed by event ID. If the same event is inserted again by a
        // later diff, its previous unique ID is handed back to
        // `handle_remote_event`.
        let mut recycled_timeline_ids = HashMap::new();

        for diff in diffs {
            match diff {
                VectorDiff::Append { values: events } => {
                    for event in events {
                        let recycled_timeline_id = event
                            .event_id()
                            .and_then(|event_id| recycled_timeline_ids.remove(&event_id));
                        self.handle_remote_event(
                            event,
                            TimelineItemPosition::End { origin },
                            room_data_provider,
                            settings,
                            &mut date_divider_adjuster,
                            &mut cached_profiles,
                            recycled_timeline_id,
                        )
                        .await;
                    }
                }

                VectorDiff::PushFront { value: event } => {
                    let recycled_timeline_id = event
                        .event_id()
                        .and_then(|event_id| recycled_timeline_ids.remove(&event_id));
                    self.handle_remote_event(
                        event,
                        TimelineItemPosition::Start { origin },
                        room_data_provider,
                        settings,
                        &mut date_divider_adjuster,
                        &mut cached_profiles,
                        recycled_timeline_id,
                    )
                    .await;
                }

                VectorDiff::PushBack { value: event } => {
                    let recycled_timeline_id = event
                        .event_id()
                        .and_then(|event_id| recycled_timeline_ids.remove(&event_id));
                    self.handle_remote_event(
                        event,
                        TimelineItemPosition::End { origin },
                        room_data_provider,
                        settings,
                        &mut date_divider_adjuster,
                        &mut cached_profiles,
                        recycled_timeline_id,
                    )
                    .await;
                }

                VectorDiff::Insert { index: event_index, value: event } => {
                    let recycled_timeline_id = event
                        .event_id()
                        .and_then(|event_id| recycled_timeline_ids.remove(&event_id));
                    self.handle_remote_event(
                        event,
                        TimelineItemPosition::At { event_index, origin },
                        room_data_provider,
                        settings,
                        &mut date_divider_adjuster,
                        &mut cached_profiles,
                        recycled_timeline_id,
                    )
                    .await;
                }

                VectorDiff::Set { index: event_index, value: event } => {
                    // A `Set` is only applied if the remote event at this
                    // index is currently attached to a timeline item.
                    if let Some(timeline_item_index) = self
                        .items
                        .all_remote_events()
                        .get(event_index)
                        .and_then(|meta| meta.timeline_item_index)
                    {
                        self.handle_remote_event(
                            event,
                            TimelineItemPosition::UpdateAt { timeline_item_index },
                            room_data_provider,
                            settings,
                            &mut date_divider_adjuster,
                            &mut cached_profiles,
                            None,
                        )
                        .await;
                    } else {
                        warn!(
                            event_index,
                            "Set update dropped because there wasn't any attached timeline item index."
                        );
                    }
                }

                VectorDiff::Remove { index: event_index } => {
                    // Remember the unique ID of the removed item so it can be
                    // reused if the event is re-inserted later in this batch.
                    if let Some((timeline_id, event_id)) =
                        self.remove_timeline_item(event_index, &mut date_divider_adjuster)
                    {
                        recycled_timeline_ids.insert(event_id, timeline_id);
                    }
                }

                VectorDiff::Clear => {
                    self.clear();
                }

                // Every other diff variant is not supported here.
                v => unimplemented!("{v:?}"),
            }
        }

        self.adjust_date_dividers(date_divider_adjuster);
        self.check_invariants();
    }
223
224 async fn handle_remote_aggregation(
225 &mut self,
226 event: TimelineEvent,
227 position: TimelineItemPosition,
228 room_data_provider: &P,
229 date_divider_adjuster: &mut DateDividerAdjuster,
230 ) {
231 let raw_event = event.raw();
232
233 let deserialized = match raw_event.deserialize() {
234 Ok(deserialized) => deserialized,
235 Err(err) => {
236 warn!("Failed to deserialize timeline event: {err}");
237 return;
238 }
239 };
240
241 let sender = deserialized.sender().to_owned();
242 let timestamp = deserialized.origin_server_ts();
243 let event_id = deserialized.event_id().to_owned();
244 let txn_id = deserialized.transaction_id().map(ToOwned::to_owned);
245
246 let timeline_action = TimelineAction::from_event(
247 deserialized,
248 raw_event,
249 room_data_provider,
250 None,
251 None,
252 None,
253 None,
254 )
255 .await;
256
257 match timeline_action {
258 Some(action @ TimelineAction::AddItem { .. })
259 | Some(action @ TimelineAction::HandleAggregation { .. }) => {
260 let encryption_info = event.kind.encryption_info().cloned();
261 let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
262
263 let (forwarder, forwarder_profile) =
264 get_forwarder_info(&event, room_data_provider).await;
265
266 let mut ctx = TimelineEventContext {
267 sender,
268 sender_profile,
269 forwarder,
270 forwarder_profile,
271 timestamp,
272 read_receipts: Default::default(),
274 is_highlighted: false,
275 flow: Flow::Remote {
276 event_id: event_id.clone(),
277 raw_event: event.raw().clone(),
278 encryption_info,
279 txn_id,
280 position,
281 },
282 should_add_new_items: false,
284 };
285
286 if let TimelineAction::AddItem { .. } = action
289 && let TimelineItemPosition::UpdateAt { timeline_item_index } = position
290 && let Some(event) = self.items.get(timeline_item_index)
291 && event
292 .as_event()
293 .map(|e| {
294 e.content().is_unable_to_decrypt() && e.event_id() == Some(&event_id)
295 })
296 .unwrap_or_default()
297 {
298 ctx.should_add_new_items = true;
300 }
301
302 TimelineEventHandler::new(self, ctx)
303 .handle_event(date_divider_adjuster, action, None)
304 .await;
305 }
306 None => {}
307 }
308 }
309
    /// Apply a list of `VectorDiff`s of remote events, handling each event
    /// through `handle_remote_aggregation`.
    ///
    /// Insertions (`Append`, `PushFront`, `PushBack`, `Insert`) and `Set`
    /// diffs are forwarded with the matching `TimelineItemPosition`; `Remove`
    /// and `Clear` diffs are ignored. Once all the diffs are processed, date
    /// dividers are adjusted and the timeline invariants are checked.
    ///
    /// # Panics
    ///
    /// Panics (via `unimplemented!`) on any `VectorDiff` variant that is not
    /// explicitly handled below.
    pub(super) async fn handle_remote_aggregations(
        &mut self,
        diffs: Vec<VectorDiff<TimelineEvent>>,
        origin: RemoteEventOrigin,
        room_data_provider: &P,
        settings: &TimelineSettings,
    ) {
        let mut date_divider_adjuster =
            DateDividerAdjuster::new(settings.date_divider_mode.clone());

        for diff in diffs {
            match diff {
                VectorDiff::Append { values: events } => {
                    for event in events {
                        self.handle_remote_aggregation(
                            event,
                            TimelineItemPosition::End { origin },
                            room_data_provider,
                            &mut date_divider_adjuster,
                        )
                        .await;
                    }
                }

                VectorDiff::PushFront { value: event } => {
                    self.handle_remote_aggregation(
                        event,
                        TimelineItemPosition::Start { origin },
                        room_data_provider,
                        &mut date_divider_adjuster,
                    )
                    .await;
                }

                VectorDiff::PushBack { value: event } => {
                    self.handle_remote_aggregation(
                        event,
                        TimelineItemPosition::End { origin },
                        room_data_provider,
                        &mut date_divider_adjuster,
                    )
                    .await;
                }

                VectorDiff::Insert { index: event_index, value: event } => {
                    self.handle_remote_aggregation(
                        event,
                        TimelineItemPosition::At { event_index, origin },
                        room_data_provider,
                        &mut date_divider_adjuster,
                    )
                    .await;
                }

                VectorDiff::Set { index: event_index, value: event } => {
                    // First try to find the timeline item attached to the
                    // remote event stored at `event_index`.
                    if let Some(timeline_item_index) = self
                        .items
                        .all_remote_events()
                        .get(event_index)
                        .and_then(|meta| meta.timeline_item_index)
                    {
                        self.handle_remote_aggregation(
                            event,
                            TimelineItemPosition::UpdateAt { timeline_item_index },
                            room_data_provider,
                            &mut date_divider_adjuster,
                        )
                        .await;
                    // Otherwise, fall back to looking the remote event up by
                    // its event ID rather than by its index.
                    } else if let Some(event_id) = event.event_id()
                        && let Some(meta) =
                            self.items.all_remote_events().get_by_event_id(&event_id)
                        && let Some(timeline_item_index) = meta.timeline_item_index
                    {
                        self.handle_remote_aggregation(
                            event,
                            TimelineItemPosition::UpdateAt { timeline_item_index },
                            room_data_provider,
                            &mut date_divider_adjuster,
                        )
                        .await;
                    } else {
                        warn!(
                            event_index,
                            "Set update dropped because there wasn't any attached timeline item index."
                        );
                    }
                }

                // Removals and clears are deliberately no-ops in this method.
                VectorDiff::Remove { .. } | VectorDiff::Clear => {
                }

                // Every other diff variant is not supported here.
                v => unimplemented!("{v:?}"),
            }
        }

        self.adjust_date_dividers(date_divider_adjuster);
        self.check_invariants();
    }
439
440 fn check_invariants(&self) {
441 self.check_no_duplicate_read_receipts();
442 self.check_no_unused_unique_ids();
443 }
444
445 fn check_no_duplicate_read_receipts(&self) {
446 let mut by_user_id = HashMap::new();
447 let mut duplicates = HashSet::new();
448
449 for item in self.items.iter_remotes_region().filter_map(|(_, item)| item.as_event()) {
450 if let Some(event_id) = item.event_id() {
451 for (user_id, _read_receipt) in item.read_receipts() {
452 if let Some(prev_event_id) = by_user_id.insert(user_id, event_id) {
453 duplicates.insert((user_id.clone(), prev_event_id, event_id));
454 }
455 }
456 }
457 }
458
459 if !duplicates.is_empty() {
460 #[cfg(any(debug_assertions, test))]
461 panic!("duplicate read receipts in this timeline: {duplicates:?}\n{:?}", self.items);
462
463 #[cfg(not(any(debug_assertions, test)))]
464 tracing::error!(
465 ?duplicates,
466 items = ?self.items,
467 "duplicate read receipts in this timeline",
468 );
469 }
470 }
471
472 fn check_no_unused_unique_ids(&self) {
473 let duplicates = self
474 .items
475 .iter_all_regions()
476 .duplicates_by(|(_nth, item)| item.unique_id())
477 .map(|(_nth, item)| item.unique_id())
478 .collect::<Vec<_>>();
479
480 if !duplicates.is_empty() {
481 #[cfg(any(debug_assertions, test))]
482 panic!("duplicate unique ids in this timeline: {duplicates:?}\n{:?}", self.items);
483
484 #[cfg(not(any(debug_assertions, test)))]
485 tracing::error!(
486 ?duplicates,
487 items = ?self.items,
488 "duplicate unique ids in this timeline",
489 );
490 }
491 }
492
493 fn should_add_event_item(
495 &self,
496 room_data_provider: &P,
497 settings: &TimelineSettings,
498 event: &AnySyncTimelineEvent,
499 thread_root: Option<&EventId>,
500 position: TimelineItemPosition,
501 ) -> bool {
502 let rules = room_data_provider.room_version_rules();
503
504 if !(settings.event_filter)(event, &rules) {
505 return false;
507 }
508
509 match &self.focus {
510 TimelineFocusKind::PinnedEvents => {
511 true
513 }
514
515 TimelineFocusKind::Event { .. } => {
516 let origin = match position {
521 TimelineItemPosition::End { origin }
522 | TimelineItemPosition::Start { origin }
523 | TimelineItemPosition::At { origin, .. } => origin,
524
525 TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self
526 .items
527 .get(idx)
528 .and_then(|item| item.as_event()?.as_remote())
529 .map_or(RemoteEventOrigin::Unknown, |item| item.origin),
530 };
531
532 match origin {
533 RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => false,
535 RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => true,
536 }
537 }
538
539 TimelineFocusKind::Live { hide_threaded_events } => {
540 thread_root.is_none() || !hide_threaded_events
543 }
544
545 TimelineFocusKind::Thread { root_event_id, .. } => {
546 event.event_id() == root_event_id
548 || thread_root.as_ref().is_some_and(|r| r == root_event_id)
549 }
550 }
551 }
552
553 fn can_show_read_receipts(
556 &self,
557 settings: &TimelineSettings,
558 event: &AnySyncTimelineEvent,
559 ) -> bool {
560 match event {
561 AnySyncTimelineEvent::State(_) => {
562 matches!(settings.track_read_receipts, TimelineReadReceiptTracking::AllEvents)
563 }
564 AnySyncTimelineEvent::MessageLike(_) => {
565 !matches!(settings.track_read_receipts, TimelineReadReceiptTracking::Disabled)
566 }
567 }
568 }
569
570 async fn maybe_add_error_item(
574 &mut self,
575 position: TimelineItemPosition,
576 room_data_provider: &P,
577 raw: &Raw<AnySyncTimelineEvent>,
578 deserialization_error: serde_json::Error,
579 settings: &TimelineSettings,
580 ) -> Option<(
581 OwnedEventId,
582 OwnedUserId,
583 MilliSecondsSinceUnixEpoch,
584 Option<OwnedTransactionId>,
585 Option<TimelineAction>,
586 Option<OwnedEventId>,
587 bool,
588 bool,
589 )> {
590 let state_key: Option<String> = raw.get_field("state_key").ok().flatten();
591
592 let event_type = if let Some(state_key) = state_key {
599 raw.get_field("type")
600 .ok()
601 .flatten()
602 .map(|event_type| FailedToParseEvent::State { event_type, state_key })
603 } else {
604 raw.get_field("type").ok().flatten().map(FailedToParseEvent::MsgLike)
605 };
606
607 let event_id: Option<OwnedEventId> = raw.get_field("event_id").ok().flatten();
608 let Some(event_id) = event_id else {
609 warn!(
611 ?event_type,
612 "Failed to deserialize timeline event (with no ID): {deserialization_error}"
613 );
614 return None;
615 };
616
617 let sender: Option<OwnedUserId> = raw.get_field("sender").ok().flatten();
618 let origin_server_ts: Option<MilliSecondsSinceUnixEpoch> =
619 raw.get_field("origin_server_ts").ok().flatten();
620
621 match (sender, origin_server_ts, event_type) {
622 (Some(sender), Some(origin_server_ts), Some(event_type))
623 if settings.add_failed_to_parse =>
624 {
625 #[derive(serde::Deserialize)]
628 struct Unsigned {
629 transaction_id: Option<OwnedTransactionId>,
630 }
631
632 let transaction_id: Option<OwnedTransactionId> = raw
633 .get_field::<Unsigned>("unsigned")
634 .ok()
635 .flatten()
636 .and_then(|unsigned| unsigned.transaction_id);
637
638 Some((
641 event_id,
642 sender,
643 origin_server_ts,
644 transaction_id,
645 Some(TimelineAction::failed_to_parse(event_type, deserialization_error)),
646 None,
647 true,
648 true,
649 ))
650 }
651
652 (sender, origin_server_ts, event_type) => {
653 warn!(
656 ?event_type,
657 ?event_id,
658 "Failed to deserialize timeline event: {deserialization_error}"
659 );
660
661 self.add_or_update_remote_event(
664 EventMeta::new(event_id, false, false, None),
665 sender.as_deref(),
666 origin_server_ts,
667 position,
668 room_data_provider,
669 settings,
670 )
671 .await;
672 None
673 }
674 }
675 }
676
677 #[instrument(skip(self, room_data_provider))]
680 async fn fetch_latest_thread_reply(
681 &mut self,
682 event_id: &EventId,
683 room_data_provider: &P,
684 ) -> Option<Box<EmbeddedEvent>> {
685 let event = RoomDataProvider::load_event(room_data_provider, event_id)
686 .await
687 .inspect_err(|err| {
688 warn!("Failed to load thread latest event: {err}");
689 })
690 .ok()?;
691
692 EmbeddedEvent::try_from_timeline_event(event, room_data_provider, &self.meta)
693 .await
694 .inspect_err(|err| {
695 warn!("Failed to extract thread latest event into a timeline item content: {err}");
696 })
697 .ok()
698 .flatten()
699 .map(Box::new)
700 }
701
702 async fn compute_summary_thread_receipts(
705 &self,
706 event: &TimelineEvent,
707 summary: &SdkThreadSummary,
708 room_data_provider: &P,
709 settings: &TimelineSettings,
710 ) -> (Option<OwnedEventId>, Option<OwnedEventId>) {
711 if !settings.track_read_receipts.is_enabled() {
712 return (None, None);
713 }
714
715 #[allow(clippy::collapsible_if)] if let Some(ref latest_reply) = summary.latest_reply {
724 if let Ok(event) = RoomDataProvider::load_event(room_data_provider, latest_reply)
725 .await
726 .inspect_err(|err| {
727 warn!("Failed to load thread latest event: {err}");
728 })
729 {
730 if let Some(sender) = event.sender()
732 && sender == self.meta.own_user_id
733 {
734 let latest = Some(latest_reply.clone());
735 return (latest.clone(), latest);
736 }
737 }
738 }
739
740 let own_thread_public_receipt = if let Some(event_id) = event.event_id() {
742 room_data_provider
743 .load_user_receipt(
744 ReceiptType::Read,
745 ReceiptThread::Thread(event_id),
746 &self.meta.own_user_id,
747 )
748 .await
749 .map(|(event_id, _receipt)| event_id)
750 } else {
751 None
752 };
753
754 let own_thread_private_receipt = if let Some(event_id) = event.event_id() {
755 room_data_provider
756 .load_user_receipt(
757 ReceiptType::ReadPrivate,
758 ReceiptThread::Thread(event_id),
759 &self.meta.own_user_id,
760 )
761 .await
762 .map(|(event_id, _receipt)| event_id)
763 } else {
764 None
765 };
766
767 (own_thread_public_receipt, own_thread_private_receipt)
768 }
769
    /// Handle a single remote event: record it in the list of remote events
    /// and, when it maps to a timeline action, run it through a
    /// `TimelineEventHandler`.
    ///
    /// `profiles` is a cache of sender profiles that is read and filled by
    /// this method. `recycled_timeline_id` is forwarded to the event handler.
    ///
    /// Returns `true` only when handling the event did not add an item and the
    /// position was `UpdateAt`, in which case the timeline item at that index
    /// has been removed. Returns `false` in all other cases, including when
    /// the event could not be deserialized and no error item was produced.
    // The method needs all these inputs; they are passed through as-is.
    #[allow(clippy::too_many_arguments)]
    pub(super) async fn handle_remote_event(
        &mut self,
        event: TimelineEvent,
        position: TimelineItemPosition,
        room_data_provider: &P,
        settings: &TimelineSettings,
        date_divider_adjuster: &mut DateDividerAdjuster,
        profiles: &mut HashMap<OwnedUserId, Option<Profile>>,
        recycled_timeline_id: Option<TimelineUniqueId>,
    ) -> RemovedItem {
        // The event is highlighted if any of its push actions is a highlight.
        let is_highlighted =
            event.push_actions().is_some_and(|actions| actions.iter().any(Action::is_highlight));

        // Build the timeline's thread summary from the SDK one, if any:
        // latest reply (when it can be loaded) and the user's own receipts.
        let thread_summary = if let ThreadSummaryStatus::Some(ref summary) = event.thread_summary {
            let latest_reply_item = if let Some(ref latest_reply) = summary.latest_reply {
                self.fetch_latest_thread_reply(latest_reply, room_data_provider).await
            } else {
                None
            };

            let (own_thread_public_receipt, own_thread_private_receipt) = self
                .compute_summary_thread_receipts(&event, summary, room_data_provider, settings)
                .await;

            Some(ThreadSummary {
                latest_event: TimelineDetails::from_initial_value(latest_reply_item),
                num_replies: summary.num_replies,
                public_read_receipt_event_id: own_thread_public_receipt,
                private_read_receipt_event_id: own_thread_private_receipt,
            })
        } else {
            None
        };

        let encryption_info = event.kind.encryption_info().cloned();

        // Encryption info of the bundled replacement (edit), if there is one.
        let bundled_edit_encryption_info = event.kind.unsigned_encryption_map().and_then(|map| {
            map.get(&UnsignedEventLocation::RelationsReplace)?.encryption_info().cloned()
        });

        let (forwarder, forwarder_profile) = get_forwarder_info(&event, room_data_provider).await;

        // Extract the raw event, along with the UTD info for events that
        // could not be decrypted. This consumes `event.kind`.
        let (raw, utd_info) = match event.kind {
            TimelineEventKind::UnableToDecrypt { utd_info, event } => (event, Some(utd_info)),
            _ => (event.kind.into_raw(), None),
        };

        let (
            event_id,
            sender,
            timestamp,
            txn_id,
            timeline_action,
            thread_root,
            should_add,
            can_show_read_receipts,
        ) = match raw.deserialize() {
            Ok(event) => {
                let (in_reply_to, thread_root) = self.meta.process_event_relations(
                    &event,
                    &raw,
                    bundled_edit_encryption_info,
                    &self.items,
                    self.focus.is_thread(),
                );

                let should_add = self.should_add_event_item(
                    room_data_provider,
                    settings,
                    &event,
                    thread_root.as_deref(),
                    position,
                );

                let can_show_read_receipts = self.can_show_read_receipts(settings, &event);

                (
                    event.event_id().to_owned(),
                    event.sender().to_owned(),
                    event.origin_server_ts(),
                    event.transaction_id().map(ToOwned::to_owned),
                    TimelineAction::from_event(
                        event,
                        &raw,
                        room_data_provider,
                        utd_info
                            .map(|utd_info| (utd_info, self.meta.unable_to_decrypt_hook.as_ref())),
                        in_reply_to,
                        thread_root.clone(),
                        thread_summary,
                    )
                    .await,
                    thread_root,
                    should_add,
                    can_show_read_receipts,
                )
            }

            Err(e) => {
                // Deserialization failed: either get the data for a "failed
                // to parse" item, or stop here.
                if let Some(tuple) =
                    self.maybe_add_error_item(position, room_data_provider, &raw, e, settings).await
                {
                    tuple
                } else {
                    return false;
                }
            }
        };

        // Record the event in the list of remote events, whether or not it
        // ends up producing a timeline item.
        self.add_or_update_remote_event(
            EventMeta::new(event_id.clone(), should_add, can_show_read_receipts, thread_root),
            Some(&sender),
            Some(timestamp),
            position,
            room_data_provider,
            settings,
        )
        .await;

        let item_added = if let Some(timeline_action) = timeline_action {
            // Use the cached sender profile if there is one, otherwise fetch
            // it and cache it.
            let sender_profile = if let Some(profile) = profiles.get(&sender) {
                profile.clone()
            } else {
                let profile = room_data_provider.profile_from_user_id(&sender).await;
                profiles.insert(sender.clone(), profile.clone());
                profile
            };

            let ctx = TimelineEventContext {
                sender,
                sender_profile,
                forwarder,
                forwarder_profile,
                timestamp,
                // Receipts are only computed for events that are shown and
                // allowed to carry receipts, when tracking is enabled.
                read_receipts: if settings.track_read_receipts.is_enabled()
                    && should_add
                    && can_show_read_receipts
                {
                    self.meta.read_receipts.compute_event_receipts(
                        &event_id,
                        &mut self.items,
                        matches!(position, TimelineItemPosition::End { .. }),
                    )
                } else {
                    Default::default()
                },
                is_highlighted,
                flow: Flow::Remote {
                    event_id: event_id.clone(),
                    raw_event: raw,
                    encryption_info,
                    txn_id,
                    position,
                },
                should_add_new_items: should_add,
            };

            TimelineEventHandler::new(self, ctx)
                .handle_event(date_divider_adjuster, timeline_action, recycled_timeline_id)
                .await
        } else {
            // No timeline action for this event: nothing was added.
            false
        };

        let mut item_removed = false;

        if !item_added {
            trace!("No new item added");

            // For an in-place update that did not result in an item, the
            // existing item at that index is removed.
            if let TimelineItemPosition::UpdateAt { timeline_item_index } = position {
                trace!("Removing UTD that was successfully retried");
                self.items.remove(timeline_item_index);
                item_removed = true;
            }
        }

        item_removed
    }
960
961 fn remove_timeline_item(
963 &mut self,
964 event_index: usize,
965 day_divider_adjuster: &mut DateDividerAdjuster,
966 ) -> Option<(TimelineUniqueId, OwnedEventId)> {
967 day_divider_adjuster.mark_used();
968
969 let mut recycled_timeline_id = None;
978
979 if let Some(event_meta) = self.items.all_remote_events().get(event_index) {
981 if let Some(timeline_item_index) = event_meta.timeline_item_index {
983 let event_id = event_meta.event_id.clone();
984 let timeline_item = self.items.remove(timeline_item_index);
985 recycled_timeline_id = Some((timeline_item.unique_id().clone(), event_id));
986 }
987
988 self.items.remove_remote_event(event_index);
990 }
991
992 recycled_timeline_id
993 }
994
995 pub(super) fn clear(&mut self) {
996 if self.items.has_local() {
1002 self.items.for_each(|entry| {
1004 if entry.is_remote_event()
1005 || entry.as_virtual().is_some_and(|vitem| match vitem {
1006 VirtualTimelineItem::DateDivider(_) => false,
1007 VirtualTimelineItem::ReadMarker | VirtualTimelineItem::TimelineStart => {
1008 true
1009 }
1010 })
1011 {
1012 ObservableItemsTransactionEntry::remove(entry);
1013 }
1014 });
1015
1016 let mut idx = 0;
1018 while idx < self.items.len() {
1019 if self.items[idx].is_date_divider()
1020 && self.items.get(idx + 1).is_none_or(|item| item.is_date_divider())
1021 {
1022 self.items.remove(idx);
1023 } else {
1025 idx += 1;
1026 }
1027 }
1028 } else {
1029 self.items.clear();
1030 }
1031
1032 self.meta.clear();
1033
1034 debug!(remaining_items = self.items.len(), "Timeline cleared");
1035 }
1036
1037 #[instrument(skip_all)]
1038 pub(super) fn set_fully_read_event(&mut self, fully_read_event_id: OwnedEventId) {
1039 if self.meta.fully_read_event.as_ref().is_some_and(|id| *id == fully_read_event_id) {
1041 return;
1042 }
1043
1044 self.meta.fully_read_event = Some(fully_read_event_id);
1045 self.meta.update_read_marker(&mut self.items);
1046 }
1047
1048 pub(super) fn commit(self) {
1049 let previous_number_of_items = self.number_of_items_when_transaction_started;
1051 let next_number_of_items = self.items.len();
1052
1053 if previous_number_of_items != next_number_of_items {
1054 let count = self
1055 .meta
1056 .subscriber_skip_count
1057 .compute_next(previous_number_of_items, next_number_of_items);
1058 self.meta
1059 .subscriber_skip_count
1060 .update(count, matches!(self.focus, TimelineFocusKind::Live { .. }));
1061 }
1062
1063 *self.previous_meta = self.meta;
1065
1066 self.items.commit();
1067 }
1068
1069 async fn add_or_update_remote_event(
1074 &mut self,
1075 event_meta: EventMeta,
1076 sender: Option<&UserId>,
1077 timestamp: Option<MilliSecondsSinceUnixEpoch>,
1078 position: TimelineItemPosition,
1079 room_data_provider: &P,
1080 settings: &TimelineSettings,
1081 ) {
1082 let event_id = event_meta.event_id.clone();
1083
1084 match position {
1085 TimelineItemPosition::Start { .. } => self.items.push_front_remote_event(event_meta),
1086
1087 TimelineItemPosition::End { .. } => {
1088 self.items.push_back_remote_event(event_meta);
1089 }
1090
1091 TimelineItemPosition::At { event_index, .. } => {
1092 self.items.insert_remote_event(event_index, event_meta);
1093 }
1094
1095 TimelineItemPosition::UpdateAt { .. } => {
1096 if let Some(event) =
1097 self.items.get_remote_event_by_event_id_mut(&event_meta.event_id)
1098 && (event.visible != event_meta.visible
1099 || event.can_show_read_receipts != event_meta.can_show_read_receipts)
1100 {
1101 event.visible = event_meta.visible;
1102 event.can_show_read_receipts = event_meta.can_show_read_receipts;
1103
1104 if settings.track_read_receipts.is_enabled() {
1105 self.maybe_update_read_receipts_of_prev_event(&event_meta.event_id);
1108 }
1109 }
1110 }
1111 }
1112
1113 if settings.track_read_receipts.is_enabled()
1114 && matches!(
1115 position,
1116 TimelineItemPosition::Start { .. }
1117 | TimelineItemPosition::End { .. }
1118 | TimelineItemPosition::At { .. }
1119 )
1120 {
1121 self.load_read_receipts_for_event(&event_id, room_data_provider).await;
1122
1123 self.maybe_add_implicit_read_receipt(&event_id, sender, timestamp);
1124 }
1125 }
1126
1127 pub(super) fn adjust_date_dividers(&mut self, mut adjuster: DateDividerAdjuster) {
1128 adjuster.run(&mut self.items, &mut self.meta);
1129 }
1130
1131 pub(super) fn mark_all_events_as_encrypted(&mut self) {
1135 for idx in 0..self.items.len() {
1136 let item = &self.items[idx];
1137
1138 if let Some(event) = item.as_event() {
1139 if event.is_room_encrypted {
1140 continue;
1141 }
1142
1143 let mut cloned_event = event.clone();
1144 cloned_event.is_room_encrypted = true;
1145
1146 let item = item.with_kind(cloned_event);
1148 self.items.replace(idx, item);
1149 }
1150 }
1151 }
1152}
1153
1154async fn get_forwarder_info<P: RoomDataProvider>(
1167 event: &TimelineEvent,
1168 room_data_provider: &P,
1169) -> (Option<OwnedUserId>, Option<Profile>) {
1170 let forwarder = event
1171 .kind
1172 .encryption_info()
1173 .and_then(|info| info.forwarder.as_ref())
1174 .map(|info| info.user_id.clone());
1175
1176 let forwarder_profile = if let Some(ref forwarder_id) = forwarder {
1177 Some(room_data_provider.profile_from_user_id(forwarder_id).await)
1178 } else {
1179 None
1180 };
1181
1182 (forwarder, forwarder_profile.flatten())
1183}