1use std::{borrow::Cow, sync::Arc};
16
17use as_variant::as_variant;
18use indexmap::IndexMap;
19use matrix_sdk::{
20 crypto::types::events::UtdCause,
21 deserialized_responses::{EncryptionInfo, UnableToDecryptInfo},
22 send_queue::SendHandle,
23};
24use ruma::{
25 events::{
26 poll::unstable_start::{
27 NewUnstablePollStartEventContentWithoutRelation, UnstablePollStartEventContent,
28 },
29 receipt::Receipt,
30 relation::Replacement,
31 room::message::{
32 Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation,
33 },
34 AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
35 AnySyncTimelineEvent, EventContent, FullStateEventContent, MessageLikeEventType,
36 StateEventType, SyncStateEvent,
37 },
38 serde::Raw,
39 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
40 TransactionId,
41};
42use tracing::{debug, error, field::debug, instrument, trace, warn};
43
44use super::{
45 controller::{
46 find_item_and_apply_aggregation, Aggregation, AggregationKind, ObservableItemsTransaction,
47 PendingEditKind, TimelineMetadata, TimelineStateTransaction,
48 },
49 date_dividers::DateDividerAdjuster,
50 event_item::{
51 AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind,
52 LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem,
53 TimelineEventItemId,
54 },
55 traits::RoomDataProvider,
56 EmbeddedEvent, EncryptedMessage, EventTimelineItem, InReplyToDetails, MsgLikeContent,
57 MsgLikeKind, OtherState, ReactionStatus, Sticker, ThreadSummary, TimelineDetails, TimelineItem,
58 TimelineItemContent,
59};
60use crate::timeline::controller::aggregations::PendingEdit;
61
62pub(super) enum Flow {
64 Local {
66 txn_id: OwnedTransactionId,
68
69 send_handle: Option<SendHandle>,
71 },
72
73 Remote {
76 event_id: OwnedEventId,
78 txn_id: Option<OwnedTransactionId>,
81 raw_event: Raw<AnySyncTimelineEvent>,
83 position: TimelineItemPosition,
85 encryption_info: Option<Arc<EncryptionInfo>>,
87 },
88}
89
90impl Flow {
91 pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId {
93 match self {
94 Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()),
95 Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()),
96 }
97 }
98
99 pub(crate) fn raw_event(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
101 as_variant!(self, Flow::Remote { raw_event, .. } => raw_event)
102 }
103}
104
105pub(super) struct TimelineEventContext {
106 pub(super) sender: OwnedUserId,
107 pub(super) sender_profile: Option<Profile>,
108 pub(super) timestamp: MilliSecondsSinceUnixEpoch,
110 pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
111 pub(super) is_highlighted: bool,
112 pub(super) flow: Flow,
113
114 pub(super) should_add_new_items: bool,
120}
121
122#[derive(Clone, Debug)]
125pub(super) enum HandleAggregationKind {
126 Reaction { key: String },
128
129 Redaction,
131
132 Edit { replacement: Replacement<RoomMessageEventContentWithoutRelation> },
134
135 PollResponse { answers: Vec<String> },
137
138 PollEdit { replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation> },
140
141 PollEnd,
143}
144
145#[derive(Clone, Debug)]
147#[allow(clippy::large_enum_variant)]
148pub(super) enum TimelineAction {
149 AddItem {
155 content: TimelineItemContent,
157 },
158
159 HandleAggregation {
165 related_event: OwnedEventId,
167 kind: HandleAggregationKind,
169 },
170}
171
172impl TimelineAction {
173 fn add_item(content: TimelineItemContent) -> Self {
175 Self::AddItem { content }
176 }
177
178 #[allow(clippy::too_many_arguments)]
183 pub async fn from_event<P: RoomDataProvider>(
184 event: AnySyncTimelineEvent,
185 raw_event: &Raw<AnySyncTimelineEvent>,
186 room_data_provider: &P,
187 unable_to_decrypt_info: Option<UnableToDecryptInfo>,
188 meta: &TimelineMetadata,
189 in_reply_to: Option<InReplyToDetails>,
190 thread_root: Option<OwnedEventId>,
191 thread_summary: Option<ThreadSummary>,
192 ) -> Option<Self> {
193 let room_version = room_data_provider.room_version();
194
195 let redacted_message_or_none = |event_type: MessageLikeEventType| {
196 (event_type != MessageLikeEventType::Reaction)
197 .then_some(TimelineItemContent::MsgLike(MsgLikeContent::redacted()))
198 };
199
200 Some(match event {
201 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
202 if let Some(redacts) = ev.redacts(&room_version).map(ToOwned::to_owned) {
203 Self::HandleAggregation {
204 related_event: redacts,
205 kind: HandleAggregationKind::Redaction,
206 }
207 } else {
208 Self::add_item(redacted_message_or_none(ev.event_type())?)
209 }
210 }
211
212 AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
213 Some(AnyMessageLikeEventContent::RoomEncrypted(content)) => {
214 if let Some(unable_to_decrypt_info) = unable_to_decrypt_info {
216 let utd_cause = UtdCause::determine(
217 raw_event,
218 room_data_provider.crypto_context_info().await,
219 &unable_to_decrypt_info,
220 );
221
222 if let Some(hook) = meta.unable_to_decrypt_hook.as_ref() {
225 hook.on_utd(
226 ev.event_id(),
227 utd_cause,
228 ev.origin_server_ts(),
229 ev.sender(),
230 )
231 .await;
232 }
233
234 Self::add_item(TimelineItemContent::MsgLike(
235 MsgLikeContent::unable_to_decrypt(EncryptedMessage::from_content(
236 content, utd_cause,
237 )),
238 ))
239 } else {
240 return Self::from_content(
245 AnyMessageLikeEventContent::RoomEncrypted(content),
246 in_reply_to,
247 thread_root,
248 thread_summary,
249 );
250 }
251 }
252
253 Some(content) => {
254 return Self::from_content(content, in_reply_to, thread_root, thread_summary);
255 }
256
257 None => Self::add_item(redacted_message_or_none(ev.event_type())?),
258 },
259
260 AnySyncTimelineEvent::State(ev) => match ev {
261 AnySyncStateEvent::RoomMember(ev) => match ev {
262 SyncStateEvent::Original(ev) => {
263 Self::add_item(TimelineItemContent::room_member(
264 ev.state_key,
265 FullStateEventContent::Original {
266 content: ev.content,
267 prev_content: ev.unsigned.prev_content,
268 },
269 ev.sender,
270 ))
271 }
272 SyncStateEvent::Redacted(ev) => {
273 Self::add_item(TimelineItemContent::room_member(
274 ev.state_key,
275 FullStateEventContent::Redacted(ev.content),
276 ev.sender,
277 ))
278 }
279 },
280 ev => Self::add_item(TimelineItemContent::OtherState(OtherState {
281 state_key: ev.state_key().to_owned(),
282 content: AnyOtherFullStateEventContent::with_event_content(ev.content()),
283 })),
284 },
285 })
286 }
287
288 pub(super) fn from_content(
297 content: AnyMessageLikeEventContent,
298 in_reply_to: Option<InReplyToDetails>,
299 thread_root: Option<OwnedEventId>,
300 thread_summary: Option<ThreadSummary>,
301 ) -> Option<Self> {
302 Some(match content {
303 AnyMessageLikeEventContent::Reaction(c) => {
304 Self::HandleAggregation {
306 related_event: c.relates_to.event_id.clone(),
307 kind: HandleAggregationKind::Reaction { key: c.relates_to.key },
308 }
309 }
310
311 AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
312 relates_to: Some(Relation::Replacement(re)),
313 ..
314 }) => Self::HandleAggregation {
315 related_event: re.event_id.clone(),
316 kind: HandleAggregationKind::Edit { replacement: re },
317 },
318
319 AnyMessageLikeEventContent::UnstablePollStart(
320 UnstablePollStartEventContent::Replacement(re),
321 ) => Self::HandleAggregation {
322 related_event: re.relates_to.event_id.clone(),
323 kind: HandleAggregationKind::PollEdit { replacement: re.relates_to },
324 },
325
326 AnyMessageLikeEventContent::UnstablePollResponse(c) => Self::HandleAggregation {
327 related_event: c.relates_to.event_id,
328 kind: HandleAggregationKind::PollResponse { answers: c.poll_response.answers },
329 },
330
331 AnyMessageLikeEventContent::UnstablePollEnd(c) => Self::HandleAggregation {
332 related_event: c.relates_to.event_id,
333 kind: HandleAggregationKind::PollEnd,
334 },
335
336 AnyMessageLikeEventContent::CallInvite(_) => {
337 Self::add_item(TimelineItemContent::CallInvite)
338 }
339
340 AnyMessageLikeEventContent::CallNotify(_) => {
341 Self::add_item(TimelineItemContent::CallNotify)
342 }
343
344 AnyMessageLikeEventContent::Sticker(content) => {
345 Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent {
346 kind: MsgLikeKind::Sticker(Sticker { content }),
347 reactions: Default::default(),
348 thread_root,
349 in_reply_to,
350 thread_summary,
351 }))
352 }
353
354 AnyMessageLikeEventContent::UnstablePollStart(UnstablePollStartEventContent::New(
355 c,
356 )) => {
357 let poll_state = PollState::new(c);
358
359 Self::AddItem {
360 content: TimelineItemContent::MsgLike(MsgLikeContent {
361 kind: MsgLikeKind::Poll(poll_state),
362 reactions: Default::default(),
363 thread_root,
364 in_reply_to,
365 thread_summary,
366 }),
367 }
368 }
369
370 AnyMessageLikeEventContent::RoomMessage(msg) => Self::AddItem {
371 content: TimelineItemContent::message(
372 msg.msgtype,
373 msg.mentions,
374 Default::default(),
375 thread_root,
376 in_reply_to,
377 thread_summary,
378 ),
379 },
380
381 _ => {
382 debug!(
383 "Ignoring message-like event of type `{}`, not supported (yet)",
384 content.event_type()
385 );
386 return None;
387 }
388 })
389 }
390
391 pub(super) fn failed_to_parse(event: FailedToParseEvent, error: serde_json::Error) -> Self {
392 let error = Arc::new(error);
393 match event {
394 FailedToParseEvent::State { event_type, state_key } => {
395 Self::add_item(TimelineItemContent::FailedToParseState {
396 event_type,
397 state_key,
398 error,
399 })
400 }
401 FailedToParseEvent::MsgLike(event_type) => {
402 Self::add_item(TimelineItemContent::FailedToParseMessageLike { event_type, error })
403 }
404 }
405 }
406}
407
408#[derive(Debug)]
409pub(super) enum FailedToParseEvent {
410 MsgLike(MessageLikeEventType),
411 State { event_type: StateEventType, state_key: String },
412}
413
414#[derive(Clone, Copy, Debug)]
416pub(super) enum TimelineItemPosition {
417 Start {
420 origin: RemoteEventOrigin,
422 },
423
424 End {
427 origin: RemoteEventOrigin,
429 },
430
431 At {
433 event_index: usize,
435
436 origin: RemoteEventOrigin,
438 },
439
440 UpdateAt {
445 timeline_item_index: usize,
447 },
448}
449
450pub(super) type RemovedItem = bool;
452
453pub(super) struct TimelineEventHandler<'a, 'o> {
460 items: &'a mut ObservableItemsTransaction<'o>,
461 meta: &'a mut TimelineMetadata,
462 ctx: TimelineEventContext,
463}
464
465impl<'a, 'o> TimelineEventHandler<'a, 'o> {
466 pub(super) fn new(
467 state: &'a mut TimelineStateTransaction<'o>,
468 ctx: TimelineEventContext,
469 ) -> Self {
470 let TimelineStateTransaction { items, meta, .. } = state;
471 Self { items, meta, ctx }
472 }
473
474 #[instrument(skip_all, fields(txn_id, event_id, position))]
481 pub(super) async fn handle_event(
482 mut self,
483 date_divider_adjuster: &mut DateDividerAdjuster,
484 timeline_action: TimelineAction,
485 ) -> RemovedItem {
486 let span = tracing::Span::current();
487
488 date_divider_adjuster.mark_used();
489
490 match &self.ctx.flow {
491 Flow::Local { txn_id, .. } => {
492 span.record("txn_id", debug(txn_id));
493 debug!("Handling local event");
494 }
495
496 Flow::Remote { event_id, txn_id, position, .. } => {
497 span.record("event_id", debug(event_id));
498 span.record("position", debug(position));
499 if let Some(txn_id) = txn_id {
500 span.record("txn_id", debug(txn_id));
501 }
502 trace!("Handling remote event");
503 }
504 };
505
506 let mut added_item = false;
507
508 match timeline_action {
509 TimelineAction::AddItem { content } => {
510 if self.ctx.should_add_new_items {
511 self.add_item(content);
512 added_item = true;
513 }
514 }
515
516 TimelineAction::HandleAggregation { related_event, kind } => match kind {
517 HandleAggregationKind::Reaction { key } => {
518 self.handle_reaction(related_event, key);
519 }
520 HandleAggregationKind::Redaction => {
521 self.handle_redaction(related_event);
522 }
523 HandleAggregationKind::Edit { replacement } => {
524 self.handle_edit(
525 replacement.event_id.clone(),
526 PendingEditKind::RoomMessage(replacement),
527 );
528 }
529 HandleAggregationKind::PollResponse { answers } => {
530 self.handle_poll_response(related_event, answers);
531 }
532 HandleAggregationKind::PollEdit { replacement } => {
533 self.handle_edit(
534 replacement.event_id.clone(),
535 PendingEditKind::Poll(replacement),
536 );
537 }
538 HandleAggregationKind::PollEnd => {
539 self.handle_poll_end(related_event);
540 }
541 },
542 }
543
544 let mut removed_item = false;
545
546 if !added_item {
547 trace!("No new item added");
548
549 if let Flow::Remote {
550 position: TimelineItemPosition::UpdateAt { timeline_item_index },
551 ..
552 } = self.ctx.flow
553 {
554 trace!("Removing UTD that was successfully retried");
557 self.items.remove(timeline_item_index);
558 removed_item = true;
559 }
560 }
561
562 removed_item
563 }
564
565 #[instrument(skip(self, edit_kind))]
566 fn handle_edit(&mut self, edited_event_id: OwnedEventId, edit_kind: PendingEditKind) {
567 let target = TimelineEventItemId::EventId(edited_event_id.clone());
568
569 let encryption_info =
570 as_variant!(&self.ctx.flow, Flow::Remote { encryption_info, .. } => encryption_info.clone()).flatten();
571 let aggregation = Aggregation::new(
572 self.ctx.flow.timeline_item_id(),
573 AggregationKind::Edit(PendingEdit {
574 kind: edit_kind,
575 edit_json: self.ctx.flow.raw_event().cloned(),
576 encryption_info,
577 }),
578 );
579
580 self.meta.aggregations.add(target.clone(), aggregation.clone());
581
582 if let Some(new_item) = find_item_and_apply_aggregation(
583 &self.meta.aggregations,
584 self.items,
585 &target,
586 aggregation,
587 &self.meta.room_version,
588 ) {
589 Self::maybe_update_responses(self.meta, self.items, &edited_event_id, &new_item);
591 }
592 }
593
594 #[instrument(skip(self))]
599 fn handle_reaction(&mut self, relates_to: OwnedEventId, reaction_key: String) {
600 let target = TimelineEventItemId::EventId(relates_to);
601
602 let reaction_status = match &self.ctx.flow {
604 Flow::Local { send_handle, .. } => {
605 ReactionStatus::LocalToRemote(send_handle.clone())
607 }
608 Flow::Remote { event_id, .. } => {
609 ReactionStatus::RemoteToRemote(event_id.clone())
611 }
612 };
613
614 let aggregation = Aggregation::new(
615 self.ctx.flow.timeline_item_id(),
616 AggregationKind::Reaction {
617 key: reaction_key,
618 sender: self.ctx.sender.clone(),
619 timestamp: self.ctx.timestamp,
620 reaction_status,
621 },
622 );
623
624 self.meta.aggregations.add(target.clone(), aggregation.clone());
625 find_item_and_apply_aggregation(
626 &self.meta.aggregations,
627 self.items,
628 &target,
629 aggregation,
630 &self.meta.room_version,
631 );
632 }
633
634 fn handle_poll_response(&mut self, poll_event_id: OwnedEventId, answers: Vec<String>) {
635 let target = TimelineEventItemId::EventId(poll_event_id);
636 let aggregation = Aggregation::new(
637 self.ctx.flow.timeline_item_id(),
638 AggregationKind::PollResponse {
639 sender: self.ctx.sender.clone(),
640 timestamp: self.ctx.timestamp,
641 answers,
642 },
643 );
644 self.meta.aggregations.add(target.clone(), aggregation.clone());
645 find_item_and_apply_aggregation(
646 &self.meta.aggregations,
647 self.items,
648 &target,
649 aggregation,
650 &self.meta.room_version,
651 );
652 }
653
654 fn handle_poll_end(&mut self, poll_event_id: OwnedEventId) {
655 let target = TimelineEventItemId::EventId(poll_event_id);
656 let aggregation = Aggregation::new(
657 self.ctx.flow.timeline_item_id(),
658 AggregationKind::PollEnd { end_date: self.ctx.timestamp },
659 );
660 self.meta.aggregations.add(target.clone(), aggregation.clone());
661 find_item_and_apply_aggregation(
662 &self.meta.aggregations,
663 self.items,
664 &target,
665 aggregation,
666 &self.meta.room_version,
667 );
668 }
669
670 #[instrument(skip_all, fields(redacts_event_id = ?redacted))]
676 fn handle_redaction(&mut self, redacted: OwnedEventId) {
677 if self.handle_aggregation_redaction(redacted.clone()) {
682 return;
685 }
686
687 let target = TimelineEventItemId::EventId(redacted.clone());
688 let aggregation =
689 Aggregation::new(self.ctx.flow.timeline_item_id(), AggregationKind::Redaction);
690 self.meta.aggregations.add(target.clone(), aggregation.clone());
691
692 if let Some(new_item) = find_item_and_apply_aggregation(
693 &self.meta.aggregations,
694 self.items,
695 &target,
696 aggregation,
697 &self.meta.room_version,
698 ) {
699 Self::maybe_update_responses(self.meta, self.items, &redacted, &new_item);
702 }
703 }
704
705 #[instrument(skip_all, fields(redacts = ?aggregation_id))]
710 fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool {
711 let aggregation_id = TimelineEventItemId::EventId(aggregation_id);
712
713 match self.meta.aggregations.try_remove_aggregation(&aggregation_id, self.items) {
714 Ok(val) => val,
715 Err(err) => {
717 warn!("error while attempting to remove aggregation: {err}");
718 true
720 }
721 }
722 }
723
724 fn add_item(&mut self, content: TimelineItemContent) {
737 let sender = self.ctx.sender.to_owned();
738 let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());
739 let timestamp = self.ctx.timestamp;
740
741 let kind: EventTimelineItemKind = match &self.ctx.flow {
742 Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
743 send_state: EventSendState::NotSentYet,
744 transaction_id: txn_id.to_owned(),
745 send_handle: send_handle.clone(),
746 }
747 .into(),
748
749 Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
750 let origin = match *position {
751 TimelineItemPosition::Start { origin }
752 | TimelineItemPosition::End { origin }
753 | TimelineItemPosition::At { origin, .. } => origin,
754
755 TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self.items[idx]
757 .as_event()
758 .and_then(|ev| Some(ev.as_remote()?.origin))
759 .unwrap_or_else(|| {
760 error!("Tried to update a local event");
761 RemoteEventOrigin::Unknown
762 }),
763 };
764
765 RemoteEventTimelineItem {
766 event_id: event_id.clone(),
767 transaction_id: txn_id.clone(),
768 read_receipts: self.ctx.read_receipts.clone(),
769 is_own: self.ctx.sender == self.meta.own_user_id,
770 is_highlighted: self.ctx.is_highlighted,
771 encryption_info: encryption_info.clone(),
772 original_json: Some(raw_event.clone()),
773 latest_edit_json: None,
774 origin,
775 }
776 .into()
777 }
778 };
779
780 let is_room_encrypted = self.meta.is_room_encrypted;
781
782 let item = EventTimelineItem::new(
783 sender,
784 sender_profile,
785 timestamp,
786 content,
787 kind,
788 is_room_encrypted,
789 );
790
791 let mut cowed = Cow::Owned(item);
793 if let Err(err) = self.meta.aggregations.apply_all(
794 &self.ctx.flow.timeline_item_id(),
795 &mut cowed,
796 self.items,
797 &self.meta.room_version,
798 ) {
799 warn!("discarding aggregations: {err}");
800 }
801 let item = cowed.into_owned();
802
803 match &self.ctx.flow {
804 Flow::Local { .. } => {
805 trace!("Adding new local timeline item");
806
807 let item = self.meta.new_timeline_item(item);
808
809 self.items.push_local(item);
810 }
811
812 Flow::Remote {
813 position: TimelineItemPosition::Start { .. }, event_id, txn_id, ..
814 } => {
815 let item = Self::recycle_local_or_create_item(
816 self.items,
817 self.meta,
818 item,
819 event_id,
820 txn_id.as_deref(),
821 );
822
823 trace!("Adding new remote timeline item at the start");
824
825 self.items.push_front(item, Some(0));
826 }
827
828 Flow::Remote {
829 position: TimelineItemPosition::At { event_index, .. },
830 event_id,
831 txn_id,
832 ..
833 } => {
834 let item = Self::recycle_local_or_create_item(
835 self.items,
836 self.meta,
837 item,
838 event_id,
839 txn_id.as_deref(),
840 );
841
842 let all_remote_events = self.items.all_remote_events();
843 let event_index = *event_index;
844
845 let timeline_item_index = all_remote_events
847 .range(0..=event_index)
848 .rev()
849 .find_map(|event_meta| event_meta.timeline_item_index)
850 .map(|timeline_item_index| timeline_item_index + 1);
852
853 let timeline_item_index = timeline_item_index.or_else(|| {
856 all_remote_events
857 .range(event_index + 1..)
858 .find_map(|event_meta| event_meta.timeline_item_index)
859 });
860
861 let timeline_item_index = timeline_item_index.unwrap_or_else(|| {
864 self.items
865 .iter_remotes_region()
866 .rev()
867 .find_map(|(timeline_item_index, timeline_item)| {
868 timeline_item.as_event().map(|_| timeline_item_index + 1)
869 })
870 .unwrap_or_else(|| {
871 self.items.first_remotes_region_index()
874 })
875 });
876
877 trace!(
878 ?event_index,
879 ?timeline_item_index,
880 "Adding new remote timeline at specific event index"
881 );
882
883 self.items.insert(timeline_item_index, item, Some(event_index));
884 }
885
886 Flow::Remote {
887 position: TimelineItemPosition::End { .. }, event_id, txn_id, ..
888 } => {
889 let item = Self::recycle_local_or_create_item(
890 self.items,
891 self.meta,
892 item,
893 event_id,
894 txn_id.as_deref(),
895 );
896
897 let timeline_item_index = self
899 .items
900 .iter_remotes_region()
901 .rev()
902 .find_map(|(timeline_item_index, timeline_item)| {
903 timeline_item.as_event().map(|_| timeline_item_index + 1)
904 })
905 .unwrap_or_else(|| {
906 self.items.first_remotes_region_index()
909 });
910
911 let event_index = self
912 .items
913 .all_remote_events()
914 .last_index()
915 .or_else(|| {
919 error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present");
920
921 Some(0)
922 });
923
924 if timeline_item_index == self.items.len() {
933 trace!("Adding new remote timeline item at the back");
934 self.items.push_back(item, event_index);
935 } else if timeline_item_index == 0 {
936 trace!("Adding new remote timeline item at the front");
937 self.items.push_front(item, event_index);
938 } else {
939 trace!(
940 timeline_item_index,
941 "Adding new remote timeline item at specific index"
942 );
943 self.items.insert(timeline_item_index, item, event_index);
944 }
945 }
946
947 Flow::Remote {
948 event_id: decrypted_event_id,
949 position: TimelineItemPosition::UpdateAt { timeline_item_index: idx },
950 ..
951 } => {
952 trace!("Updating timeline item at position {idx}");
953
954 Self::maybe_update_responses(self.meta, self.items, decrypted_event_id, &item);
956
957 let internal_id = self.items[*idx].internal_id.clone();
958 self.items.replace(*idx, TimelineItem::new(item, internal_id));
959 }
960 }
961
962 if !self.meta.has_up_to_date_read_marker_item {
964 self.meta.update_read_marker(self.items);
965 }
966 }
967
968 fn recycle_local_or_create_item(
974 items: &mut ObservableItemsTransaction<'_>,
975 meta: &mut TimelineMetadata,
976 mut new_item: EventTimelineItem,
977 event_id: &EventId,
978 transaction_id: Option<&TransactionId>,
979 ) -> Arc<TimelineItem> {
980 if let Some((local_timeline_item_index, local_timeline_item)) = items
982 .iter_locals_region()
984 .rev()
986 .find_map(|(nth, timeline_item)| {
987 let event_timeline_item = timeline_item.as_event()?;
988
989 if Some(event_id) == event_timeline_item.event_id()
990 || (transaction_id.is_some()
991 && transaction_id == event_timeline_item.transaction_id())
992 {
993 Some((nth, event_timeline_item))
995 } else {
996 None
999 }
1000 })
1001 {
1002 trace!(
1003 ?event_id,
1004 ?transaction_id,
1005 ?local_timeline_item_index,
1006 "Removing local timeline item"
1007 );
1008
1009 transfer_details(&mut new_item, local_timeline_item);
1010
1011 let recycled = items.remove(local_timeline_item_index);
1013 TimelineItem::new(new_item, recycled.internal_id.clone())
1014 } else {
1015 meta.new_timeline_item(new_item)
1017 }
1018 }
1019
1020 fn maybe_update_responses(
1023 meta: &mut TimelineMetadata,
1024 items: &mut ObservableItemsTransaction<'_>,
1025 target_event_id: &EventId,
1026 new_item: &EventTimelineItem,
1027 ) {
1028 let Some(replies) = meta.replies.get(target_event_id) else {
1029 trace!("item has no replies");
1030 return;
1031 };
1032
1033 for reply_id in replies {
1034 let Some(timeline_item_index) = items
1035 .get_remote_event_by_event_id(reply_id)
1036 .and_then(|meta| meta.timeline_item_index)
1037 else {
1038 warn!(%reply_id, "event not known as an item in the timeline");
1039 continue;
1040 };
1041
1042 let Some(item) = items.get(timeline_item_index) else {
1043 warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
1044 continue;
1045 };
1046
1047 let Some(event_item) = item.as_event() else { continue };
1048 let Some(msglike) = event_item.content.as_msglike() else { continue };
1049 let Some(message) = msglike.as_message() else { continue };
1050 let Some(in_reply_to) = msglike.in_reply_to.as_ref() else { continue };
1051
1052 trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
1053 let in_reply_to = InReplyToDetails {
1054 event_id: in_reply_to.event_id.clone(),
1055 event: TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(
1056 new_item,
1057 ))),
1058 };
1059
1060 let new_reply_content = TimelineItemContent::MsgLike(
1061 msglike
1062 .with_in_reply_to(in_reply_to)
1063 .with_kind(MsgLikeKind::Message(message.clone())),
1064 );
1065 let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
1066 items.replace(timeline_item_index, new_reply_item);
1067 }
1068 }
1069}
1070
1071fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
1079 let TimelineItemContent::MsgLike(new_msglike) = &mut new_item.content else {
1080 return;
1081 };
1082 let TimelineItemContent::MsgLike(old_msglike) = &old_item.content else {
1083 return;
1084 };
1085
1086 let Some(in_reply_to) = &mut new_msglike.in_reply_to else { return };
1087 let Some(old_in_reply_to) = &old_msglike.in_reply_to else { return };
1088
1089 if matches!(&in_reply_to.event, TimelineDetails::Unavailable) {
1090 in_reply_to.event = old_in_reply_to.event.clone();
1091 }
1092}