1use std::{borrow::Cow, sync::Arc};
16
17use as_variant::as_variant;
18use indexmap::IndexMap;
19use matrix_sdk::{
20 deserialized_responses::{EncryptionInfo, UnableToDecryptInfo},
21 send_queue::SendHandle,
22};
23use matrix_sdk_base::crypto::types::events::UtdCause;
24use ruma::{
25 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
26 TransactionId,
27 events::{
28 AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
29 AnySyncTimelineEvent, MessageLikeEventContent, MessageLikeEventType,
30 StateEventContentChange, StateEventType, SyncStateEvent,
31 beacon_info::BeaconInfoEventContent,
32 poll::unstable_start::{
33 NewUnstablePollStartEventContentWithoutRelation, UnstablePollStartEventContent,
34 },
35 receipt::Receipt,
36 relation::Replacement,
37 room::message::{
38 Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation,
39 },
40 },
41 serde::Raw,
42};
43use tracing::{debug, error, field::debug, instrument, trace, warn};
44
45use super::{
46 BeaconInfo, EmbeddedEvent, EncryptedMessage, EventTimelineItem, InReplyToDetails,
47 LiveLocationState, MsgLikeContent, MsgLikeKind, OtherState, ReactionStatus, Sticker,
48 ThreadSummary, TimelineDetails, TimelineItem, TimelineItemContent,
49 controller::{
50 Aggregation, AggregationKind, ObservableItemsTransaction, PendingEditKind,
51 TimelineMetadata, TimelineStateTransaction, find_item_and_apply_aggregation,
52 },
53 date_dividers::DateDividerAdjuster,
54 event_item::{
55 AnyOtherStateEventContentChange, EventSendState, EventTimelineItemKind,
56 LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem,
57 TimelineEventItemId,
58 },
59 traits::RoomDataProvider,
60};
61use crate::{
62 timeline::{
63 TimelineUniqueId, controller::aggregations::PendingEdit, event_item::OtherMessageLike,
64 },
65 unable_to_decrypt_hook::UtdHookManager,
66};
67
/// Where the event being handled comes from: either created locally (and
/// identified by a transaction id) or received as a remote event (and
/// identified by an event id).
pub(super) enum Flow {
    /// A local event, identified by its transaction id.
    Local {
        /// Transaction id of the local event.
        txn_id: OwnedTransactionId,

        /// Optional handle that is stored on the local timeline item created
        /// for this event, and on reactions created from this flow.
        send_handle: Option<SendHandle>,
    },

    /// A remote event, identified by its event id.
    Remote {
        /// Event id of the remote event.
        event_id: OwnedEventId,
        /// Transaction id attached to the remote event, if any.
        txn_id: Option<OwnedTransactionId>,
        /// Raw JSON of the event.
        raw_event: Raw<AnySyncTimelineEvent>,
        /// Where the resulting item should go in the timeline.
        position: TimelineItemPosition,
        /// Encryption information attached to the event, if any.
        encryption_info: Option<Arc<EncryptionInfo>>,
    },
}
95
96impl Flow {
97 pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId {
99 match self {
100 Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()),
101 Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()),
102 }
103 }
104
105 pub(crate) fn raw_event(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
107 as_variant!(self, Flow::Remote { raw_event, .. } => raw_event)
108 }
109}
110
/// Metadata about the event being handled, shared by every step of
/// [`TimelineEventHandler`].
pub(super) struct TimelineEventContext {
    /// User id of the event's sender.
    pub(super) sender: OwnedUserId,
    /// Profile of the sender, if already known.
    pub(super) sender_profile: Option<Profile>,
    /// Optional user id of a forwarder; passed through to the created
    /// timeline item.
    // NOTE(review): the exact meaning of "forwarder" is not visible in this
    // file — confirm against the code that fills this context.
    pub(super) forwarder: Option<OwnedUserId>,
    /// Profile of the forwarder, if already known. Only used when
    /// `forwarder` is set.
    pub(super) forwarder_profile: Option<Profile>,
    /// Timestamp associated with the event.
    pub(super) timestamp: MilliSecondsSinceUnixEpoch,
    /// Read receipts copied onto the remote timeline item created for the
    /// event.
    pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
    /// Highlight flag copied onto the remote timeline item created for the
    /// event.
    pub(super) is_highlighted: bool,
    /// Whether the event is local or remote, and the associated identifiers.
    pub(super) flow: Flow,

    /// When `false`, `TimelineAction::AddItem` actions are ignored by
    /// `handle_event`; aggregations are still processed.
    pub(super) should_add_new_items: bool,
}
137
/// The kind of aggregation (an event that modifies another, related event)
/// to apply, together with the data needed to apply it.
#[derive(Clone, Debug)]
pub(super) enum HandleAggregationKind {
    /// A reaction with the given key.
    Reaction { key: String },

    /// A redaction of the related event.
    Redaction,

    /// An edit of a room message, carrying its replacement content.
    Edit { replacement: Replacement<RoomMessageEventContentWithoutRelation> },

    /// A response to a poll, carrying the selected answers.
    PollResponse { answers: Vec<String> },

    /// An edit of a poll start event, carrying its replacement content.
    PollEdit { replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation> },

    /// The end of a poll.
    PollEnd,

    /// A location update for a beacon.
    BeaconUpdate { location: BeaconInfo },

    /// A beacon info state event whose content is not live, i.e. a stop.
    BeaconStop { content: BeaconInfoEventContent },

    /// A decline of a call notification.
    CallDeclined,
}
173
174impl HandleAggregationKind {
175 pub fn debug_string(&self) -> &'static str {
177 match self {
178 HandleAggregationKind::Reaction { .. } => "a reaction",
179 HandleAggregationKind::Redaction => "a redaction",
180 HandleAggregationKind::Edit { .. } => "an edit",
181 HandleAggregationKind::PollResponse { .. } => "a poll response",
182 HandleAggregationKind::PollEdit { .. } => "a poll edit",
183 HandleAggregationKind::PollEnd => "a poll end",
184 HandleAggregationKind::BeaconUpdate { .. } => "a beacon location update",
185 HandleAggregationKind::BeaconStop { .. } => "a beacon stop",
186 HandleAggregationKind::CallDeclined => "a call decline",
187 }
188 }
189}
190
/// What the timeline should do in reaction to an event: add a brand new item,
/// or apply an aggregation to a related event.
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
pub(super) enum TimelineAction {
    /// Add a new timeline item with the given content.
    AddItem {
        /// Content of the item to add.
        content: TimelineItemContent,
    },

    /// Apply an aggregation to another event.
    HandleAggregation {
        /// The event the aggregation relates to.
        related_event: OwnedEventId,
        /// The kind of aggregation, with its payload.
        kind: HandleAggregationKind,
    },
}
217
218impl TimelineAction {
219 fn add_item(content: TimelineItemContent) -> Self {
221 Self::AddItem { content }
222 }
223
224 #[allow(clippy::too_many_arguments)]
228 pub async fn from_event<P: RoomDataProvider>(
229 event: AnySyncTimelineEvent,
230 raw_event: &Raw<AnySyncTimelineEvent>,
231 room_data_provider: &P,
232 unable_to_decrypt: Option<(UnableToDecryptInfo, Option<&Arc<UtdHookManager>>)>,
233 in_reply_to: Option<InReplyToDetails>,
234 thread_root: Option<OwnedEventId>,
235 thread_summary: Option<ThreadSummary>,
236 ) -> Option<Self> {
237 let redaction_rules = room_data_provider.room_version_rules().redaction;
238
239 let redacted_message_or_none = |event_type: MessageLikeEventType| {
240 (event_type != MessageLikeEventType::Reaction)
241 .then_some(TimelineItemContent::MsgLike(MsgLikeContent::redacted()))
242 };
243
244 Some(match event {
245 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
246 if let Some(redacts) = ev.redacts(&redaction_rules).map(ToOwned::to_owned) {
247 Self::HandleAggregation {
248 related_event: redacts,
249 kind: HandleAggregationKind::Redaction,
250 }
251 } else {
252 Self::add_item(redacted_message_or_none(ev.event_type())?)
253 }
254 }
255
256 AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
257 Some(AnyMessageLikeEventContent::RoomEncrypted(content)) => {
258 if let Some((unable_to_decrypt_info, unable_to_decrypt_hook_manager)) =
260 unable_to_decrypt
261 {
262 let utd_cause = UtdCause::determine(
263 raw_event,
264 room_data_provider.crypto_context_info().await,
265 &unable_to_decrypt_info,
266 );
267
268 if let Some(hook) = unable_to_decrypt_hook_manager {
271 hook.on_utd(
272 ev.event_id(),
273 utd_cause,
274 ev.origin_server_ts(),
275 ev.sender(),
276 )
277 .await;
278 }
279
280 Self::add_item(TimelineItemContent::MsgLike(
281 MsgLikeContent::unable_to_decrypt(EncryptedMessage::from_content(
282 content, utd_cause,
283 )),
284 ))
285 } else {
286 Self::from_content(
291 AnyMessageLikeEventContent::RoomEncrypted(content),
292 in_reply_to,
293 thread_root,
294 thread_summary,
295 )
296 }
297 }
298
299 Some(content) => {
300 Self::from_content(content, in_reply_to, thread_root, thread_summary)
301 }
302
303 None => Self::add_item(redacted_message_or_none(ev.event_type())?),
304 },
305
306 AnySyncTimelineEvent::State(ev) => match ev {
307 AnySyncStateEvent::RoomMember(ev) => match ev {
308 SyncStateEvent::Original(ev) => {
309 Self::add_item(TimelineItemContent::room_member(
310 ev.state_key,
311 StateEventContentChange::Original {
312 content: ev.content,
313 prev_content: ev.unsigned.prev_content,
314 },
315 ev.sender,
316 ))
317 }
318 SyncStateEvent::Redacted(ev) => {
319 Self::add_item(TimelineItemContent::room_member(
320 ev.state_key,
321 StateEventContentChange::Redacted(ev.content),
322 ev.sender,
323 ))
324 }
325 },
326 AnySyncStateEvent::BeaconInfo(ev) => match ev {
327 SyncStateEvent::Original(ev) => {
328 if ev.content.live {
333 Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent {
334 kind: MsgLikeKind::LiveLocation(LiveLocationState::new(ev.content)),
335 reactions: Default::default(),
336 thread_root: None,
337 in_reply_to: None,
338 thread_summary: None,
339 }))
340 } else {
341 Self::HandleAggregation {
345 related_event: ev.event_id,
348 kind: HandleAggregationKind::BeaconStop { content: ev.content },
349 }
350 }
351 }
352 SyncStateEvent::Redacted(_) => {
353 Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent::redacted()))
354 }
355 },
356 ev => Self::add_item(TimelineItemContent::OtherState(OtherState {
357 state_key: ev.state_key().to_owned(),
358 content: AnyOtherStateEventContentChange::with_event_content(
359 ev.content_change(),
360 ),
361 })),
362 },
363 })
364 }
365
366 pub(super) fn from_content(
375 content: AnyMessageLikeEventContent,
376 in_reply_to: Option<InReplyToDetails>,
377 thread_root: Option<OwnedEventId>,
378 thread_summary: Option<ThreadSummary>,
379 ) -> Self {
380 match content {
381 AnyMessageLikeEventContent::Reaction(c) => {
382 Self::HandleAggregation {
384 related_event: c.relates_to.event_id.clone(),
385 kind: HandleAggregationKind::Reaction { key: c.relates_to.key },
386 }
387 }
388
389 AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
390 relates_to: Some(Relation::Replacement(re)),
391 ..
392 }) => Self::HandleAggregation {
393 related_event: re.event_id.clone(),
394 kind: HandleAggregationKind::Edit { replacement: re },
395 },
396
397 AnyMessageLikeEventContent::UnstablePollStart(
398 UnstablePollStartEventContent::Replacement(re),
399 ) => Self::HandleAggregation {
400 related_event: re.relates_to.event_id.clone(),
401 kind: HandleAggregationKind::PollEdit { replacement: re.relates_to },
402 },
403
404 AnyMessageLikeEventContent::UnstablePollResponse(c) => Self::HandleAggregation {
405 related_event: c.relates_to.event_id,
406 kind: HandleAggregationKind::PollResponse { answers: c.poll_response.answers },
407 },
408
409 AnyMessageLikeEventContent::UnstablePollEnd(c) => Self::HandleAggregation {
410 related_event: c.relates_to.event_id,
411 kind: HandleAggregationKind::PollEnd,
412 },
413
414 AnyMessageLikeEventContent::CallInvite(_) => {
415 Self::add_item(TimelineItemContent::CallInvite)
416 }
417
418 AnyMessageLikeEventContent::RtcNotification(c) => {
419 Self::add_item(TimelineItemContent::RtcNotification {
420 call_intent: c.call_intent,
421 declined_by: Vec::new(),
422 })
423 }
424
425 AnyMessageLikeEventContent::RtcDecline(c) => Self::HandleAggregation {
426 related_event: c.relates_to.event_id,
427 kind: HandleAggregationKind::CallDeclined,
428 },
429
430 AnyMessageLikeEventContent::Sticker(content) => {
431 Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent {
432 kind: MsgLikeKind::Sticker(Sticker { content }),
433 reactions: Default::default(),
434 thread_root,
435 in_reply_to,
436 thread_summary,
437 }))
438 }
439
440 AnyMessageLikeEventContent::UnstablePollStart(UnstablePollStartEventContent::New(
441 c,
442 )) => {
443 let poll_state = PollState::new(c.poll_start, c.text);
444
445 Self::AddItem {
446 content: TimelineItemContent::MsgLike(MsgLikeContent {
447 kind: MsgLikeKind::Poll(poll_state),
448 reactions: Default::default(),
449 thread_root,
450 in_reply_to,
451 thread_summary,
452 }),
453 }
454 }
455
456 AnyMessageLikeEventContent::RoomMessage(msg) => Self::AddItem {
457 content: TimelineItemContent::message(
458 msg.msgtype,
459 msg.mentions,
460 Default::default(),
461 thread_root,
462 in_reply_to,
463 thread_summary,
464 ),
465 },
466
467 AnyMessageLikeEventContent::Beacon(content) => Self::HandleAggregation {
468 related_event: content.relates_to.event_id,
469 kind: HandleAggregationKind::BeaconUpdate {
470 location: BeaconInfo {
471 geo_uri: content.location.uri,
472 ts: content.ts,
473 description: content.location.description,
474 encryption_info: None, },
476 },
477 },
478
479 event => {
480 let other = OtherMessageLike { event_type: event.event_type() };
481
482 Self::AddItem {
483 content: TimelineItemContent::MsgLike(MsgLikeContent {
484 kind: MsgLikeKind::Other(other),
485 reactions: Default::default(),
486 thread_root,
487 in_reply_to,
488 thread_summary,
489 }),
490 }
491 }
492 }
493 }
494
495 pub(super) fn failed_to_parse(event: FailedToParseEvent, error: serde_json::Error) -> Self {
496 let error = Arc::new(error);
497 match event {
498 FailedToParseEvent::State { event_type, state_key } => {
499 Self::add_item(TimelineItemContent::FailedToParseState {
500 event_type,
501 state_key,
502 error,
503 })
504 }
505 FailedToParseEvent::MsgLike(event_type) => {
506 Self::add_item(TimelineItemContent::FailedToParseMessageLike { event_type, error })
507 }
508 }
509 }
510}
511
/// The information retained about an event that failed to deserialize.
#[derive(Debug)]
pub(super) enum FailedToParseEvent {
    /// A message-like event, identified by its event type.
    MsgLike(MessageLikeEventType),
    /// A state event, identified by its event type and state key.
    State { event_type: StateEventType, state_key: String },
}
517
/// Where the item created for a remote event should be placed in the
/// timeline.
#[derive(Clone, Copy, Debug)]
pub(super) enum TimelineItemPosition {
    /// Insert at the front of the timeline.
    Start {
        /// Origin recorded on the new remote item.
        origin: RemoteEventOrigin,
    },

    /// Insert after the last event item of the remote items region.
    End {
        /// Origin recorded on the new remote item.
        origin: RemoteEventOrigin,
    },

    /// Insert at the position matching an index in the list of all remote
    /// events.
    At {
        /// Index of the event in the list of all remote events.
        event_index: usize,

        /// Origin recorded on the new remote item.
        origin: RemoteEventOrigin,
    },

    /// Replace the existing timeline item at the given index; the origin of
    /// the item being replaced is reused.
    UpdateAt {
        /// Index of the timeline item to replace.
        timeline_item_index: usize,
    },
}
553
/// Boolean alias.
// NOTE(review): presumably tells whether a timeline item was removed; no use
// site is visible in this part of the file — confirm against callers.
pub(super) type RemovedItem = bool;
556
/// Applies a single [`TimelineAction`] to the timeline: it borrows the item
/// transaction and the timeline metadata mutably, and owns the context of the
/// event being handled.
pub(super) struct TimelineEventHandler<'a, 'o> {
    /// The timeline items, modified in place through a transaction.
    items: &'a mut ObservableItemsTransaction<'o>,
    /// Timeline metadata (aggregations, replies, read marker state, ...).
    meta: &'a mut TimelineMetadata,
    /// Context of the event being handled.
    ctx: TimelineEventContext,
}
568
569impl<'a, 'o> TimelineEventHandler<'a, 'o> {
570 pub(super) fn new<P: RoomDataProvider>(
571 state: &'a mut TimelineStateTransaction<'o, P>,
572 ctx: TimelineEventContext,
573 ) -> Self {
574 let TimelineStateTransaction { items, meta, .. } = state;
575 Self { items, meta, ctx }
576 }
577
578 #[instrument(skip_all, fields(txn_id, event_id, position))]
592 pub(super) async fn handle_event(
593 mut self,
594 date_divider_adjuster: &mut DateDividerAdjuster,
595 timeline_action: TimelineAction,
596 recycled_timeline_id: Option<TimelineUniqueId>,
597 ) -> bool {
598 let span = tracing::Span::current();
599
600 date_divider_adjuster.mark_used();
601
602 match &self.ctx.flow {
603 Flow::Local { txn_id, .. } => {
604 span.record("txn_id", debug(txn_id));
605 debug!("Handling local event");
606 }
607
608 Flow::Remote { event_id, txn_id, position, .. } => {
609 span.record("event_id", debug(event_id));
610 span.record("position", debug(position));
611 if let Some(txn_id) = txn_id {
612 span.record("txn_id", debug(txn_id));
613 }
614 trace!("Handling remote event");
615 }
616 }
617
618 let mut added_item = false;
619
620 match timeline_action {
621 TimelineAction::AddItem { content } => {
622 if self.ctx.should_add_new_items {
623 self.add_item(content, recycled_timeline_id);
624 added_item = true;
625 }
626 }
627
628 TimelineAction::HandleAggregation { related_event, kind } => match kind {
629 HandleAggregationKind::Reaction { key } => {
630 self.handle_reaction(related_event, key);
631 }
632 HandleAggregationKind::Redaction => {
633 self.handle_redaction(related_event);
634 }
635 HandleAggregationKind::Edit { replacement } => {
636 self.handle_edit(
637 replacement.event_id.clone(),
638 PendingEditKind::RoomMessage(replacement),
639 );
640 }
641 HandleAggregationKind::PollResponse { answers } => {
642 self.handle_poll_response(related_event, answers);
643 }
644 HandleAggregationKind::PollEdit { replacement } => {
645 self.handle_edit(
646 replacement.event_id.clone(),
647 PendingEditKind::Poll(replacement),
648 );
649 }
650 HandleAggregationKind::PollEnd => {
651 self.handle_poll_end(related_event);
652 }
653 HandleAggregationKind::BeaconUpdate { mut location } => {
654 let encryption_info = as_variant!(
658 &self.ctx.flow,
659 Flow::Remote { encryption_info, .. } => encryption_info.clone()
660 )
661 .flatten();
662 location.encryption_info = encryption_info;
663
664 self.handle_beacon_update(related_event, location);
665 }
666 HandleAggregationKind::BeaconStop { content } => {
667 self.handle_beacon_stop(content);
668 }
669 HandleAggregationKind::CallDeclined => {
670 self.handle_call_declined(related_event);
671 }
672 },
673 }
674
675 added_item
676 }
677
678 #[instrument(skip(self, edit_kind))]
679 fn handle_edit(&mut self, edited_event_id: OwnedEventId, edit_kind: PendingEditKind) {
680 let target = TimelineEventItemId::EventId(edited_event_id.clone());
681
682 let encryption_info =
683 as_variant!(&self.ctx.flow, Flow::Remote { encryption_info, .. } => encryption_info.clone()).flatten();
684 let aggregation = Aggregation::new(
685 self.ctx.flow.timeline_item_id(),
686 AggregationKind::Edit(PendingEdit {
687 kind: edit_kind,
688 edit_json: self.ctx.flow.raw_event().cloned(),
689 encryption_info,
690 bundled_item_owner: None,
691 }),
692 );
693
694 self.meta.aggregations.add(target.clone(), aggregation.clone());
695
696 if let Some(new_item) = find_item_and_apply_aggregation(
697 &self.meta.aggregations,
698 self.items,
699 &target,
700 aggregation,
701 &self.meta.room_version_rules,
702 ) {
703 Self::maybe_update_responses(
705 self.meta,
706 self.items,
707 &edited_event_id,
708 EmbeddedEvent::from_timeline_item(&new_item),
709 );
710 }
711 }
712
713 #[instrument(skip(self))]
718 fn handle_reaction(&mut self, relates_to: OwnedEventId, reaction_key: String) {
719 let target = TimelineEventItemId::EventId(relates_to);
720
721 let reaction_status = match &self.ctx.flow {
723 Flow::Local { send_handle, .. } => {
724 ReactionStatus::LocalToRemote(send_handle.clone())
726 }
727 Flow::Remote { event_id, .. } => {
728 ReactionStatus::RemoteToRemote(event_id.clone())
730 }
731 };
732
733 let aggregation = Aggregation::new(
734 self.ctx.flow.timeline_item_id(),
735 AggregationKind::Reaction {
736 key: reaction_key,
737 sender: self.ctx.sender.clone(),
738 timestamp: self.ctx.timestamp,
739 reaction_status,
740 },
741 );
742
743 self.meta.aggregations.add(target.clone(), aggregation.clone());
744 find_item_and_apply_aggregation(
745 &self.meta.aggregations,
746 self.items,
747 &target,
748 aggregation,
749 &self.meta.room_version_rules,
750 );
751 }
752
753 fn handle_poll_response(&mut self, poll_event_id: OwnedEventId, answers: Vec<String>) {
754 let target = TimelineEventItemId::EventId(poll_event_id);
755 let aggregation = Aggregation::new(
756 self.ctx.flow.timeline_item_id(),
757 AggregationKind::PollResponse {
758 sender: self.ctx.sender.clone(),
759 timestamp: self.ctx.timestamp,
760 answers,
761 },
762 );
763 self.meta.aggregations.add(target.clone(), aggregation.clone());
764 find_item_and_apply_aggregation(
765 &self.meta.aggregations,
766 self.items,
767 &target,
768 aggregation,
769 &self.meta.room_version_rules,
770 );
771 }
772
773 fn handle_poll_end(&mut self, poll_event_id: OwnedEventId) {
774 let target = TimelineEventItemId::EventId(poll_event_id);
775 let aggregation = Aggregation::new(
776 self.ctx.flow.timeline_item_id(),
777 AggregationKind::PollEnd { end_date: self.ctx.timestamp },
778 );
779 self.meta.aggregations.add(target.clone(), aggregation.clone());
780 find_item_and_apply_aggregation(
781 &self.meta.aggregations,
782 self.items,
783 &target,
784 aggregation,
785 &self.meta.room_version_rules,
786 );
787 }
788
789 #[instrument(skip(self, content))]
796 fn handle_beacon_stop(&mut self, content: BeaconInfoEventContent) {
797 let sender = &self.ctx.sender;
798
799 let target_event_id = super::algorithms::rfind_event_item(self.items, |item| {
801 item.sender() == sender
802 && item.content().as_live_location_state().is_some_and(|s| s.matches_stop(&content))
803 })
804 .and_then(|(_, event_item)| event_item.inner.event_id().map(ToOwned::to_owned));
805
806 let aggregation = Aggregation::new(
807 self.ctx.flow.timeline_item_id(),
808 AggregationKind::BeaconStop { content },
809 );
810
811 let Some(target_event_id) = target_event_id else {
812 trace!(
815 "no matching live beacon_info item found for {sender}; \
816 stashing stop event to apply when the start item arrives"
817 );
818 self.meta.aggregations.add_pending_beacon_stop(sender.clone(), aggregation);
819 return;
820 };
821
822 let target = TimelineEventItemId::EventId(target_event_id);
823 self.meta.aggregations.add(target.clone(), aggregation.clone());
824 find_item_and_apply_aggregation(
825 &self.meta.aggregations,
826 self.items,
827 &target,
828 aggregation,
829 &self.meta.room_version_rules,
830 );
831 }
832
833 #[instrument(skip(self, location))]
836 fn handle_beacon_update(&mut self, beacon_info_event_id: OwnedEventId, location: BeaconInfo) {
837 let target = TimelineEventItemId::EventId(beacon_info_event_id);
838 let aggregation = Aggregation::new(
839 self.ctx.flow.timeline_item_id(),
840 AggregationKind::BeaconUpdate { location },
841 );
842 self.meta.aggregations.add(target.clone(), aggregation.clone());
843 find_item_and_apply_aggregation(
844 &self.meta.aggregations,
845 self.items,
846 &target,
847 aggregation,
848 &self.meta.room_version_rules,
849 );
850 }
851
852 #[instrument(skip_all, fields(redacts_event_id = ?redacted))]
858 fn handle_redaction(&mut self, redacted: OwnedEventId) {
859 if self.handle_aggregation_redaction(redacted.clone()) {
864 return;
867 }
868
869 let target = TimelineEventItemId::EventId(redacted.clone());
870 let aggregation = Aggregation::new(
871 self.ctx.flow.timeline_item_id(),
872 AggregationKind::Redaction {
873 is_local: false, },
875 );
876 self.meta.aggregations.add(target.clone(), aggregation.clone());
877
878 find_item_and_apply_aggregation(
879 &self.meta.aggregations,
880 self.items,
881 &target,
882 aggregation,
883 &self.meta.room_version_rules,
884 );
885
886 let embedded_event = EmbeddedEvent {
889 content: TimelineItemContent::MsgLike(MsgLikeContent::redacted()),
890 sender: self.ctx.sender.clone(),
891 sender_profile: TimelineDetails::from_initial_value(self.ctx.sender_profile.clone()),
892 timestamp: self.ctx.timestamp,
893 identifier: TimelineEventItemId::EventId(redacted.clone()),
894 };
895
896 Self::maybe_update_responses(self.meta, self.items, &redacted, embedded_event);
897 }
898
899 #[instrument(skip_all, fields(redacts = ?aggregation_id))]
904 fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool {
905 let aggregation_id = TimelineEventItemId::EventId(aggregation_id);
906
907 match self.meta.aggregations.try_remove_aggregation(&aggregation_id, self.items) {
908 Ok(val) => val,
909 Err(err) => {
911 warn!("error while attempting to remove aggregation: {err}");
912 true
914 }
915 }
916 }
917
918 fn handle_call_declined(&mut self, notification_event_id: OwnedEventId) {
921 let target = TimelineEventItemId::EventId(notification_event_id);
922 let aggregation = Aggregation::new(
923 self.ctx.flow.timeline_item_id(),
924 AggregationKind::CallDeclined { sender: self.ctx.sender.clone() },
925 );
926 self.meta.aggregations.add(target.clone(), aggregation.clone());
927 find_item_and_apply_aggregation(
928 &self.meta.aggregations,
929 self.items,
930 &target,
931 aggregation,
932 &self.meta.room_version_rules,
933 );
934 }
935
    /// Builds an event timeline item from `content` and the handler's
    /// context, applies every aggregation already recorded for it, then puts
    /// it in the timeline at the place dictated by the flow: local items are
    /// pushed as local, remote items are inserted at the start, at the end or
    /// at a specific event index, or replace an existing item.
    ///
    /// `recycled_timeline_id` is forwarded to the creation of the timeline
    /// item's internal id, except when a matching local item is recycled or
    /// an existing item is replaced, in which case the existing internal id
    /// is reused.
    ///
    /// Finally, the read marker is updated if the metadata reports it is not
    /// up to date.
    fn add_item(
        &mut self,
        content: TimelineItemContent,
        recycled_timeline_id: Option<TimelineUniqueId>,
    ) {
        let sender = self.ctx.sender.to_owned();
        let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());

        let forwarder = self.ctx.forwarder.to_owned();
        // The forwarder's profile is only meaningful if there is a forwarder.
        let forwarder_profile = self
            .ctx
            .forwarder
            .as_ref()
            .map(|_| TimelineDetails::from_initial_value(self.ctx.forwarder_profile.clone()));

        let timestamp = self.ctx.timestamp;

        // Build the local or remote part of the item.
        let kind: EventTimelineItemKind = match &self.ctx.flow {
            Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
                send_state: EventSendState::NotSentYet { progress: None },
                transaction_id: txn_id.to_owned(),
                send_handle: send_handle.clone(),
            }
            .into(),

            Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
                let origin = match *position {
                    TimelineItemPosition::Start { origin }
                    | TimelineItemPosition::End { origin }
                    | TimelineItemPosition::At { origin, .. } => origin,

                    // For an update, reuse the origin of the item being
                    // replaced; fall back to `Unknown` (and log an error) if
                    // that item is not a remote event.
                    TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self.items[idx]
                        .as_event()
                        .and_then(|ev| Some(ev.as_remote()?.origin))
                        .unwrap_or_else(|| {
                            error!("Tried to update a local event");
                            RemoteEventOrigin::Unknown
                        }),
                };

                RemoteEventTimelineItem {
                    event_id: event_id.clone(),
                    transaction_id: txn_id.clone(),
                    read_receipts: self.ctx.read_receipts.clone(),
                    is_own: self.ctx.sender == self.meta.own_user_id,
                    is_highlighted: self.ctx.is_highlighted,
                    encryption_info: encryption_info.clone(),
                    original_json: Some(raw_event.clone()),
                    latest_edit_json: None,
                    origin,
                }
                .into()
            }
        };

        let is_room_encrypted = self.meta.is_room_encrypted;

        let item = EventTimelineItem::new(
            sender,
            sender_profile,
            forwarder,
            forwarder_profile,
            timestamp,
            content,
            kind,
            is_room_encrypted,
        );

        // Apply all the aggregations already recorded for this item. A
        // failure is only logged; the item is still added.
        let mut cowed = Cow::Owned(item);
        if let Err(err) = self.meta.aggregations.apply_all(
            &self.ctx.flow.timeline_item_id(),
            &self.ctx.sender,
            &mut cowed,
            self.items,
            &self.meta.room_version_rules,
        ) {
            warn!("discarding aggregations: {err}");
        }
        let item = cowed.into_owned();

        match &self.ctx.flow {
            Flow::Local { .. } => {
                trace!("Adding new local timeline item");

                let item = self.meta.new_timeline_item_with_internal_id(item, recycled_timeline_id);

                self.items.push_local(item);
            }

            Flow::Remote {
                position: TimelineItemPosition::Start { .. }, event_id, txn_id, ..
            } => {
                // Reuse the matching local item, if any, or create a new one.
                let item = Self::recycle_local_or_create_item(
                    self.items,
                    self.meta,
                    item,
                    event_id,
                    txn_id.as_deref(),
                    recycled_timeline_id,
                );

                trace!("Adding new remote timeline item at the start");

                self.items.push_front(item, Some(0));
            }

            Flow::Remote {
                position: TimelineItemPosition::At { event_index, .. },
                event_id,
                txn_id,
                ..
            } => {
                // Reuse the matching local item, if any, or create a new one.
                let item = Self::recycle_local_or_create_item(
                    self.items,
                    self.meta,
                    item,
                    event_id,
                    txn_id.as_deref(),
                    recycled_timeline_id,
                );

                let all_remote_events = self.items.all_remote_events();
                let event_index = *event_index;

                // Walk back from `event_index` (inclusive) to the closest
                // remote event that has a timeline item: the new item goes
                // right after it.
                let timeline_item_index = all_remote_events
                    .range(0..=event_index)
                    .rev()
                    .find_map(|event_meta| event_meta.timeline_item_index)
                    .map(|timeline_item_index| timeline_item_index + 1);

                // Nothing before? Walk forward from `event_index` (exclusive)
                // to the closest remote event that has a timeline item: the
                // new item takes its index.
                let timeline_item_index = timeline_item_index.or_else(|| {
                    all_remote_events
                        .range(event_index + 1..)
                        .find_map(|event_meta| event_meta.timeline_item_index)
                });

                // Still nothing? Insert after the last event item of the
                // remotes region, or at the first index of that region if it
                // contains no event item.
                let timeline_item_index = timeline_item_index.unwrap_or_else(|| {
                    self.items
                        .iter_remotes_region()
                        .rev()
                        .find_map(|(timeline_item_index, timeline_item)| {
                            timeline_item.as_event().map(|_| timeline_item_index + 1)
                        })
                        .unwrap_or_else(|| {
                            self.items.first_remotes_region_index()
                        })
                });

                trace!(
                    ?event_index,
                    ?timeline_item_index,
                    "Adding new remote timeline at specific event index"
                );

                self.items.insert(timeline_item_index, item, Some(event_index));
            }

            Flow::Remote {
                position: TimelineItemPosition::End { .. }, event_id, txn_id, ..
            } => {
                // Reuse the matching local item, if any, or create a new one.
                let item = Self::recycle_local_or_create_item(
                    self.items,
                    self.meta,
                    item,
                    event_id,
                    txn_id.as_deref(),
                    recycled_timeline_id,
                );

                // Insert after the last event item of the remotes region, or
                // at the first index of that region if it contains no event
                // item.
                let timeline_item_index = self
                    .items
                    .iter_remotes_region()
                    .rev()
                    .find_map(|(timeline_item_index, timeline_item)| {
                        timeline_item.as_event().map(|_| timeline_item_index + 1)
                    })
                    .unwrap_or_else(|| {
                        self.items.first_remotes_region_index()
                    });

                // The new item maps to the last known remote event; if there
                // is none, log an error and fall back to index 0.
                let event_index = self
                    .items
                    .all_remote_events()
                    .last_index()
                    .or_else(|| {
                        error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present");

                        Some(0)
                    });

                // Use the dedicated push operations when the computed index
                // is one of the two ends of the timeline.
                if timeline_item_index == self.items.len() {
                    trace!("Adding new remote timeline item at the back");
                    self.items.push_back(item, event_index);
                } else if timeline_item_index == 0 {
                    trace!("Adding new remote timeline item at the front");
                    self.items.push_front(item, event_index);
                } else {
                    trace!(
                        timeline_item_index,
                        "Adding new remote timeline item at specific index"
                    );
                    self.items.insert(timeline_item_index, item, event_index);
                }
            }

            Flow::Remote {
                event_id: decrypted_event_id,
                position: TimelineItemPosition::UpdateAt { timeline_item_index: idx },
                ..
            } => {
                trace!("Updating timeline item at position {idx}");

                // Replies embedding this event must show its new content.
                Self::maybe_update_responses(
                    self.meta,
                    self.items,
                    decrypted_event_id,
                    EmbeddedEvent::from_timeline_item(&item),
                );

                // Replace the item in place, keeping its internal id.
                let internal_id = self.items[*idx].internal_id.clone();
                self.items.replace(*idx, TimelineItem::new(item, internal_id));
            }
        }

        // Refresh the read marker only if it is flagged as not up to date.
        if !self.meta.has_up_to_date_read_marker_item {
            self.meta.update_read_marker(self.items);
        }
    }
1202
1203 fn recycle_local_or_create_item(
1209 items: &mut ObservableItemsTransaction<'_>,
1210 meta: &mut TimelineMetadata,
1211 mut new_item: EventTimelineItem,
1212 event_id: &EventId,
1213 transaction_id: Option<&TransactionId>,
1214 recycled_timeline_id: Option<TimelineUniqueId>,
1215 ) -> Arc<TimelineItem> {
1216 if let Some((local_timeline_item_index, local_timeline_item)) = items
1218 .iter_locals_region()
1220 .rev()
1222 .find_map(|(nth, timeline_item)| {
1223 let event_timeline_item = timeline_item.as_event()?;
1224
1225 if Some(event_id) == event_timeline_item.event_id()
1226 || (transaction_id.is_some()
1227 && transaction_id == event_timeline_item.transaction_id())
1228 {
1229 Some((nth, event_timeline_item))
1231 } else {
1232 None
1235 }
1236 })
1237 {
1238 trace!(
1239 ?event_id,
1240 ?transaction_id,
1241 ?local_timeline_item_index,
1242 "Removing local timeline item"
1243 );
1244
1245 transfer_details(&mut new_item, local_timeline_item);
1246
1247 let recycled = items.remove(local_timeline_item_index);
1249 TimelineItem::new(new_item, recycled.internal_id.clone())
1250 } else {
1251 meta.new_timeline_item_with_internal_id(new_item, recycled_timeline_id)
1253 }
1254 }
1255
1256 fn maybe_update_responses(
1259 meta: &mut TimelineMetadata,
1260 items: &mut ObservableItemsTransaction<'_>,
1261 target_event_id: &EventId,
1262 new_embedded_event: EmbeddedEvent,
1263 ) {
1264 let Some(replies) = meta.replies.get(target_event_id) else {
1265 trace!("item has no replies");
1266 return;
1267 };
1268
1269 for reply_id in replies {
1270 let Some(timeline_item_index) = items
1271 .get_remote_event_by_event_id(reply_id)
1272 .and_then(|meta| meta.timeline_item_index)
1273 else {
1274 warn!(%reply_id, "event not known as an item in the timeline");
1275 continue;
1276 };
1277
1278 let Some(item) = items.get(timeline_item_index) else {
1279 warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
1280 continue;
1281 };
1282
1283 let Some(event_item) = item.as_event() else { continue };
1284 let Some(msglike) = event_item.content.as_msglike() else { continue };
1285 let Some(message) = msglike.as_message() else { continue };
1286 let Some(in_reply_to) = msglike.in_reply_to.as_ref() else { continue };
1287
1288 trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
1289 let in_reply_to = InReplyToDetails {
1290 event_id: in_reply_to.event_id.clone(),
1291 event: TimelineDetails::Ready(Box::new(new_embedded_event.clone())),
1292 };
1293
1294 let new_reply_content = TimelineItemContent::MsgLike(
1295 msglike
1296 .with_in_reply_to(in_reply_to)
1297 .with_kind(MsgLikeKind::Message(message.clone())),
1298 );
1299 let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
1300 items.replace(timeline_item_index, new_reply_item);
1301 }
1302 }
1303}
1304
1305fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
1313 let TimelineItemContent::MsgLike(new_msglike) = &mut new_item.content else {
1314 return;
1315 };
1316 let TimelineItemContent::MsgLike(old_msglike) = &old_item.content else {
1317 return;
1318 };
1319
1320 let Some(in_reply_to) = &mut new_msglike.in_reply_to else { return };
1321 let Some(old_in_reply_to) = &old_msglike.in_reply_to else { return };
1322
1323 if matches!(&in_reply_to.event, TimelineDetails::Unavailable) {
1324 in_reply_to.event = old_in_reply_to.event.clone();
1325 }
1326}