1use std::collections::{HashMap, HashSet};
16
17use eyeball_im::VectorDiff;
18use itertools::Itertools as _;
19use matrix_sdk::deserialized_responses::{
20 ThreadSummaryStatus, TimelineEvent, TimelineEventKind, UnsignedEventLocation,
21};
22use ruma::{
23 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, UserId,
24 events::AnySyncTimelineEvent, push::Action, serde::Raw,
25};
26use tracing::{debug, instrument, trace, warn};
27
28use super::{
29 super::{
30 controller::ObservableItemsTransactionEntry,
31 date_dividers::DateDividerAdjuster,
32 event_handler::{Flow, TimelineEventContext, TimelineEventHandler, TimelineItemPosition},
33 event_item::RemoteEventOrigin,
34 traits::RoomDataProvider,
35 },
36 ObservableItems, ObservableItemsTransaction, TimelineMetadata, TimelineSettings,
37 metadata::EventMeta,
38};
39use crate::timeline::{
40 EmbeddedEvent, ThreadSummary, TimelineDetails, VirtualTimelineItem,
41 controller::TimelineFocusKind,
42 event_handler::{FailedToParseEvent, RemovedItem, TimelineAction},
43};
44
45pub(in crate::timeline) struct TimelineStateTransaction<'a, P: RoomDataProvider> {
46 pub items: ObservableItemsTransaction<'a>,
49
50 number_of_items_when_transaction_started: usize,
52
53 pub meta: TimelineMetadata,
57
58 previous_meta: &'a mut TimelineMetadata,
60
61 pub focus: &'a TimelineFocusKind<P>,
63}
64
65impl<'a, P: RoomDataProvider> TimelineStateTransaction<'a, P> {
66 pub(super) fn new(
68 items: &'a mut ObservableItems,
69 meta: &'a mut TimelineMetadata,
70 focus: &'a TimelineFocusKind<P>,
71 ) -> Self {
72 let previous_meta = meta;
73 let meta = previous_meta.clone();
74 let items = items.transaction();
75
76 Self {
77 number_of_items_when_transaction_started: items.len(),
78 items,
79 previous_meta,
80 meta,
81 focus,
82 }
83 }
84
85 pub(super) async fn handle_remote_events_with_diffs(
87 &mut self,
88 diffs: Vec<VectorDiff<TimelineEvent>>,
89 origin: RemoteEventOrigin,
90 room_data_provider: &P,
91 settings: &TimelineSettings,
92 ) {
93 let mut date_divider_adjuster =
94 DateDividerAdjuster::new(settings.date_divider_mode.clone());
95
96 for diff in diffs {
97 match diff {
98 VectorDiff::Append { values: events } => {
99 for event in events {
100 self.handle_remote_event(
101 event,
102 TimelineItemPosition::End { origin },
103 room_data_provider,
104 settings,
105 &mut date_divider_adjuster,
106 )
107 .await;
108 }
109 }
110
111 VectorDiff::PushFront { value: event } => {
112 self.handle_remote_event(
113 event,
114 TimelineItemPosition::Start { origin },
115 room_data_provider,
116 settings,
117 &mut date_divider_adjuster,
118 )
119 .await;
120 }
121
122 VectorDiff::PushBack { value: event } => {
123 self.handle_remote_event(
124 event,
125 TimelineItemPosition::End { origin },
126 room_data_provider,
127 settings,
128 &mut date_divider_adjuster,
129 )
130 .await;
131 }
132
133 VectorDiff::Insert { index: event_index, value: event } => {
134 self.handle_remote_event(
135 event,
136 TimelineItemPosition::At { event_index, origin },
137 room_data_provider,
138 settings,
139 &mut date_divider_adjuster,
140 )
141 .await;
142 }
143
144 VectorDiff::Set { index: event_index, value: event } => {
145 if let Some(timeline_item_index) = self
146 .items
147 .all_remote_events()
148 .get(event_index)
149 .and_then(|meta| meta.timeline_item_index)
150 {
151 self.handle_remote_event(
152 event,
153 TimelineItemPosition::UpdateAt { timeline_item_index },
154 room_data_provider,
155 settings,
156 &mut date_divider_adjuster,
157 )
158 .await;
159 } else {
160 warn!(
161 event_index,
162 "Set update dropped because there wasn't any attached timeline item index."
163 );
164 }
165 }
166
167 VectorDiff::Remove { index: event_index } => {
168 self.remove_timeline_item(event_index, &mut date_divider_adjuster);
169 }
170
171 VectorDiff::Clear => {
172 self.clear();
173 }
174
175 v => unimplemented!("{v:?}"),
176 }
177 }
178
179 self.adjust_date_dividers(date_divider_adjuster);
180 self.check_invariants();
181 }
182
183 async fn handle_remote_aggregation(
184 &mut self,
185 event: TimelineEvent,
186 position: TimelineItemPosition,
187 room_data_provider: &P,
188 date_divider_adjuster: &mut DateDividerAdjuster,
189 ) {
190 let raw_event = event.raw();
191
192 let deserialized = match raw_event.deserialize() {
193 Ok(deserialized) => deserialized,
194 Err(err) => {
195 warn!("Failed to deserialize timeline event: {err}");
196 return;
197 }
198 };
199
200 let sender = deserialized.sender().to_owned();
201 let timestamp = deserialized.origin_server_ts();
202 let event_id = deserialized.event_id().to_owned();
203 let txn_id = deserialized.transaction_id().map(ToOwned::to_owned);
204
205 if let Some(action @ TimelineAction::HandleAggregation { .. }) = TimelineAction::from_event(
206 deserialized,
207 raw_event,
208 room_data_provider,
209 None,
210 None,
211 None,
212 None,
213 )
214 .await
215 {
216 let encryption_info = event.kind.encryption_info().cloned();
217
218 let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
219
220 let ctx = TimelineEventContext {
221 sender,
222 sender_profile,
223 timestamp,
224 read_receipts: Default::default(),
226 is_highlighted: false,
227 flow: Flow::Remote {
228 event_id: event_id.clone(),
229 raw_event: event.raw().clone(),
230 encryption_info,
231 txn_id,
232 position,
233 },
234 should_add_new_items: false,
236 };
237
238 TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, action).await;
239 }
240 }
241
242 pub(super) async fn handle_remote_aggregations(
249 &mut self,
250 diffs: Vec<VectorDiff<TimelineEvent>>,
251 origin: RemoteEventOrigin,
252 room_data_provider: &P,
253 settings: &TimelineSettings,
254 ) {
255 let mut date_divider_adjuster =
256 DateDividerAdjuster::new(settings.date_divider_mode.clone());
257
258 for diff in diffs {
259 match diff {
260 VectorDiff::Append { values: events } => {
261 for event in events {
262 self.handle_remote_aggregation(
263 event,
264 TimelineItemPosition::End { origin },
265 room_data_provider,
266 &mut date_divider_adjuster,
267 )
268 .await;
269 }
270 }
271
272 VectorDiff::PushFront { value: event } => {
273 self.handle_remote_aggregation(
274 event,
275 TimelineItemPosition::Start { origin },
276 room_data_provider,
277 &mut date_divider_adjuster,
278 )
279 .await;
280 }
281
282 VectorDiff::PushBack { value: event } => {
283 self.handle_remote_aggregation(
284 event,
285 TimelineItemPosition::End { origin },
286 room_data_provider,
287 &mut date_divider_adjuster,
288 )
289 .await;
290 }
291
292 VectorDiff::Insert { index: event_index, value: event } => {
293 self.handle_remote_aggregation(
294 event,
295 TimelineItemPosition::At { event_index, origin },
296 room_data_provider,
297 &mut date_divider_adjuster,
298 )
299 .await;
300 }
301
302 VectorDiff::Set { index: event_index, value: event } => {
303 if let Some(timeline_item_index) = self
304 .items
305 .all_remote_events()
306 .get(event_index)
307 .and_then(|meta| meta.timeline_item_index)
308 {
309 self.handle_remote_aggregation(
310 event,
311 TimelineItemPosition::UpdateAt { timeline_item_index },
312 room_data_provider,
313 &mut date_divider_adjuster,
314 )
315 .await;
316 } else {
317 warn!(
318 event_index,
319 "Set update dropped because there wasn't any attached timeline item index."
320 );
321 }
322 }
323
324 VectorDiff::Remove { .. } | VectorDiff::Clear => {
325 }
329
330 v => unimplemented!("{v:?}"),
331 }
332 }
333
334 self.adjust_date_dividers(date_divider_adjuster);
335 self.check_invariants();
336 }
337
338 fn check_invariants(&self) {
339 self.check_no_duplicate_read_receipts();
340 self.check_no_unused_unique_ids();
341 }
342
343 fn check_no_duplicate_read_receipts(&self) {
344 let mut by_user_id = HashMap::new();
345 let mut duplicates = HashSet::new();
346
347 for item in self.items.iter_remotes_region().filter_map(|(_, item)| item.as_event()) {
348 if let Some(event_id) = item.event_id() {
349 for (user_id, _read_receipt) in item.read_receipts() {
350 if let Some(prev_event_id) = by_user_id.insert(user_id, event_id) {
351 duplicates.insert((user_id.clone(), prev_event_id, event_id));
352 }
353 }
354 }
355 }
356
357 if !duplicates.is_empty() {
358 #[cfg(any(debug_assertions, test))]
359 panic!("duplicate read receipts in this timeline: {duplicates:?}\n{:?}", self.items);
360
361 #[cfg(not(any(debug_assertions, test)))]
362 tracing::error!(
363 ?duplicates,
364 items = ?self.items,
365 "duplicate read receipts in this timeline",
366 );
367 }
368 }
369
370 fn check_no_unused_unique_ids(&self) {
371 let duplicates = self
372 .items
373 .iter_all_regions()
374 .duplicates_by(|(_nth, item)| item.unique_id())
375 .map(|(_nth, item)| item.unique_id())
376 .collect::<Vec<_>>();
377
378 if !duplicates.is_empty() {
379 #[cfg(any(debug_assertions, test))]
380 panic!("duplicate unique ids in this timeline: {duplicates:?}\n{:?}", self.items);
381
382 #[cfg(not(any(debug_assertions, test)))]
383 tracing::error!(
384 ?duplicates,
385 items = ?self.items,
386 "duplicate unique ids in this timeline",
387 );
388 }
389 }
390
391 fn should_add_event_item(
393 &self,
394 room_data_provider: &P,
395 settings: &TimelineSettings,
396 event: &AnySyncTimelineEvent,
397 thread_root: Option<&EventId>,
398 position: TimelineItemPosition,
399 ) -> bool {
400 let rules = room_data_provider.room_version_rules();
401
402 if !(settings.event_filter)(event, &rules) {
403 return false;
405 }
406
407 match &self.focus {
408 TimelineFocusKind::PinnedEvents { .. } => {
409 room_data_provider.is_pinned_event(event.event_id())
411 }
412
413 TimelineFocusKind::Event { hide_threaded_events, .. } => {
414 if thread_root.is_some() && *hide_threaded_events {
417 return false;
418 }
419
420 let origin = match position {
422 TimelineItemPosition::End { origin }
423 | TimelineItemPosition::Start { origin }
424 | TimelineItemPosition::At { origin, .. } => origin,
425
426 TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self
427 .items
428 .get(idx)
429 .and_then(|item| item.as_event()?.as_remote())
430 .map_or(RemoteEventOrigin::Unknown, |item| item.origin),
431 };
432
433 match origin {
434 RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => false,
436 RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => true,
437 }
438 }
439
440 TimelineFocusKind::Live { hide_threaded_events } => {
441 thread_root.is_none() || !hide_threaded_events
444 }
445
446 TimelineFocusKind::Thread { root_event_id, .. } => {
447 event.event_id() == root_event_id
449 || thread_root.as_ref().is_some_and(|r| r == root_event_id)
450 }
451 }
452 }
453
454 async fn maybe_add_error_item(
458 &mut self,
459 position: TimelineItemPosition,
460 room_data_provider: &P,
461 raw: &Raw<AnySyncTimelineEvent>,
462 deserialization_error: serde_json::Error,
463 settings: &TimelineSettings,
464 ) -> Option<(
465 OwnedEventId,
466 OwnedUserId,
467 MilliSecondsSinceUnixEpoch,
468 Option<OwnedTransactionId>,
469 Option<TimelineAction>,
470 bool,
471 )> {
472 let state_key: Option<String> = raw.get_field("state_key").ok().flatten();
473
474 let event_type = if let Some(state_key) = state_key {
481 raw.get_field("type")
482 .ok()
483 .flatten()
484 .map(|event_type| FailedToParseEvent::State { event_type, state_key })
485 } else {
486 raw.get_field("type").ok().flatten().map(FailedToParseEvent::MsgLike)
487 };
488
489 let event_id: Option<OwnedEventId> = raw.get_field("event_id").ok().flatten();
490 let Some(event_id) = event_id else {
491 warn!(
493 ?event_type,
494 "Failed to deserialize timeline event (with no ID): {deserialization_error}"
495 );
496 return None;
497 };
498
499 let sender: Option<OwnedUserId> = raw.get_field("sender").ok().flatten();
500 let origin_server_ts: Option<MilliSecondsSinceUnixEpoch> =
501 raw.get_field("origin_server_ts").ok().flatten();
502
503 match (sender, origin_server_ts, event_type) {
504 (Some(sender), Some(origin_server_ts), Some(event_type))
505 if settings.add_failed_to_parse =>
506 {
507 #[derive(serde::Deserialize)]
510 struct Unsigned {
511 transaction_id: Option<OwnedTransactionId>,
512 }
513
514 let transaction_id: Option<OwnedTransactionId> = raw
515 .get_field::<Unsigned>("unsigned")
516 .ok()
517 .flatten()
518 .and_then(|unsigned| unsigned.transaction_id);
519
520 Some((
523 event_id,
524 sender,
525 origin_server_ts,
526 transaction_id,
527 Some(TimelineAction::failed_to_parse(event_type, deserialization_error)),
528 true,
529 ))
530 }
531
532 (sender, origin_server_ts, event_type) => {
533 warn!(
536 ?event_type,
537 ?event_id,
538 "Failed to deserialize timeline event: {deserialization_error}"
539 );
540
541 self.add_or_update_remote_event(
544 EventMeta::new(event_id, false),
545 sender.as_deref(),
546 origin_server_ts,
547 position,
548 room_data_provider,
549 settings,
550 )
551 .await;
552 None
553 }
554 }
555 }
556
557 async fn fetch_latest_thread_reply(
560 &mut self,
561 event_id: &EventId,
562 room_data_provider: &P,
563 ) -> Option<Box<EmbeddedEvent>> {
564 let event = RoomDataProvider::load_event(room_data_provider, event_id)
565 .await
566 .inspect_err(|err| {
567 warn!("Failed to load thread latest event: {err}");
568 })
569 .ok()?;
570
571 EmbeddedEvent::try_from_timeline_event(event, room_data_provider, &self.meta)
572 .await
573 .inspect_err(|err| {
574 warn!("Failed to extract thread latest event into a timeline item content: {err}");
575 })
576 .ok()
577 .flatten()
578 .map(Box::new)
579 }
580
581 pub(super) async fn handle_remote_event(
585 &mut self,
586 event: TimelineEvent,
587 position: TimelineItemPosition,
588 room_data_provider: &P,
589 settings: &TimelineSettings,
590 date_divider_adjuster: &mut DateDividerAdjuster,
591 ) -> RemovedItem {
592 let is_highlighted =
593 event.push_actions().is_some_and(|actions| actions.iter().any(Action::is_highlight));
594
595 let thread_summary = if let ThreadSummaryStatus::Some(summary) = event.thread_summary {
596 let latest_reply_item = if let Some(latest_reply) = summary.latest_reply {
597 self.fetch_latest_thread_reply(&latest_reply, room_data_provider).await
598 } else {
599 None
600 };
601 Some(ThreadSummary {
602 latest_event: TimelineDetails::from_initial_value(latest_reply_item),
603 num_replies: summary.num_replies,
604 })
605 } else {
606 None
607 };
608
609 let encryption_info = event.kind.encryption_info().cloned();
610
611 let bundled_edit_encryption_info = event.kind.unsigned_encryption_map().and_then(|map| {
612 map.get(&UnsignedEventLocation::RelationsReplace)?.encryption_info().cloned()
613 });
614
615 let (raw, utd_info) = match event.kind {
616 TimelineEventKind::UnableToDecrypt { utd_info, event } => (event, Some(utd_info)),
617 _ => (event.kind.into_raw(), None),
618 };
619
620 let (event_id, sender, timestamp, txn_id, timeline_action, should_add) = match raw
621 .deserialize()
622 {
623 Ok(event) => {
625 let (in_reply_to, thread_root) = self.meta.process_event_relations(
626 &event,
627 &raw,
628 bundled_edit_encryption_info,
629 &self.items,
630 matches!(self.focus, TimelineFocusKind::Thread { .. }),
631 );
632
633 let should_add = self.should_add_event_item(
634 room_data_provider,
635 settings,
636 &event,
637 thread_root.as_deref(),
638 position,
639 );
640
641 (
642 event.event_id().to_owned(),
643 event.sender().to_owned(),
644 event.origin_server_ts(),
645 event.transaction_id().map(ToOwned::to_owned),
646 TimelineAction::from_event(
647 event,
648 &raw,
649 room_data_provider,
650 utd_info
651 .map(|utd_info| (utd_info, self.meta.unable_to_decrypt_hook.as_ref())),
652 in_reply_to,
653 thread_root,
654 thread_summary,
655 )
656 .await,
657 should_add,
658 )
659 }
660
661 Err(e) => {
663 if let Some(tuple) =
664 self.maybe_add_error_item(position, room_data_provider, &raw, e, settings).await
665 {
666 tuple
667 } else {
668 return false;
669 }
670 }
671 };
672
673 self.add_or_update_remote_event(
676 EventMeta::new(event_id.clone(), should_add),
677 Some(&sender),
678 Some(timestamp),
679 position,
680 room_data_provider,
681 settings,
682 )
683 .await;
684
685 let item_added = if let Some(timeline_action) = timeline_action {
687 let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
688
689 let ctx = TimelineEventContext {
690 sender,
691 sender_profile,
692 timestamp,
693 read_receipts: if settings.track_read_receipts && should_add {
694 self.meta.read_receipts.compute_event_receipts(
695 &event_id,
696 &mut self.items,
697 matches!(position, TimelineItemPosition::End { .. }),
698 )
699 } else {
700 Default::default()
701 },
702 is_highlighted,
703 flow: Flow::Remote {
704 event_id: event_id.clone(),
705 raw_event: raw,
706 encryption_info,
707 txn_id,
708 position,
709 },
710 should_add_new_items: should_add,
711 };
712
713 TimelineEventHandler::new(self, ctx)
714 .handle_event(date_divider_adjuster, timeline_action)
715 .await
716 } else {
717 false
719 };
720
721 let mut item_removed = false;
722
723 if !item_added {
724 trace!("No new item added");
725
726 if let TimelineItemPosition::UpdateAt { timeline_item_index } = position {
727 trace!("Removing UTD that was successfully retried");
730 self.items.remove(timeline_item_index);
731 item_removed = true;
732 }
733 }
734
735 item_removed
736 }
737
738 fn remove_timeline_item(
740 &mut self,
741 event_index: usize,
742 day_divider_adjuster: &mut DateDividerAdjuster,
743 ) {
744 day_divider_adjuster.mark_used();
745
746 if let Some(event_meta) = self.items.all_remote_events().get(event_index) {
756 if let Some(timeline_item_index) = event_meta.timeline_item_index {
758 let _ = self.items.remove(timeline_item_index);
759 }
760
761 self.items.remove_remote_event(event_index);
763 }
764 }
765
766 pub(super) fn clear(&mut self) {
767 if self.items.has_local() {
773 self.items.for_each(|entry| {
775 if entry.is_remote_event()
776 || entry.as_virtual().is_some_and(|vitem| match vitem {
777 VirtualTimelineItem::DateDivider(_) => false,
778 VirtualTimelineItem::ReadMarker | VirtualTimelineItem::TimelineStart => {
779 true
780 }
781 })
782 {
783 ObservableItemsTransactionEntry::remove(entry);
784 }
785 });
786
787 let mut idx = 0;
789 while idx < self.items.len() {
790 if self.items[idx].is_date_divider()
791 && self.items.get(idx + 1).is_none_or(|item| item.is_date_divider())
792 {
793 self.items.remove(idx);
794 } else {
796 idx += 1;
797 }
798 }
799 } else {
800 self.items.clear();
801 }
802
803 self.meta.clear();
804
805 debug!(remaining_items = self.items.len(), "Timeline cleared");
806 }
807
808 #[instrument(skip_all)]
809 pub(super) fn set_fully_read_event(&mut self, fully_read_event_id: OwnedEventId) {
810 if self.meta.fully_read_event.as_ref().is_some_and(|id| *id == fully_read_event_id) {
812 return;
813 }
814
815 self.meta.fully_read_event = Some(fully_read_event_id);
816 self.meta.update_read_marker(&mut self.items);
817 }
818
819 pub(super) fn commit(self) {
820 let previous_number_of_items = self.number_of_items_when_transaction_started;
822 let next_number_of_items = self.items.len();
823
824 if previous_number_of_items != next_number_of_items {
825 let count = self
826 .meta
827 .subscriber_skip_count
828 .compute_next(previous_number_of_items, next_number_of_items);
829 self.meta
830 .subscriber_skip_count
831 .update(count, matches!(self.focus, TimelineFocusKind::Live { .. }));
832 }
833
834 *self.previous_meta = self.meta;
836
837 self.items.commit();
838 }
839
840 async fn add_or_update_remote_event(
845 &mut self,
846 event_meta: EventMeta,
847 sender: Option<&UserId>,
848 timestamp: Option<MilliSecondsSinceUnixEpoch>,
849 position: TimelineItemPosition,
850 room_data_provider: &P,
851 settings: &TimelineSettings,
852 ) {
853 let event_id = event_meta.event_id.clone();
854
855 match position {
856 TimelineItemPosition::Start { .. } => self.items.push_front_remote_event(event_meta),
857
858 TimelineItemPosition::End { .. } => {
859 self.items.push_back_remote_event(event_meta);
860 }
861
862 TimelineItemPosition::At { event_index, .. } => {
863 self.items.insert_remote_event(event_index, event_meta);
864 }
865
866 TimelineItemPosition::UpdateAt { .. } => {
867 if let Some(event) =
868 self.items.get_remote_event_by_event_id_mut(&event_meta.event_id)
869 && event.visible != event_meta.visible
870 {
871 event.visible = event_meta.visible;
872
873 if settings.track_read_receipts {
874 self.maybe_update_read_receipts_of_prev_event(&event_meta.event_id);
877 }
878 }
879 }
880 }
881
882 if settings.track_read_receipts
883 && matches!(
884 position,
885 TimelineItemPosition::Start { .. }
886 | TimelineItemPosition::End { .. }
887 | TimelineItemPosition::At { .. }
888 )
889 {
890 self.load_read_receipts_for_event(&event_id, room_data_provider).await;
891
892 self.maybe_add_implicit_read_receipt(&event_id, sender, timestamp);
893 }
894 }
895
896 pub(super) fn adjust_date_dividers(&mut self, mut adjuster: DateDividerAdjuster) {
897 adjuster.run(&mut self.items, &mut self.meta);
898 }
899
900 pub(super) fn mark_all_events_as_encrypted(&mut self) {
904 for idx in 0..self.items.len() {
905 let item = &self.items[idx];
906
907 if let Some(event) = item.as_event() {
908 if event.is_room_encrypted {
909 continue;
910 }
911
912 let mut cloned_event = event.clone();
913 cloned_event.is_room_encrypted = true;
914
915 let item = item.with_kind(cloned_event);
917 self.items.replace(idx, item);
918 }
919 }
920 }
921}