1use std::collections::{HashMap, HashSet};
16
17use eyeball_im::VectorDiff;
18use itertools::Itertools as _;
19use matrix_sdk::deserialized_responses::{
20 ThreadSummaryStatus, TimelineEvent, TimelineEventKind, UnsignedEventLocation,
21};
22use ruma::{
23 events::AnySyncTimelineEvent, push::Action, serde::Raw, EventId, MilliSecondsSinceUnixEpoch,
24 OwnedEventId, OwnedTransactionId, OwnedUserId, UserId,
25};
26use tracing::{debug, instrument, warn};
27
28use super::{
29 super::{
30 controller::ObservableItemsTransactionEntry,
31 date_dividers::DateDividerAdjuster,
32 event_handler::{Flow, TimelineEventContext, TimelineEventHandler, TimelineItemPosition},
33 event_item::RemoteEventOrigin,
34 traits::RoomDataProvider,
35 },
36 metadata::EventMeta,
37 ObservableItems, ObservableItemsTransaction, TimelineFocusKind, TimelineMetadata,
38 TimelineSettings,
39};
40use crate::timeline::{
41 event_handler::{FailedToParseEvent, RemovedItem, TimelineAction},
42 EmbeddedEvent, ThreadSummary, TimelineDetails, VirtualTimelineItem,
43};
44
45pub(in crate::timeline) struct TimelineStateTransaction<'a> {
46 pub items: ObservableItemsTransaction<'a>,
49
50 number_of_items_when_transaction_started: usize,
52
53 pub meta: TimelineMetadata,
57
58 previous_meta: &'a mut TimelineMetadata,
60
61 pub(super) timeline_focus: TimelineFocusKind,
63}
64
65impl<'a> TimelineStateTransaction<'a> {
66 pub(super) fn new(
68 items: &'a mut ObservableItems,
69 meta: &'a mut TimelineMetadata,
70 timeline_focus: TimelineFocusKind,
71 ) -> Self {
72 let previous_meta = meta;
73 let meta = previous_meta.clone();
74 let items = items.transaction();
75
76 Self {
77 number_of_items_when_transaction_started: items.len(),
78 items,
79 previous_meta,
80 meta,
81 timeline_focus,
82 }
83 }
84
85 pub(super) async fn handle_remote_events_with_diffs<RoomData>(
87 &mut self,
88 diffs: Vec<VectorDiff<TimelineEvent>>,
89 origin: RemoteEventOrigin,
90 room_data_provider: &RoomData,
91 settings: &TimelineSettings,
92 ) where
93 RoomData: RoomDataProvider,
94 {
95 let mut date_divider_adjuster =
96 DateDividerAdjuster::new(settings.date_divider_mode.clone());
97
98 for diff in diffs {
99 match diff {
100 VectorDiff::Append { values: events } => {
101 for event in events {
102 self.handle_remote_event(
103 event,
104 TimelineItemPosition::End { origin },
105 room_data_provider,
106 settings,
107 &mut date_divider_adjuster,
108 )
109 .await;
110 }
111 }
112
113 VectorDiff::PushFront { value: event } => {
114 self.handle_remote_event(
115 event,
116 TimelineItemPosition::Start { origin },
117 room_data_provider,
118 settings,
119 &mut date_divider_adjuster,
120 )
121 .await;
122 }
123
124 VectorDiff::PushBack { value: event } => {
125 self.handle_remote_event(
126 event,
127 TimelineItemPosition::End { origin },
128 room_data_provider,
129 settings,
130 &mut date_divider_adjuster,
131 )
132 .await;
133 }
134
135 VectorDiff::Insert { index: event_index, value: event } => {
136 self.handle_remote_event(
137 event,
138 TimelineItemPosition::At { event_index, origin },
139 room_data_provider,
140 settings,
141 &mut date_divider_adjuster,
142 )
143 .await;
144 }
145
146 VectorDiff::Set { index: event_index, value: event } => {
147 if let Some(timeline_item_index) = self
148 .items
149 .all_remote_events()
150 .get(event_index)
151 .and_then(|meta| meta.timeline_item_index)
152 {
153 self.handle_remote_event(
154 event,
155 TimelineItemPosition::UpdateAt { timeline_item_index },
156 room_data_provider,
157 settings,
158 &mut date_divider_adjuster,
159 )
160 .await;
161 } else {
162 warn!(event_index, "Set update dropped because there wasn't any attached timeline item index.");
163 }
164 }
165
166 VectorDiff::Remove { index: event_index } => {
167 self.remove_timeline_item(event_index, &mut date_divider_adjuster);
168 }
169
170 VectorDiff::Clear => {
171 self.clear();
172 }
173
174 v => unimplemented!("{v:?}"),
175 }
176 }
177
178 self.adjust_date_dividers(date_divider_adjuster);
179 self.check_invariants();
180 }
181
182 async fn handle_remote_aggregation<RoomData>(
183 &mut self,
184 event: TimelineEvent,
185 position: TimelineItemPosition,
186 room_data_provider: &RoomData,
187 date_divider_adjuster: &mut DateDividerAdjuster,
188 ) where
189 RoomData: RoomDataProvider,
190 {
191 let deserialized = match event.raw().deserialize() {
192 Ok(deserialized) => deserialized,
193 Err(err) => {
194 warn!("Failed to deserialize timeline event: {err}");
195 return;
196 }
197 };
198
199 let sender = deserialized.sender().to_owned();
200 let timestamp = deserialized.origin_server_ts();
201 let event_id = deserialized.event_id().to_owned();
202 let txn_id = deserialized.transaction_id().map(ToOwned::to_owned);
203
204 if let Some(action @ TimelineAction::HandleAggregation { .. }) = TimelineAction::from_event(
205 deserialized,
206 event.raw(),
207 room_data_provider,
208 None,
209 &self.meta,
210 None,
211 None,
212 None,
213 )
214 .await
215 {
216 let encryption_info = event.kind.encryption_info().cloned();
217
218 let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
219
220 let ctx = TimelineEventContext {
221 sender,
222 sender_profile,
223 timestamp,
224 read_receipts: Default::default(),
226 is_highlighted: false,
227 flow: Flow::Remote {
228 event_id: event_id.clone(),
229 raw_event: event.raw().clone(),
230 encryption_info,
231 txn_id,
232 position,
233 },
234 should_add_new_items: false,
236 };
237
238 TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, action).await;
239 }
240 }
241
242 pub(super) async fn handle_remote_aggregations<RoomData>(
249 &mut self,
250 diffs: Vec<VectorDiff<TimelineEvent>>,
251 origin: RemoteEventOrigin,
252 room_data_provider: &RoomData,
253 settings: &TimelineSettings,
254 ) where
255 RoomData: RoomDataProvider,
256 {
257 let mut date_divider_adjuster =
258 DateDividerAdjuster::new(settings.date_divider_mode.clone());
259
260 for diff in diffs {
261 match diff {
262 VectorDiff::Append { values: events } => {
263 for event in events {
264 self.handle_remote_aggregation(
265 event,
266 TimelineItemPosition::End { origin },
267 room_data_provider,
268 &mut date_divider_adjuster,
269 )
270 .await;
271 }
272 }
273
274 VectorDiff::PushFront { value: event } => {
275 self.handle_remote_aggregation(
276 event,
277 TimelineItemPosition::Start { origin },
278 room_data_provider,
279 &mut date_divider_adjuster,
280 )
281 .await;
282 }
283
284 VectorDiff::PushBack { value: event } => {
285 self.handle_remote_aggregation(
286 event,
287 TimelineItemPosition::End { origin },
288 room_data_provider,
289 &mut date_divider_adjuster,
290 )
291 .await;
292 }
293
294 VectorDiff::Insert { index: event_index, value: event } => {
295 self.handle_remote_aggregation(
296 event,
297 TimelineItemPosition::At { event_index, origin },
298 room_data_provider,
299 &mut date_divider_adjuster,
300 )
301 .await;
302 }
303
304 VectorDiff::Set { index: event_index, value: event } => {
305 if let Some(timeline_item_index) = self
306 .items
307 .all_remote_events()
308 .get(event_index)
309 .and_then(|meta| meta.timeline_item_index)
310 {
311 self.handle_remote_aggregation(
312 event,
313 TimelineItemPosition::UpdateAt { timeline_item_index },
314 room_data_provider,
315 &mut date_divider_adjuster,
316 )
317 .await;
318 } else {
319 warn!(event_index, "Set update dropped because there wasn't any attached timeline item index.");
320 }
321 }
322
323 VectorDiff::Remove { .. } | VectorDiff::Clear => {
324 }
328
329 v => unimplemented!("{v:?}"),
330 }
331 }
332
333 self.adjust_date_dividers(date_divider_adjuster);
334 self.check_invariants();
335 }
336
337 fn check_invariants(&self) {
338 self.check_no_duplicate_read_receipts();
339 self.check_no_unused_unique_ids();
340 }
341
342 fn check_no_duplicate_read_receipts(&self) {
343 let mut by_user_id = HashMap::new();
344 let mut duplicates = HashSet::new();
345
346 for item in self.items.iter_remotes_region().filter_map(|(_, item)| item.as_event()) {
347 if let Some(event_id) = item.event_id() {
348 for (user_id, _read_receipt) in item.read_receipts() {
349 if let Some(prev_event_id) = by_user_id.insert(user_id, event_id) {
350 duplicates.insert((user_id.clone(), prev_event_id, event_id));
351 }
352 }
353 }
354 }
355
356 if !duplicates.is_empty() {
357 #[cfg(any(debug_assertions, test))]
358 panic!("duplicate read receipts in this timeline: {duplicates:?}\n{:?}", self.items);
359
360 #[cfg(not(any(debug_assertions, test)))]
361 tracing::error!(
362 ?duplicates,
363 items = ?self.items,
364 "duplicate read receipts in this timeline",
365 );
366 }
367 }
368
369 fn check_no_unused_unique_ids(&self) {
370 let duplicates = self
371 .items
372 .iter_all_regions()
373 .duplicates_by(|(_nth, item)| item.unique_id())
374 .map(|(_nth, item)| item.unique_id())
375 .collect::<Vec<_>>();
376
377 if !duplicates.is_empty() {
378 #[cfg(any(debug_assertions, test))]
379 panic!("duplicate unique ids in this timeline: {duplicates:?}\n{:?}", self.items);
380
381 #[cfg(not(any(debug_assertions, test)))]
382 tracing::error!(
383 ?duplicates,
384 items = ?self.items,
385 "duplicate unique ids in this timeline",
386 );
387 }
388 }
389
390 fn should_add_event_item<P: RoomDataProvider>(
392 &self,
393 room_data_provider: &P,
394 settings: &TimelineSettings,
395 event: &AnySyncTimelineEvent,
396 thread_root: Option<&EventId>,
397 position: TimelineItemPosition,
398 ) -> bool {
399 let room_version = room_data_provider.room_version();
400 if !(settings.event_filter)(event, &room_version) {
401 return false;
403 }
404
405 match &self.timeline_focus {
406 TimelineFocusKind::PinnedEvents => {
407 room_data_provider.is_pinned_event(event.event_id())
409 }
410
411 TimelineFocusKind::Event { hide_threaded_events } => {
412 if thread_root.is_some() && *hide_threaded_events {
415 return false;
416 }
417
418 let origin = match position {
420 TimelineItemPosition::End { origin }
421 | TimelineItemPosition::Start { origin }
422 | TimelineItemPosition::At { origin, .. } => origin,
423
424 TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self
425 .items
426 .get(idx)
427 .and_then(|item| item.as_event()?.as_remote())
428 .map_or(RemoteEventOrigin::Unknown, |item| item.origin),
429 };
430
431 match origin {
432 RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => false,
434 RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => true,
435 }
436 }
437
438 TimelineFocusKind::Live { hide_threaded_events } => {
439 thread_root.is_none() || !hide_threaded_events
442 }
443
444 TimelineFocusKind::Thread { root_event_id } => {
445 event.event_id() == root_event_id
447 || thread_root.as_ref().is_some_and(|r| r == root_event_id)
448 }
449 }
450 }
451
452 async fn maybe_add_error_item(
456 &mut self,
457 position: TimelineItemPosition,
458 room_data_provider: &impl RoomDataProvider,
459 raw: &Raw<AnySyncTimelineEvent>,
460 deserialization_error: serde_json::Error,
461 settings: &TimelineSettings,
462 ) -> Option<(
463 OwnedEventId,
464 OwnedUserId,
465 MilliSecondsSinceUnixEpoch,
466 Option<OwnedTransactionId>,
467 Option<TimelineAction>,
468 bool,
469 )> {
470 let state_key: Option<String> = raw.get_field("state_key").ok().flatten();
471
472 let event_type = if let Some(state_key) = state_key {
479 raw.get_field("type")
480 .ok()
481 .flatten()
482 .map(|event_type| FailedToParseEvent::State { event_type, state_key })
483 } else {
484 raw.get_field("type").ok().flatten().map(FailedToParseEvent::MsgLike)
485 };
486
487 let event_id: Option<OwnedEventId> = raw.get_field("event_id").ok().flatten();
488 let Some(event_id) = event_id else {
489 warn!(
491 ?event_type,
492 "Failed to deserialize timeline event (with no ID): {deserialization_error}"
493 );
494 return None;
495 };
496
497 let sender: Option<OwnedUserId> = raw.get_field("sender").ok().flatten();
498 let origin_server_ts: Option<MilliSecondsSinceUnixEpoch> =
499 raw.get_field("origin_server_ts").ok().flatten();
500
501 match (sender, origin_server_ts, event_type) {
502 (Some(sender), Some(origin_server_ts), Some(event_type))
503 if settings.add_failed_to_parse =>
504 {
505 #[derive(serde::Deserialize)]
508 struct Unsigned {
509 transaction_id: Option<OwnedTransactionId>,
510 }
511
512 let transaction_id: Option<OwnedTransactionId> = raw
513 .get_field::<Unsigned>("unsigned")
514 .ok()
515 .flatten()
516 .and_then(|unsigned| unsigned.transaction_id);
517
518 Some((
521 event_id,
522 sender,
523 origin_server_ts,
524 transaction_id,
525 Some(TimelineAction::failed_to_parse(event_type, deserialization_error)),
526 true,
527 ))
528 }
529
530 (sender, origin_server_ts, event_type) => {
531 warn!(
534 ?event_type,
535 ?event_id,
536 "Failed to deserialize timeline event: {deserialization_error}"
537 );
538
539 self.add_or_update_remote_event(
542 EventMeta::new(event_id, false),
543 sender.as_deref(),
544 origin_server_ts,
545 position,
546 room_data_provider,
547 settings,
548 )
549 .await;
550 None
551 }
552 }
553 }
554
555 async fn fetch_latest_thread_reply(
558 &mut self,
559 event_id: &EventId,
560 room_data_provider: &impl RoomDataProvider,
561 ) -> Option<Box<EmbeddedEvent>> {
562 let event = room_data_provider
563 .load_event(event_id)
564 .await
565 .inspect_err(|err| {
566 warn!("Failed to load thread latest event: {err}");
567 })
568 .ok()?;
569
570 EmbeddedEvent::try_from_timeline_event(event, room_data_provider, &self.meta)
571 .await
572 .inspect_err(|err| {
573 warn!("Failed to extract thread latest event into a timeline item content: {err}");
574 })
575 .ok()
576 .flatten()
577 .map(Box::new)
578 }
579
580 pub(super) async fn handle_remote_event<P: RoomDataProvider>(
584 &mut self,
585 event: TimelineEvent,
586 position: TimelineItemPosition,
587 room_data_provider: &P,
588 settings: &TimelineSettings,
589 date_divider_adjuster: &mut DateDividerAdjuster,
590 ) -> RemovedItem {
591 let is_highlighted =
592 event.push_actions().is_some_and(|actions| actions.iter().any(Action::is_highlight));
593
594 let thread_summary = if let ThreadSummaryStatus::Some(summary) = event.thread_summary {
595 let latest_reply_item = if let Some(latest_reply) = summary.latest_reply {
596 self.fetch_latest_thread_reply(&latest_reply, room_data_provider).await
597 } else {
598 None
599 };
600 Some(ThreadSummary {
601 latest_event: TimelineDetails::from_initial_value(latest_reply_item),
602 num_replies: summary.num_replies,
603 })
604 } else {
605 None
606 };
607
608 let encryption_info = event.kind.encryption_info().cloned();
609
610 let bundled_edit_encryption_info = event.kind.unsigned_encryption_map().and_then(|map| {
611 map.get(&UnsignedEventLocation::RelationsReplace)?.encryption_info().cloned()
612 });
613
614 let (raw, utd_info) = match event.kind {
615 TimelineEventKind::UnableToDecrypt { utd_info, event } => (event, Some(utd_info)),
616 _ => (event.kind.into_raw(), None),
617 };
618
619 let (event_id, sender, timestamp, txn_id, timeline_action, should_add) = match raw
620 .deserialize()
621 {
622 Ok(event) => {
624 let (in_reply_to, thread_root) = self.meta.process_event_relations(
625 &event,
626 &raw,
627 bundled_edit_encryption_info,
628 &self.items,
629 &self.timeline_focus,
630 );
631
632 let should_add = self.should_add_event_item(
633 room_data_provider,
634 settings,
635 &event,
636 thread_root.as_deref(),
637 position,
638 );
639
640 (
641 event.event_id().to_owned(),
642 event.sender().to_owned(),
643 event.origin_server_ts(),
644 event.transaction_id().map(ToOwned::to_owned),
645 TimelineAction::from_event(
646 event,
647 &raw,
648 room_data_provider,
649 utd_info,
650 &self.meta,
651 in_reply_to,
652 thread_root,
653 thread_summary,
654 )
655 .await,
656 should_add,
657 )
658 }
659
660 Err(e) => {
662 if let Some(tuple) =
663 self.maybe_add_error_item(position, room_data_provider, &raw, e, settings).await
664 {
665 tuple
666 } else {
667 return false;
668 }
669 }
670 };
671
672 self.add_or_update_remote_event(
675 EventMeta::new(event_id.clone(), should_add),
676 Some(&sender),
677 Some(timestamp),
678 position,
679 room_data_provider,
680 settings,
681 )
682 .await;
683
684 if let Some(timeline_action) = timeline_action {
686 let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
687
688 let ctx = TimelineEventContext {
689 sender,
690 sender_profile,
691 timestamp,
692 read_receipts: if settings.track_read_receipts && should_add {
693 self.meta.read_receipts.compute_event_receipts(
694 &event_id,
695 &mut self.items,
696 matches!(position, TimelineItemPosition::End { .. }),
697 )
698 } else {
699 Default::default()
700 },
701 is_highlighted,
702 flow: Flow::Remote {
703 event_id: event_id.clone(),
704 raw_event: raw,
705 encryption_info,
706 txn_id,
707 position,
708 },
709 should_add_new_items: should_add,
710 };
711
712 TimelineEventHandler::new(self, ctx)
713 .handle_event(date_divider_adjuster, timeline_action)
714 .await
715 } else {
716 false
718 }
719 }
720
721 fn remove_timeline_item(
723 &mut self,
724 event_index: usize,
725 day_divider_adjuster: &mut DateDividerAdjuster,
726 ) {
727 day_divider_adjuster.mark_used();
728
729 if let Some(event_meta) = self.items.all_remote_events().get(event_index) {
739 if let Some(timeline_item_index) = event_meta.timeline_item_index {
741 let _ = self.items.remove(timeline_item_index);
742 }
743
744 self.items.remove_remote_event(event_index);
746 }
747 }
748
749 pub(super) fn clear(&mut self) {
750 if self.items.has_local() {
756 self.items.for_each(|entry| {
758 if entry.is_remote_event()
759 || entry.as_virtual().is_some_and(|vitem| match vitem {
760 VirtualTimelineItem::DateDivider(_) => false,
761 VirtualTimelineItem::ReadMarker | VirtualTimelineItem::TimelineStart => {
762 true
763 }
764 })
765 {
766 ObservableItemsTransactionEntry::remove(entry);
767 }
768 });
769
770 let mut idx = 0;
772 while idx < self.items.len() {
773 if self.items[idx].is_date_divider()
774 && self.items.get(idx + 1).is_none_or(|item| item.is_date_divider())
775 {
776 self.items.remove(idx);
777 } else {
779 idx += 1;
780 }
781 }
782 } else {
783 self.items.clear();
784 }
785
786 self.meta.clear();
787
788 debug!(remaining_items = self.items.len(), "Timeline cleared");
789 }
790
791 #[instrument(skip_all)]
792 pub(super) fn set_fully_read_event(&mut self, fully_read_event_id: OwnedEventId) {
793 if self.meta.fully_read_event.as_ref().is_some_and(|id| *id == fully_read_event_id) {
795 return;
796 }
797
798 self.meta.fully_read_event = Some(fully_read_event_id);
799 self.meta.update_read_marker(&mut self.items);
800 }
801
802 pub(super) fn commit(self) {
803 let previous_number_of_items = self.number_of_items_when_transaction_started;
805 let next_number_of_items = self.items.len();
806
807 if previous_number_of_items != next_number_of_items {
808 let count = self
809 .meta
810 .subscriber_skip_count
811 .compute_next(previous_number_of_items, next_number_of_items);
812 self.meta.subscriber_skip_count.update(count, &self.timeline_focus);
813 }
814
815 *self.previous_meta = self.meta;
817
818 self.items.commit();
819 }
820
821 async fn add_or_update_remote_event<P: RoomDataProvider>(
826 &mut self,
827 event_meta: EventMeta,
828 sender: Option<&UserId>,
829 timestamp: Option<MilliSecondsSinceUnixEpoch>,
830 position: TimelineItemPosition,
831 room_data_provider: &P,
832 settings: &TimelineSettings,
833 ) {
834 let event_id = event_meta.event_id.clone();
835
836 match position {
837 TimelineItemPosition::Start { .. } => self.items.push_front_remote_event(event_meta),
838
839 TimelineItemPosition::End { .. } => {
840 self.items.push_back_remote_event(event_meta);
841 }
842
843 TimelineItemPosition::At { event_index, .. } => {
844 self.items.insert_remote_event(event_index, event_meta);
845 }
846
847 TimelineItemPosition::UpdateAt { .. } => {
848 if let Some(event) =
849 self.items.get_remote_event_by_event_id_mut(&event_meta.event_id)
850 {
851 if event.visible != event_meta.visible {
852 event.visible = event_meta.visible;
853
854 if settings.track_read_receipts {
855 self.maybe_update_read_receipts_of_prev_event(&event_meta.event_id);
858 }
859 }
860 }
861 }
862 }
863
864 if settings.track_read_receipts
865 && matches!(
866 position,
867 TimelineItemPosition::Start { .. }
868 | TimelineItemPosition::End { .. }
869 | TimelineItemPosition::At { .. }
870 )
871 {
872 self.load_read_receipts_for_event(&event_id, room_data_provider).await;
873
874 self.maybe_add_implicit_read_receipt(&event_id, sender, timestamp);
875 }
876 }
877
878 pub(super) fn adjust_date_dividers(&mut self, mut adjuster: DateDividerAdjuster) {
879 adjuster.run(&mut self.items, &mut self.meta);
880 }
881
882 pub(super) fn mark_all_events_as_encrypted(&mut self) {
886 for idx in 0..self.items.len() {
887 let item = &self.items[idx];
888
889 if let Some(event) = item.as_event() {
890 if event.is_room_encrypted {
891 continue;
892 }
893
894 let mut cloned_event = event.clone();
895 cloned_event.is_room_encrypted = true;
896
897 let item = item.with_kind(cloned_event);
899 self.items.replace(idx, item);
900 }
901 }
902 }
903}