1use std::{borrow::Cow, sync::Arc};
16
17use as_variant::as_variant;
18use indexmap::IndexMap;
19use matrix_sdk::{
20 crypto::types::events::UtdCause,
21 deserialized_responses::{EncryptionInfo, UnableToDecryptInfo},
22 send_queue::SendHandle,
23};
24use ruma::{
25 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
26 TransactionId,
27 events::{
28 AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
29 AnySyncTimelineEvent, FullStateEventContent, MessageLikeEventContent, MessageLikeEventType,
30 StateEventType, SyncStateEvent,
31 poll::unstable_start::{
32 NewUnstablePollStartEventContentWithoutRelation, UnstablePollStartEventContent,
33 },
34 receipt::Receipt,
35 relation::Replacement,
36 room::message::{
37 Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation,
38 },
39 },
40 serde::Raw,
41};
42use tracing::{debug, error, field::debug, instrument, trace, warn};
43
44use super::{
45 EmbeddedEvent, EncryptedMessage, EventTimelineItem, InReplyToDetails, MsgLikeContent,
46 MsgLikeKind, OtherState, ReactionStatus, Sticker, ThreadSummary, TimelineDetails, TimelineItem,
47 TimelineItemContent,
48 controller::{
49 Aggregation, AggregationKind, ObservableItemsTransaction, PendingEditKind,
50 TimelineMetadata, TimelineStateTransaction, find_item_and_apply_aggregation,
51 },
52 date_dividers::DateDividerAdjuster,
53 event_item::{
54 AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind,
55 LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem,
56 TimelineEventItemId,
57 },
58 traits::RoomDataProvider,
59};
60use crate::{
61 timeline::controller::aggregations::PendingEdit, unable_to_decrypt_hook::UtdHookManager,
62};
63
64pub(super) enum Flow {
66 Local {
68 txn_id: OwnedTransactionId,
70
71 send_handle: Option<SendHandle>,
73 },
74
75 Remote {
78 event_id: OwnedEventId,
80 txn_id: Option<OwnedTransactionId>,
83 raw_event: Raw<AnySyncTimelineEvent>,
85 position: TimelineItemPosition,
87 encryption_info: Option<Arc<EncryptionInfo>>,
89 },
90}
91
92impl Flow {
93 pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId {
95 match self {
96 Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()),
97 Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()),
98 }
99 }
100
101 pub(crate) fn raw_event(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
103 as_variant!(self, Flow::Remote { raw_event, .. } => raw_event)
104 }
105}
106
107pub(super) struct TimelineEventContext {
108 pub(super) sender: OwnedUserId,
109 pub(super) sender_profile: Option<Profile>,
110 pub(super) timestamp: MilliSecondsSinceUnixEpoch,
112 pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
113 pub(super) is_highlighted: bool,
114 pub(super) flow: Flow,
115
116 pub(super) should_add_new_items: bool,
122}
123
124#[derive(Clone, Debug)]
127pub(super) enum HandleAggregationKind {
128 Reaction { key: String },
130
131 Redaction,
133
134 Edit { replacement: Replacement<RoomMessageEventContentWithoutRelation> },
136
137 PollResponse { answers: Vec<String> },
139
140 PollEdit { replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation> },
142
143 PollEnd,
145}
146
147#[derive(Clone, Debug)]
149#[allow(clippy::large_enum_variant)]
150pub(super) enum TimelineAction {
151 AddItem {
157 content: TimelineItemContent,
159 },
160
161 HandleAggregation {
167 related_event: OwnedEventId,
169 kind: HandleAggregationKind,
171 },
172}
173
174impl TimelineAction {
175 fn add_item(content: TimelineItemContent) -> Self {
177 Self::AddItem { content }
178 }
179
180 #[allow(clippy::too_many_arguments)]
185 pub async fn from_event<P: RoomDataProvider>(
186 event: AnySyncTimelineEvent,
187 raw_event: &Raw<AnySyncTimelineEvent>,
188 room_data_provider: &P,
189 unable_to_decrypt: Option<(UnableToDecryptInfo, Option<&Arc<UtdHookManager>>)>,
190 in_reply_to: Option<InReplyToDetails>,
191 thread_root: Option<OwnedEventId>,
192 thread_summary: Option<ThreadSummary>,
193 ) -> Option<Self> {
194 let redaction_rules = room_data_provider.room_version_rules().redaction;
195
196 let redacted_message_or_none = |event_type: MessageLikeEventType| {
197 (event_type != MessageLikeEventType::Reaction)
198 .then_some(TimelineItemContent::MsgLike(MsgLikeContent::redacted()))
199 };
200
201 Some(match event {
202 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
203 if let Some(redacts) = ev.redacts(&redaction_rules).map(ToOwned::to_owned) {
204 Self::HandleAggregation {
205 related_event: redacts,
206 kind: HandleAggregationKind::Redaction,
207 }
208 } else {
209 Self::add_item(redacted_message_or_none(ev.event_type())?)
210 }
211 }
212
213 AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
214 Some(AnyMessageLikeEventContent::RoomEncrypted(content)) => {
215 if let Some((unable_to_decrypt_info, unable_to_decrypt_hook_manager)) =
217 unable_to_decrypt
218 {
219 let utd_cause = UtdCause::determine(
220 raw_event,
221 room_data_provider.crypto_context_info().await,
222 &unable_to_decrypt_info,
223 );
224
225 if let Some(hook) = unable_to_decrypt_hook_manager {
228 hook.on_utd(
229 ev.event_id(),
230 utd_cause,
231 ev.origin_server_ts(),
232 ev.sender(),
233 )
234 .await;
235 }
236
237 Self::add_item(TimelineItemContent::MsgLike(
238 MsgLikeContent::unable_to_decrypt(EncryptedMessage::from_content(
239 content, utd_cause,
240 )),
241 ))
242 } else {
243 return Self::from_content(
248 AnyMessageLikeEventContent::RoomEncrypted(content),
249 in_reply_to,
250 thread_root,
251 thread_summary,
252 );
253 }
254 }
255
256 Some(content) => {
257 return Self::from_content(content, in_reply_to, thread_root, thread_summary);
258 }
259
260 None => Self::add_item(redacted_message_or_none(ev.event_type())?),
261 },
262
263 AnySyncTimelineEvent::State(ev) => match ev {
264 AnySyncStateEvent::RoomMember(ev) => match ev {
265 SyncStateEvent::Original(ev) => {
266 Self::add_item(TimelineItemContent::room_member(
267 ev.state_key,
268 FullStateEventContent::Original {
269 content: ev.content,
270 prev_content: ev.unsigned.prev_content,
271 },
272 ev.sender,
273 ))
274 }
275 SyncStateEvent::Redacted(ev) => {
276 Self::add_item(TimelineItemContent::room_member(
277 ev.state_key,
278 FullStateEventContent::Redacted(ev.content),
279 ev.sender,
280 ))
281 }
282 },
283 ev => Self::add_item(TimelineItemContent::OtherState(OtherState {
284 state_key: ev.state_key().to_owned(),
285 content: AnyOtherFullStateEventContent::with_event_content(ev.content()),
286 })),
287 },
288 })
289 }
290
291 pub(super) fn from_content(
300 content: AnyMessageLikeEventContent,
301 in_reply_to: Option<InReplyToDetails>,
302 thread_root: Option<OwnedEventId>,
303 thread_summary: Option<ThreadSummary>,
304 ) -> Option<Self> {
305 Some(match content {
306 AnyMessageLikeEventContent::Reaction(c) => {
307 Self::HandleAggregation {
309 related_event: c.relates_to.event_id.clone(),
310 kind: HandleAggregationKind::Reaction { key: c.relates_to.key },
311 }
312 }
313
314 AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
315 relates_to: Some(Relation::Replacement(re)),
316 ..
317 }) => Self::HandleAggregation {
318 related_event: re.event_id.clone(),
319 kind: HandleAggregationKind::Edit { replacement: re },
320 },
321
322 AnyMessageLikeEventContent::UnstablePollStart(
323 UnstablePollStartEventContent::Replacement(re),
324 ) => Self::HandleAggregation {
325 related_event: re.relates_to.event_id.clone(),
326 kind: HandleAggregationKind::PollEdit { replacement: re.relates_to },
327 },
328
329 AnyMessageLikeEventContent::UnstablePollResponse(c) => Self::HandleAggregation {
330 related_event: c.relates_to.event_id,
331 kind: HandleAggregationKind::PollResponse { answers: c.poll_response.answers },
332 },
333
334 AnyMessageLikeEventContent::UnstablePollEnd(c) => Self::HandleAggregation {
335 related_event: c.relates_to.event_id,
336 kind: HandleAggregationKind::PollEnd,
337 },
338
339 AnyMessageLikeEventContent::CallInvite(_) => {
340 Self::add_item(TimelineItemContent::CallInvite)
341 }
342
343 AnyMessageLikeEventContent::CallNotify(_) => {
344 Self::add_item(TimelineItemContent::CallNotify)
345 }
346
347 AnyMessageLikeEventContent::Sticker(content) => {
348 Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent {
349 kind: MsgLikeKind::Sticker(Sticker { content }),
350 reactions: Default::default(),
351 thread_root,
352 in_reply_to,
353 thread_summary,
354 }))
355 }
356
357 AnyMessageLikeEventContent::UnstablePollStart(UnstablePollStartEventContent::New(
358 c,
359 )) => {
360 let poll_state = PollState::new(c);
361
362 Self::AddItem {
363 content: TimelineItemContent::MsgLike(MsgLikeContent {
364 kind: MsgLikeKind::Poll(poll_state),
365 reactions: Default::default(),
366 thread_root,
367 in_reply_to,
368 thread_summary,
369 }),
370 }
371 }
372
373 AnyMessageLikeEventContent::RoomMessage(msg) => Self::AddItem {
374 content: TimelineItemContent::message(
375 msg.msgtype,
376 msg.mentions,
377 Default::default(),
378 thread_root,
379 in_reply_to,
380 thread_summary,
381 ),
382 },
383
384 _ => {
385 debug!(
386 "Ignoring message-like event of type `{}`, not supported (yet)",
387 content.event_type()
388 );
389 return None;
390 }
391 })
392 }
393
394 pub(super) fn failed_to_parse(event: FailedToParseEvent, error: serde_json::Error) -> Self {
395 let error = Arc::new(error);
396 match event {
397 FailedToParseEvent::State { event_type, state_key } => {
398 Self::add_item(TimelineItemContent::FailedToParseState {
399 event_type,
400 state_key,
401 error,
402 })
403 }
404 FailedToParseEvent::MsgLike(event_type) => {
405 Self::add_item(TimelineItemContent::FailedToParseMessageLike { event_type, error })
406 }
407 }
408 }
409}
410
411#[derive(Debug)]
412pub(super) enum FailedToParseEvent {
413 MsgLike(MessageLikeEventType),
414 State { event_type: StateEventType, state_key: String },
415}
416
417#[derive(Clone, Copy, Debug)]
419pub(super) enum TimelineItemPosition {
420 Start {
423 origin: RemoteEventOrigin,
425 },
426
427 End {
430 origin: RemoteEventOrigin,
432 },
433
434 At {
436 event_index: usize,
438
439 origin: RemoteEventOrigin,
441 },
442
443 UpdateAt {
448 timeline_item_index: usize,
450 },
451}
452
453pub(super) type RemovedItem = bool;
455
456pub(super) struct TimelineEventHandler<'a, 'o> {
463 items: &'a mut ObservableItemsTransaction<'o>,
464 meta: &'a mut TimelineMetadata,
465 ctx: TimelineEventContext,
466}
467
468impl<'a, 'o> TimelineEventHandler<'a, 'o> {
469 pub(super) fn new<P: RoomDataProvider>(
470 state: &'a mut TimelineStateTransaction<'o, P>,
471 ctx: TimelineEventContext,
472 ) -> Self {
473 let TimelineStateTransaction { items, meta, .. } = state;
474 Self { items, meta, ctx }
475 }
476
477 #[instrument(skip_all, fields(txn_id, event_id, position))]
491 pub(super) async fn handle_event(
492 mut self,
493 date_divider_adjuster: &mut DateDividerAdjuster,
494 timeline_action: TimelineAction,
495 ) -> bool {
496 let span = tracing::Span::current();
497
498 date_divider_adjuster.mark_used();
499
500 match &self.ctx.flow {
501 Flow::Local { txn_id, .. } => {
502 span.record("txn_id", debug(txn_id));
503 debug!("Handling local event");
504 }
505
506 Flow::Remote { event_id, txn_id, position, .. } => {
507 span.record("event_id", debug(event_id));
508 span.record("position", debug(position));
509 if let Some(txn_id) = txn_id {
510 span.record("txn_id", debug(txn_id));
511 }
512 trace!("Handling remote event");
513 }
514 }
515
516 let mut added_item = false;
517
518 match timeline_action {
519 TimelineAction::AddItem { content } => {
520 if self.ctx.should_add_new_items {
521 self.add_item(content);
522 added_item = true;
523 }
524 }
525
526 TimelineAction::HandleAggregation { related_event, kind } => match kind {
527 HandleAggregationKind::Reaction { key } => {
528 self.handle_reaction(related_event, key);
529 }
530 HandleAggregationKind::Redaction => {
531 self.handle_redaction(related_event);
532 }
533 HandleAggregationKind::Edit { replacement } => {
534 self.handle_edit(
535 replacement.event_id.clone(),
536 PendingEditKind::RoomMessage(replacement),
537 );
538 }
539 HandleAggregationKind::PollResponse { answers } => {
540 self.handle_poll_response(related_event, answers);
541 }
542 HandleAggregationKind::PollEdit { replacement } => {
543 self.handle_edit(
544 replacement.event_id.clone(),
545 PendingEditKind::Poll(replacement),
546 );
547 }
548 HandleAggregationKind::PollEnd => {
549 self.handle_poll_end(related_event);
550 }
551 },
552 }
553
554 added_item
555 }
556
557 #[instrument(skip(self, edit_kind))]
558 fn handle_edit(&mut self, edited_event_id: OwnedEventId, edit_kind: PendingEditKind) {
559 let target = TimelineEventItemId::EventId(edited_event_id.clone());
560
561 let encryption_info =
562 as_variant!(&self.ctx.flow, Flow::Remote { encryption_info, .. } => encryption_info.clone()).flatten();
563 let aggregation = Aggregation::new(
564 self.ctx.flow.timeline_item_id(),
565 AggregationKind::Edit(PendingEdit {
566 kind: edit_kind,
567 edit_json: self.ctx.flow.raw_event().cloned(),
568 encryption_info,
569 bundled_item_owner: None,
570 }),
571 );
572
573 self.meta.aggregations.add(target.clone(), aggregation.clone());
574
575 if let Some(new_item) = find_item_and_apply_aggregation(
576 &self.meta.aggregations,
577 self.items,
578 &target,
579 aggregation,
580 &self.meta.room_version_rules,
581 ) {
582 Self::maybe_update_responses(self.meta, self.items, &edited_event_id, &new_item);
584 }
585 }
586
587 #[instrument(skip(self))]
592 fn handle_reaction(&mut self, relates_to: OwnedEventId, reaction_key: String) {
593 let target = TimelineEventItemId::EventId(relates_to);
594
595 let reaction_status = match &self.ctx.flow {
597 Flow::Local { send_handle, .. } => {
598 ReactionStatus::LocalToRemote(send_handle.clone())
600 }
601 Flow::Remote { event_id, .. } => {
602 ReactionStatus::RemoteToRemote(event_id.clone())
604 }
605 };
606
607 let aggregation = Aggregation::new(
608 self.ctx.flow.timeline_item_id(),
609 AggregationKind::Reaction {
610 key: reaction_key,
611 sender: self.ctx.sender.clone(),
612 timestamp: self.ctx.timestamp,
613 reaction_status,
614 },
615 );
616
617 self.meta.aggregations.add(target.clone(), aggregation.clone());
618 find_item_and_apply_aggregation(
619 &self.meta.aggregations,
620 self.items,
621 &target,
622 aggregation,
623 &self.meta.room_version_rules,
624 );
625 }
626
627 fn handle_poll_response(&mut self, poll_event_id: OwnedEventId, answers: Vec<String>) {
628 let target = TimelineEventItemId::EventId(poll_event_id);
629 let aggregation = Aggregation::new(
630 self.ctx.flow.timeline_item_id(),
631 AggregationKind::PollResponse {
632 sender: self.ctx.sender.clone(),
633 timestamp: self.ctx.timestamp,
634 answers,
635 },
636 );
637 self.meta.aggregations.add(target.clone(), aggregation.clone());
638 find_item_and_apply_aggregation(
639 &self.meta.aggregations,
640 self.items,
641 &target,
642 aggregation,
643 &self.meta.room_version_rules,
644 );
645 }
646
647 fn handle_poll_end(&mut self, poll_event_id: OwnedEventId) {
648 let target = TimelineEventItemId::EventId(poll_event_id);
649 let aggregation = Aggregation::new(
650 self.ctx.flow.timeline_item_id(),
651 AggregationKind::PollEnd { end_date: self.ctx.timestamp },
652 );
653 self.meta.aggregations.add(target.clone(), aggregation.clone());
654 find_item_and_apply_aggregation(
655 &self.meta.aggregations,
656 self.items,
657 &target,
658 aggregation,
659 &self.meta.room_version_rules,
660 );
661 }
662
663 #[instrument(skip_all, fields(redacts_event_id = ?redacted))]
669 fn handle_redaction(&mut self, redacted: OwnedEventId) {
670 if self.handle_aggregation_redaction(redacted.clone()) {
675 return;
678 }
679
680 let target = TimelineEventItemId::EventId(redacted.clone());
681 let aggregation =
682 Aggregation::new(self.ctx.flow.timeline_item_id(), AggregationKind::Redaction);
683 self.meta.aggregations.add(target.clone(), aggregation.clone());
684
685 if let Some(new_item) = find_item_and_apply_aggregation(
686 &self.meta.aggregations,
687 self.items,
688 &target,
689 aggregation,
690 &self.meta.room_version_rules,
691 ) {
692 Self::maybe_update_responses(self.meta, self.items, &redacted, &new_item);
695 }
696 }
697
698 #[instrument(skip_all, fields(redacts = ?aggregation_id))]
703 fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool {
704 let aggregation_id = TimelineEventItemId::EventId(aggregation_id);
705
706 match self.meta.aggregations.try_remove_aggregation(&aggregation_id, self.items) {
707 Ok(val) => val,
708 Err(err) => {
710 warn!("error while attempting to remove aggregation: {err}");
711 true
713 }
714 }
715 }
716
717 fn add_item(&mut self, content: TimelineItemContent) {
730 let sender = self.ctx.sender.to_owned();
731 let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());
732 let timestamp = self.ctx.timestamp;
733
734 let kind: EventTimelineItemKind = match &self.ctx.flow {
735 Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
736 send_state: EventSendState::NotSentYet { progress: None },
737 transaction_id: txn_id.to_owned(),
738 send_handle: send_handle.clone(),
739 }
740 .into(),
741
742 Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
743 let origin = match *position {
744 TimelineItemPosition::Start { origin }
745 | TimelineItemPosition::End { origin }
746 | TimelineItemPosition::At { origin, .. } => origin,
747
748 TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self.items[idx]
750 .as_event()
751 .and_then(|ev| Some(ev.as_remote()?.origin))
752 .unwrap_or_else(|| {
753 error!("Tried to update a local event");
754 RemoteEventOrigin::Unknown
755 }),
756 };
757
758 RemoteEventTimelineItem {
759 event_id: event_id.clone(),
760 transaction_id: txn_id.clone(),
761 read_receipts: self.ctx.read_receipts.clone(),
762 is_own: self.ctx.sender == self.meta.own_user_id,
763 is_highlighted: self.ctx.is_highlighted,
764 encryption_info: encryption_info.clone(),
765 original_json: Some(raw_event.clone()),
766 latest_edit_json: None,
767 origin,
768 }
769 .into()
770 }
771 };
772
773 let is_room_encrypted = self.meta.is_room_encrypted;
774
775 let item = EventTimelineItem::new(
776 sender,
777 sender_profile,
778 timestamp,
779 content,
780 kind,
781 is_room_encrypted,
782 );
783
784 let mut cowed = Cow::Owned(item);
786 if let Err(err) = self.meta.aggregations.apply_all(
787 &self.ctx.flow.timeline_item_id(),
788 &mut cowed,
789 self.items,
790 &self.meta.room_version_rules,
791 ) {
792 warn!("discarding aggregations: {err}");
793 }
794 let item = cowed.into_owned();
795
796 match &self.ctx.flow {
797 Flow::Local { .. } => {
798 trace!("Adding new local timeline item");
799
800 let item = self.meta.new_timeline_item(item);
801
802 self.items.push_local(item);
803 }
804
805 Flow::Remote {
806 position: TimelineItemPosition::Start { .. }, event_id, txn_id, ..
807 } => {
808 let item = Self::recycle_local_or_create_item(
809 self.items,
810 self.meta,
811 item,
812 event_id,
813 txn_id.as_deref(),
814 );
815
816 trace!("Adding new remote timeline item at the start");
817
818 self.items.push_front(item, Some(0));
819 }
820
821 Flow::Remote {
822 position: TimelineItemPosition::At { event_index, .. },
823 event_id,
824 txn_id,
825 ..
826 } => {
827 let item = Self::recycle_local_or_create_item(
828 self.items,
829 self.meta,
830 item,
831 event_id,
832 txn_id.as_deref(),
833 );
834
835 let all_remote_events = self.items.all_remote_events();
836 let event_index = *event_index;
837
838 let timeline_item_index = all_remote_events
840 .range(0..=event_index)
841 .rev()
842 .find_map(|event_meta| event_meta.timeline_item_index)
843 .map(|timeline_item_index| timeline_item_index + 1);
845
846 let timeline_item_index = timeline_item_index.or_else(|| {
849 all_remote_events
850 .range(event_index + 1..)
851 .find_map(|event_meta| event_meta.timeline_item_index)
852 });
853
854 let timeline_item_index = timeline_item_index.unwrap_or_else(|| {
857 self.items
858 .iter_remotes_region()
859 .rev()
860 .find_map(|(timeline_item_index, timeline_item)| {
861 timeline_item.as_event().map(|_| timeline_item_index + 1)
862 })
863 .unwrap_or_else(|| {
864 self.items.first_remotes_region_index()
867 })
868 });
869
870 trace!(
871 ?event_index,
872 ?timeline_item_index,
873 "Adding new remote timeline at specific event index"
874 );
875
876 self.items.insert(timeline_item_index, item, Some(event_index));
877 }
878
879 Flow::Remote {
880 position: TimelineItemPosition::End { .. }, event_id, txn_id, ..
881 } => {
882 let item = Self::recycle_local_or_create_item(
883 self.items,
884 self.meta,
885 item,
886 event_id,
887 txn_id.as_deref(),
888 );
889
890 let timeline_item_index = self
892 .items
893 .iter_remotes_region()
894 .rev()
895 .find_map(|(timeline_item_index, timeline_item)| {
896 timeline_item.as_event().map(|_| timeline_item_index + 1)
897 })
898 .unwrap_or_else(|| {
899 self.items.first_remotes_region_index()
902 });
903
904 let event_index = self
905 .items
906 .all_remote_events()
907 .last_index()
908 .or_else(|| {
912 error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present");
913
914 Some(0)
915 });
916
917 if timeline_item_index == self.items.len() {
926 trace!("Adding new remote timeline item at the back");
927 self.items.push_back(item, event_index);
928 } else if timeline_item_index == 0 {
929 trace!("Adding new remote timeline item at the front");
930 self.items.push_front(item, event_index);
931 } else {
932 trace!(
933 timeline_item_index,
934 "Adding new remote timeline item at specific index"
935 );
936 self.items.insert(timeline_item_index, item, event_index);
937 }
938 }
939
940 Flow::Remote {
941 event_id: decrypted_event_id,
942 position: TimelineItemPosition::UpdateAt { timeline_item_index: idx },
943 ..
944 } => {
945 trace!("Updating timeline item at position {idx}");
946
947 Self::maybe_update_responses(self.meta, self.items, decrypted_event_id, &item);
949
950 let internal_id = self.items[*idx].internal_id.clone();
951 self.items.replace(*idx, TimelineItem::new(item, internal_id));
952 }
953 }
954
955 if !self.meta.has_up_to_date_read_marker_item {
957 self.meta.update_read_marker(self.items);
958 }
959 }
960
961 fn recycle_local_or_create_item(
967 items: &mut ObservableItemsTransaction<'_>,
968 meta: &mut TimelineMetadata,
969 mut new_item: EventTimelineItem,
970 event_id: &EventId,
971 transaction_id: Option<&TransactionId>,
972 ) -> Arc<TimelineItem> {
973 if let Some((local_timeline_item_index, local_timeline_item)) = items
975 .iter_locals_region()
977 .rev()
979 .find_map(|(nth, timeline_item)| {
980 let event_timeline_item = timeline_item.as_event()?;
981
982 if Some(event_id) == event_timeline_item.event_id()
983 || (transaction_id.is_some()
984 && transaction_id == event_timeline_item.transaction_id())
985 {
986 Some((nth, event_timeline_item))
988 } else {
989 None
992 }
993 })
994 {
995 trace!(
996 ?event_id,
997 ?transaction_id,
998 ?local_timeline_item_index,
999 "Removing local timeline item"
1000 );
1001
1002 transfer_details(&mut new_item, local_timeline_item);
1003
1004 let recycled = items.remove(local_timeline_item_index);
1006 TimelineItem::new(new_item, recycled.internal_id.clone())
1007 } else {
1008 meta.new_timeline_item(new_item)
1010 }
1011 }
1012
1013 fn maybe_update_responses(
1016 meta: &mut TimelineMetadata,
1017 items: &mut ObservableItemsTransaction<'_>,
1018 target_event_id: &EventId,
1019 new_item: &EventTimelineItem,
1020 ) {
1021 let Some(replies) = meta.replies.get(target_event_id) else {
1022 trace!("item has no replies");
1023 return;
1024 };
1025
1026 for reply_id in replies {
1027 let Some(timeline_item_index) = items
1028 .get_remote_event_by_event_id(reply_id)
1029 .and_then(|meta| meta.timeline_item_index)
1030 else {
1031 warn!(%reply_id, "event not known as an item in the timeline");
1032 continue;
1033 };
1034
1035 let Some(item) = items.get(timeline_item_index) else {
1036 warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
1037 continue;
1038 };
1039
1040 let Some(event_item) = item.as_event() else { continue };
1041 let Some(msglike) = event_item.content.as_msglike() else { continue };
1042 let Some(message) = msglike.as_message() else { continue };
1043 let Some(in_reply_to) = msglike.in_reply_to.as_ref() else { continue };
1044
1045 trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
1046 let in_reply_to = InReplyToDetails {
1047 event_id: in_reply_to.event_id.clone(),
1048 event: TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(
1049 new_item,
1050 ))),
1051 };
1052
1053 let new_reply_content = TimelineItemContent::MsgLike(
1054 msglike
1055 .with_in_reply_to(in_reply_to)
1056 .with_kind(MsgLikeKind::Message(message.clone())),
1057 );
1058 let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
1059 items.replace(timeline_item_index, new_reply_item);
1060 }
1061 }
1062}
1063
1064fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
1072 let TimelineItemContent::MsgLike(new_msglike) = &mut new_item.content else {
1073 return;
1074 };
1075 let TimelineItemContent::MsgLike(old_msglike) = &old_item.content else {
1076 return;
1077 };
1078
1079 let Some(in_reply_to) = &mut new_msglike.in_reply_to else { return };
1080 let Some(old_in_reply_to) = &old_msglike.in_reply_to else { return };
1081
1082 if matches!(&in_reply_to.event, TimelineDetails::Unavailable) {
1083 in_reply_to.event = old_in_reply_to.event.clone();
1084 }
1085}