matrix_sdk_ui/timeline/
event_handler.rs

1// Copyright 2022 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{borrow::Cow, sync::Arc};
16
17use as_variant::as_variant;
18use indexmap::IndexMap;
19use matrix_sdk::{
20    crypto::types::events::UtdCause,
21    deserialized_responses::{EncryptionInfo, UnableToDecryptInfo},
22    send_queue::SendHandle,
23};
24use ruma::{
25    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
26    TransactionId,
27    events::{
28        AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
29        AnySyncTimelineEvent, FullStateEventContent, MessageLikeEventContent, MessageLikeEventType,
30        StateEventType, SyncStateEvent,
31        poll::unstable_start::{
32            NewUnstablePollStartEventContentWithoutRelation, UnstablePollStartEventContent,
33        },
34        receipt::Receipt,
35        relation::Replacement,
36        room::message::{
37            Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation,
38        },
39    },
40    serde::Raw,
41};
42use tracing::{debug, error, field::debug, instrument, trace, warn};
43
44use super::{
45    EmbeddedEvent, EncryptedMessage, EventTimelineItem, InReplyToDetails, MsgLikeContent,
46    MsgLikeKind, OtherState, ReactionStatus, Sticker, ThreadSummary, TimelineDetails, TimelineItem,
47    TimelineItemContent,
48    controller::{
49        Aggregation, AggregationKind, ObservableItemsTransaction, PendingEditKind,
50        TimelineMetadata, TimelineStateTransaction, find_item_and_apply_aggregation,
51    },
52    date_dividers::DateDividerAdjuster,
53    event_item::{
54        AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind,
55        LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem,
56        TimelineEventItemId,
57    },
58    traits::RoomDataProvider,
59};
60use crate::{
61    timeline::controller::aggregations::PendingEdit, unable_to_decrypt_hook::UtdHookManager,
62};
63
64/// When adding an event, useful information related to the source of the event.
65pub(super) enum Flow {
66    /// The event was locally created.
67    Local {
68        /// The transaction id we've used in requests associated to this event.
69        txn_id: OwnedTransactionId,
70
71        /// A handle to manipulate this event.
72        send_handle: Option<SendHandle>,
73    },
74
75    /// The event has been received from a remote source (sync, pagination,
76    /// etc.). This can be a "remote echo".
77    Remote {
78        /// The event identifier as returned by the server.
79        event_id: OwnedEventId,
80        /// The transaction id we might have used, if we're the sender of the
81        /// event.
82        txn_id: Option<OwnedTransactionId>,
83        /// The raw serialized JSON event.
84        raw_event: Raw<AnySyncTimelineEvent>,
85        /// Where should this be added in the timeline.
86        position: TimelineItemPosition,
87        /// Information about the encryption for this event.
88        encryption_info: Option<Arc<EncryptionInfo>>,
89    },
90}
91
92impl Flow {
93    /// Returns the [`TimelineEventItemId`] associated to this future item.
94    pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId {
95        match self {
96            Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()),
97            Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()),
98        }
99    }
100
101    /// If the flow is remote, returns the associated full raw event.
102    pub(crate) fn raw_event(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
103        as_variant!(self, Flow::Remote { raw_event, .. } => raw_event)
104    }
105}
106
107pub(super) struct TimelineEventContext {
108    pub(super) sender: OwnedUserId,
109    pub(super) sender_profile: Option<Profile>,
110    /// The event's `origin_server_ts` field (or creation time for local echo).
111    pub(super) timestamp: MilliSecondsSinceUnixEpoch,
112    pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
113    pub(super) is_highlighted: bool,
114    pub(super) flow: Flow,
115
116    /// If the event represents a new item, should it be added to the timeline?
117    ///
118    /// This controls whether a new timeline *may* be added. If the update kind
119    /// is about an update to an existing timeline item (redaction, edit,
120    /// reaction, etc.), it's always handled by default.
121    pub(super) should_add_new_items: bool,
122}
123
124/// Which kind of aggregation (i.e. modification of a related event) are we
125/// going to handle?
126#[derive(Clone, Debug)]
127pub(super) enum HandleAggregationKind {
128    /// Adding a reaction to the related event.
129    Reaction { key: String },
130
131    /// Redacting (removing) the related event.
132    Redaction,
133
134    /// Editing (replacing) the related event with another one.
135    Edit { replacement: Replacement<RoomMessageEventContentWithoutRelation> },
136
137    /// Responding to the related poll event.
138    PollResponse { answers: Vec<String> },
139
140    /// Editing a related poll event's description.
141    PollEdit { replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation> },
142
143    /// Ending a related poll.
144    PollEnd,
145}
146
147/// An action that we want to cause on the timeline.
148#[derive(Clone, Debug)]
149#[allow(clippy::large_enum_variant)]
150pub(super) enum TimelineAction {
151    /// Add a new timeline item.
152    ///
153    /// This enqueues adding a new item to the timeline (i.e. push to the items
154    /// array in its state). The item may be filtered out, and thus not
155    /// added later.
156    AddItem {
157        /// The content of the item we want to add.
158        content: TimelineItemContent,
159    },
160
161    /// Handle an aggregation to another event.
162    ///
163    /// The event the aggregation is related to might not be included in the
164    /// timeline, in which case it will be stashed somewhere, until we see
165    /// the related event.
166    HandleAggregation {
167        /// To which other event does this aggregation apply to?
168        related_event: OwnedEventId,
169        /// What kind of aggregation are we handling here?
170        kind: HandleAggregationKind,
171    },
172}
173
174impl TimelineAction {
175    /// Create a new [`TimelineEventKind::AddItem`].
176    fn add_item(content: TimelineItemContent) -> Self {
177        Self::AddItem { content }
178    }
179
180    /// Create a new [`TimelineAction`] from a given remote event.
181    ///
182    /// The return value may be `None` if handling the event (be it a new item
183    /// or an aggregation) is not supported for this event type.
184    #[allow(clippy::too_many_arguments)]
185    pub async fn from_event<P: RoomDataProvider>(
186        event: AnySyncTimelineEvent,
187        raw_event: &Raw<AnySyncTimelineEvent>,
188        room_data_provider: &P,
189        unable_to_decrypt: Option<(UnableToDecryptInfo, Option<&Arc<UtdHookManager>>)>,
190        in_reply_to: Option<InReplyToDetails>,
191        thread_root: Option<OwnedEventId>,
192        thread_summary: Option<ThreadSummary>,
193    ) -> Option<Self> {
194        let redaction_rules = room_data_provider.room_version_rules().redaction;
195
196        let redacted_message_or_none = |event_type: MessageLikeEventType| {
197            (event_type != MessageLikeEventType::Reaction)
198                .then_some(TimelineItemContent::MsgLike(MsgLikeContent::redacted()))
199        };
200
201        Some(match event {
202            AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
203                if let Some(redacts) = ev.redacts(&redaction_rules).map(ToOwned::to_owned) {
204                    Self::HandleAggregation {
205                        related_event: redacts,
206                        kind: HandleAggregationKind::Redaction,
207                    }
208                } else {
209                    Self::add_item(redacted_message_or_none(ev.event_type())?)
210                }
211            }
212
213            AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
214                Some(AnyMessageLikeEventContent::RoomEncrypted(content)) => {
215                    // An event which is still encrypted.
216                    if let Some((unable_to_decrypt_info, unable_to_decrypt_hook_manager)) =
217                        unable_to_decrypt
218                    {
219                        let utd_cause = UtdCause::determine(
220                            raw_event,
221                            room_data_provider.crypto_context_info().await,
222                            &unable_to_decrypt_info,
223                        );
224
225                        // Let the hook know that we ran into an unable-to-decrypt that is added to
226                        // the timeline.
227                        if let Some(hook) = unable_to_decrypt_hook_manager {
228                            hook.on_utd(
229                                ev.event_id(),
230                                utd_cause,
231                                ev.origin_server_ts(),
232                                ev.sender(),
233                            )
234                            .await;
235                        }
236
237                        Self::add_item(TimelineItemContent::MsgLike(
238                            MsgLikeContent::unable_to_decrypt(EncryptedMessage::from_content(
239                                content, utd_cause,
240                            )),
241                        ))
242                    } else {
243                        // If we get here, it means that some part of the code has created a
244                        // `TimelineEvent` containing an `m.room.encrypted` event without
245                        // decrypting it. Possibly this means that encryption has not been
246                        // configured. We treat it the same as any other message-like event.
247                        return Self::from_content(
248                            AnyMessageLikeEventContent::RoomEncrypted(content),
249                            in_reply_to,
250                            thread_root,
251                            thread_summary,
252                        );
253                    }
254                }
255
256                Some(content) => {
257                    return Self::from_content(content, in_reply_to, thread_root, thread_summary);
258                }
259
260                None => Self::add_item(redacted_message_or_none(ev.event_type())?),
261            },
262
263            AnySyncTimelineEvent::State(ev) => match ev {
264                AnySyncStateEvent::RoomMember(ev) => match ev {
265                    SyncStateEvent::Original(ev) => {
266                        Self::add_item(TimelineItemContent::room_member(
267                            ev.state_key,
268                            FullStateEventContent::Original {
269                                content: ev.content,
270                                prev_content: ev.unsigned.prev_content,
271                            },
272                            ev.sender,
273                        ))
274                    }
275                    SyncStateEvent::Redacted(ev) => {
276                        Self::add_item(TimelineItemContent::room_member(
277                            ev.state_key,
278                            FullStateEventContent::Redacted(ev.content),
279                            ev.sender,
280                        ))
281                    }
282                },
283                ev => Self::add_item(TimelineItemContent::OtherState(OtherState {
284                    state_key: ev.state_key().to_owned(),
285                    content: AnyOtherFullStateEventContent::with_event_content(ev.content()),
286                })),
287            },
288        })
289    }
290
291    /// Create a new [`TimelineAction`] from a given event's content.
292    ///
293    /// This is applicable to both remote event (as this is called from
294    /// [`TimelineAction::from_event`]) or local events (for which we only have
295    /// the content).
296    ///
297    /// The return value may be `None` if handling the event (be it a new item
298    /// or an aggregation) is not supported for this event type.
299    pub(super) fn from_content(
300        content: AnyMessageLikeEventContent,
301        in_reply_to: Option<InReplyToDetails>,
302        thread_root: Option<OwnedEventId>,
303        thread_summary: Option<ThreadSummary>,
304    ) -> Option<Self> {
305        Some(match content {
306            AnyMessageLikeEventContent::Reaction(c) => {
307                // This is a reaction to a message.
308                Self::HandleAggregation {
309                    related_event: c.relates_to.event_id.clone(),
310                    kind: HandleAggregationKind::Reaction { key: c.relates_to.key },
311                }
312            }
313
314            AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
315                relates_to: Some(Relation::Replacement(re)),
316                ..
317            }) => Self::HandleAggregation {
318                related_event: re.event_id.clone(),
319                kind: HandleAggregationKind::Edit { replacement: re },
320            },
321
322            AnyMessageLikeEventContent::UnstablePollStart(
323                UnstablePollStartEventContent::Replacement(re),
324            ) => Self::HandleAggregation {
325                related_event: re.relates_to.event_id.clone(),
326                kind: HandleAggregationKind::PollEdit { replacement: re.relates_to },
327            },
328
329            AnyMessageLikeEventContent::UnstablePollResponse(c) => Self::HandleAggregation {
330                related_event: c.relates_to.event_id,
331                kind: HandleAggregationKind::PollResponse { answers: c.poll_response.answers },
332            },
333
334            AnyMessageLikeEventContent::UnstablePollEnd(c) => Self::HandleAggregation {
335                related_event: c.relates_to.event_id,
336                kind: HandleAggregationKind::PollEnd,
337            },
338
339            AnyMessageLikeEventContent::CallInvite(_) => {
340                Self::add_item(TimelineItemContent::CallInvite)
341            }
342
343            AnyMessageLikeEventContent::CallNotify(_) => {
344                Self::add_item(TimelineItemContent::CallNotify)
345            }
346
347            AnyMessageLikeEventContent::Sticker(content) => {
348                Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent {
349                    kind: MsgLikeKind::Sticker(Sticker { content }),
350                    reactions: Default::default(),
351                    thread_root,
352                    in_reply_to,
353                    thread_summary,
354                }))
355            }
356
357            AnyMessageLikeEventContent::UnstablePollStart(UnstablePollStartEventContent::New(
358                c,
359            )) => {
360                let poll_state = PollState::new(c);
361
362                Self::AddItem {
363                    content: TimelineItemContent::MsgLike(MsgLikeContent {
364                        kind: MsgLikeKind::Poll(poll_state),
365                        reactions: Default::default(),
366                        thread_root,
367                        in_reply_to,
368                        thread_summary,
369                    }),
370                }
371            }
372
373            AnyMessageLikeEventContent::RoomMessage(msg) => Self::AddItem {
374                content: TimelineItemContent::message(
375                    msg.msgtype,
376                    msg.mentions,
377                    Default::default(),
378                    thread_root,
379                    in_reply_to,
380                    thread_summary,
381                ),
382            },
383
384            _ => {
385                debug!(
386                    "Ignoring message-like event of type `{}`, not supported (yet)",
387                    content.event_type()
388                );
389                return None;
390            }
391        })
392    }
393
394    pub(super) fn failed_to_parse(event: FailedToParseEvent, error: serde_json::Error) -> Self {
395        let error = Arc::new(error);
396        match event {
397            FailedToParseEvent::State { event_type, state_key } => {
398                Self::add_item(TimelineItemContent::FailedToParseState {
399                    event_type,
400                    state_key,
401                    error,
402                })
403            }
404            FailedToParseEvent::MsgLike(event_type) => {
405                Self::add_item(TimelineItemContent::FailedToParseMessageLike { event_type, error })
406            }
407        }
408    }
409}
410
411#[derive(Debug)]
412pub(super) enum FailedToParseEvent {
413    MsgLike(MessageLikeEventType),
414    State { event_type: StateEventType, state_key: String },
415}
416
417/// The position at which to perform an update of the timeline with events.
418#[derive(Clone, Copy, Debug)]
419pub(super) enum TimelineItemPosition {
420    /// One or more items are prepended to the timeline (i.e. they're the
421    /// oldest).
422    Start {
423        /// The origin of the new item(s).
424        origin: RemoteEventOrigin,
425    },
426
427    /// One or more items are appended to the timeline (i.e. they're the most
428    /// recent).
429    End {
430        /// The origin of the new item(s).
431        origin: RemoteEventOrigin,
432    },
433
434    /// One item is inserted to the timeline.
435    At {
436        /// Where to insert the remote event.
437        event_index: usize,
438
439        /// The origin of the new item.
440        origin: RemoteEventOrigin,
441    },
442
443    /// A single item is updated.
444    ///
445    /// This can happen for instance after a UTD has been successfully
446    /// decrypted, or when it's been redacted at the source.
447    UpdateAt {
448        /// The index of the **timeline item**.
449        timeline_item_index: usize,
450    },
451}
452
453/// Whether an item was removed or not.
454pub(super) type RemovedItem = bool;
455
456/// Data necessary to update the timeline, given a single event to handle.
457///
458/// Bundles together a few things that are needed throughout the different
459/// stages of handling an event (figuring out whether it should update an
460/// existing timeline item, transforming that item or creating a new one,
461/// updating the reactive Vec).
462pub(super) struct TimelineEventHandler<'a, 'o> {
463    items: &'a mut ObservableItemsTransaction<'o>,
464    meta: &'a mut TimelineMetadata,
465    ctx: TimelineEventContext,
466}
467
468impl<'a, 'o> TimelineEventHandler<'a, 'o> {
469    pub(super) fn new<P: RoomDataProvider>(
470        state: &'a mut TimelineStateTransaction<'o, P>,
471        ctx: TimelineEventContext,
472    ) -> Self {
473        let TimelineStateTransaction { items, meta, .. } = state;
474        Self { items, meta, ctx }
475    }
476
477    /// Handle an event.
478    ///
479    /// Returns if an item was added to the timeline due to the new timeline
480    /// action. Items might not be added to the timeline for various reasons,
481    /// some common ones are if the item:
482    ///     - Contains an unsupported event type.
483    ///     - Is an edit or a redaction.
484    ///     - Contains a local echo turning into a remote echo.
485    ///     - Contains a message that is already in the timeline but was now
486    ///       decrypted.
487    ///
488    /// `raw_event` is only needed to determine the cause of any UTDs,
489    /// so if we know this is not a UTD it can be None.
490    #[instrument(skip_all, fields(txn_id, event_id, position))]
491    pub(super) async fn handle_event(
492        mut self,
493        date_divider_adjuster: &mut DateDividerAdjuster,
494        timeline_action: TimelineAction,
495    ) -> bool {
496        let span = tracing::Span::current();
497
498        date_divider_adjuster.mark_used();
499
500        match &self.ctx.flow {
501            Flow::Local { txn_id, .. } => {
502                span.record("txn_id", debug(txn_id));
503                debug!("Handling local event");
504            }
505
506            Flow::Remote { event_id, txn_id, position, .. } => {
507                span.record("event_id", debug(event_id));
508                span.record("position", debug(position));
509                if let Some(txn_id) = txn_id {
510                    span.record("txn_id", debug(txn_id));
511                }
512                trace!("Handling remote event");
513            }
514        }
515
516        let mut added_item = false;
517
518        match timeline_action {
519            TimelineAction::AddItem { content } => {
520                if self.ctx.should_add_new_items {
521                    self.add_item(content);
522                    added_item = true;
523                }
524            }
525
526            TimelineAction::HandleAggregation { related_event, kind } => match kind {
527                HandleAggregationKind::Reaction { key } => {
528                    self.handle_reaction(related_event, key);
529                }
530                HandleAggregationKind::Redaction => {
531                    self.handle_redaction(related_event);
532                }
533                HandleAggregationKind::Edit { replacement } => {
534                    self.handle_edit(
535                        replacement.event_id.clone(),
536                        PendingEditKind::RoomMessage(replacement),
537                    );
538                }
539                HandleAggregationKind::PollResponse { answers } => {
540                    self.handle_poll_response(related_event, answers);
541                }
542                HandleAggregationKind::PollEdit { replacement } => {
543                    self.handle_edit(
544                        replacement.event_id.clone(),
545                        PendingEditKind::Poll(replacement),
546                    );
547                }
548                HandleAggregationKind::PollEnd => {
549                    self.handle_poll_end(related_event);
550                }
551            },
552        }
553
554        added_item
555    }
556
557    #[instrument(skip(self, edit_kind))]
558    fn handle_edit(&mut self, edited_event_id: OwnedEventId, edit_kind: PendingEditKind) {
559        let target = TimelineEventItemId::EventId(edited_event_id.clone());
560
561        let encryption_info =
562            as_variant!(&self.ctx.flow, Flow::Remote { encryption_info, .. } => encryption_info.clone()).flatten();
563        let aggregation = Aggregation::new(
564            self.ctx.flow.timeline_item_id(),
565            AggregationKind::Edit(PendingEdit {
566                kind: edit_kind,
567                edit_json: self.ctx.flow.raw_event().cloned(),
568                encryption_info,
569                bundled_item_owner: None,
570            }),
571        );
572
573        self.meta.aggregations.add(target.clone(), aggregation.clone());
574
575        if let Some(new_item) = find_item_and_apply_aggregation(
576            &self.meta.aggregations,
577            self.items,
578            &target,
579            aggregation,
580            &self.meta.room_version_rules,
581        ) {
582            // Update all events that replied to this message with the edited content.
583            Self::maybe_update_responses(self.meta, self.items, &edited_event_id, &new_item);
584        }
585    }
586
587    /// Apply a reaction to a *remote* event.
588    ///
589    /// Reactions to local events are applied in
590    /// [`crate::timeline::TimelineController::handle_local_echo`].
591    #[instrument(skip(self))]
592    fn handle_reaction(&mut self, relates_to: OwnedEventId, reaction_key: String) {
593        let target = TimelineEventItemId::EventId(relates_to);
594
595        // Add the aggregation to the manager.
596        let reaction_status = match &self.ctx.flow {
597            Flow::Local { send_handle, .. } => {
598                // This is a local echo for a reaction to a remote event.
599                ReactionStatus::LocalToRemote(send_handle.clone())
600            }
601            Flow::Remote { event_id, .. } => {
602                // This is the remote echo for a reaction to a remote event.
603                ReactionStatus::RemoteToRemote(event_id.clone())
604            }
605        };
606
607        let aggregation = Aggregation::new(
608            self.ctx.flow.timeline_item_id(),
609            AggregationKind::Reaction {
610                key: reaction_key,
611                sender: self.ctx.sender.clone(),
612                timestamp: self.ctx.timestamp,
613                reaction_status,
614            },
615        );
616
617        self.meta.aggregations.add(target.clone(), aggregation.clone());
618        find_item_and_apply_aggregation(
619            &self.meta.aggregations,
620            self.items,
621            &target,
622            aggregation,
623            &self.meta.room_version_rules,
624        );
625    }
626
627    fn handle_poll_response(&mut self, poll_event_id: OwnedEventId, answers: Vec<String>) {
628        let target = TimelineEventItemId::EventId(poll_event_id);
629        let aggregation = Aggregation::new(
630            self.ctx.flow.timeline_item_id(),
631            AggregationKind::PollResponse {
632                sender: self.ctx.sender.clone(),
633                timestamp: self.ctx.timestamp,
634                answers,
635            },
636        );
637        self.meta.aggregations.add(target.clone(), aggregation.clone());
638        find_item_and_apply_aggregation(
639            &self.meta.aggregations,
640            self.items,
641            &target,
642            aggregation,
643            &self.meta.room_version_rules,
644        );
645    }
646
647    fn handle_poll_end(&mut self, poll_event_id: OwnedEventId) {
648        let target = TimelineEventItemId::EventId(poll_event_id);
649        let aggregation = Aggregation::new(
650            self.ctx.flow.timeline_item_id(),
651            AggregationKind::PollEnd { end_date: self.ctx.timestamp },
652        );
653        self.meta.aggregations.add(target.clone(), aggregation.clone());
654        find_item_and_apply_aggregation(
655            &self.meta.aggregations,
656            self.items,
657            &target,
658            aggregation,
659            &self.meta.room_version_rules,
660        );
661    }
662
663    /// Looks for the redacted event in all the timeline event items, and
664    /// redacts it.
665    ///
666    /// This assumes the redacted event was present in the timeline in the first
667    /// place; it will warn if the redacted event has not been found.
668    #[instrument(skip_all, fields(redacts_event_id = ?redacted))]
669    fn handle_redaction(&mut self, redacted: OwnedEventId) {
670        // TODO: Apply local redaction of PollResponse and PollEnd events.
671        // https://github.com/matrix-org/matrix-rust-sdk/pull/2381#issuecomment-1689647825
672
673        // If it's an aggregation that's being redacted, handle it here.
674        if self.handle_aggregation_redaction(redacted.clone()) {
675            // When we have raw timeline items, we should not return here anymore, as we
676            // might need to redact the raw item as well.
677            return;
678        }
679
680        let target = TimelineEventItemId::EventId(redacted.clone());
681        let aggregation =
682            Aggregation::new(self.ctx.flow.timeline_item_id(), AggregationKind::Redaction);
683        self.meta.aggregations.add(target.clone(), aggregation.clone());
684
685        if let Some(new_item) = find_item_and_apply_aggregation(
686            &self.meta.aggregations,
687            self.items,
688            &target,
689            aggregation,
690            &self.meta.room_version_rules,
691        ) {
692            // Look for any timeline event that's a reply to the redacted event, and redact
693            // the replied-to event there as well.
694            Self::maybe_update_responses(self.meta, self.items, &redacted, &new_item);
695        }
696    }
697
698    /// Attempts to redact an aggregation (e.g. a reaction, a poll response,
699    /// etc.).
700    ///
701    /// Returns true if it's succeeded.
702    #[instrument(skip_all, fields(redacts = ?aggregation_id))]
703    fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool {
704        let aggregation_id = TimelineEventItemId::EventId(aggregation_id);
705
706        match self.meta.aggregations.try_remove_aggregation(&aggregation_id, self.items) {
707            Ok(val) => val,
708            // This wasn't a known aggregation that was redacted.
709            Err(err) => {
710                warn!("error while attempting to remove aggregation: {err}");
711                // It could find an aggregation but didn't properly unapply it.
712                true
713            }
714        }
715    }
716
717    /// Add a new event item in the timeline.
718    ///
719    /// # Safety
720    ///
721    /// This method is not marked as unsafe **but** it manipulates
722    /// [`ObservableItemsTransaction::all_remote_events`]. 2 rules **must** be
723    /// respected:
724    ///
725    /// 1. the remote event of the item being added **must** be present in
726    ///    `all_remote_events`,
727    /// 2. the lastly added or updated remote event must be associated to the
728    ///    timeline item being added here.
729    fn add_item(&mut self, content: TimelineItemContent) {
730        let sender = self.ctx.sender.to_owned();
731        let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());
732        let timestamp = self.ctx.timestamp;
733
734        let kind: EventTimelineItemKind = match &self.ctx.flow {
735            Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
736                send_state: EventSendState::NotSentYet { progress: None },
737                transaction_id: txn_id.to_owned(),
738                send_handle: send_handle.clone(),
739            }
740            .into(),
741
742            Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
743                let origin = match *position {
744                    TimelineItemPosition::Start { origin }
745                    | TimelineItemPosition::End { origin }
746                    | TimelineItemPosition::At { origin, .. } => origin,
747
748                    // For updates, reuse the origin of the encrypted event.
749                    TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self.items[idx]
750                        .as_event()
751                        .and_then(|ev| Some(ev.as_remote()?.origin))
752                        .unwrap_or_else(|| {
753                            error!("Tried to update a local event");
754                            RemoteEventOrigin::Unknown
755                        }),
756                };
757
758                RemoteEventTimelineItem {
759                    event_id: event_id.clone(),
760                    transaction_id: txn_id.clone(),
761                    read_receipts: self.ctx.read_receipts.clone(),
762                    is_own: self.ctx.sender == self.meta.own_user_id,
763                    is_highlighted: self.ctx.is_highlighted,
764                    encryption_info: encryption_info.clone(),
765                    original_json: Some(raw_event.clone()),
766                    latest_edit_json: None,
767                    origin,
768                }
769                .into()
770            }
771        };
772
773        let is_room_encrypted = self.meta.is_room_encrypted;
774
775        let item = EventTimelineItem::new(
776            sender,
777            sender_profile,
778            timestamp,
779            content,
780            kind,
781            is_room_encrypted,
782        );
783
784        // Apply any pending or stashed aggregations.
785        let mut cowed = Cow::Owned(item);
786        if let Err(err) = self.meta.aggregations.apply_all(
787            &self.ctx.flow.timeline_item_id(),
788            &mut cowed,
789            self.items,
790            &self.meta.room_version_rules,
791        ) {
792            warn!("discarding aggregations: {err}");
793        }
794        let item = cowed.into_owned();
795
796        match &self.ctx.flow {
797            Flow::Local { .. } => {
798                trace!("Adding new local timeline item");
799
800                let item = self.meta.new_timeline_item(item);
801
802                self.items.push_local(item);
803            }
804
805            Flow::Remote {
806                position: TimelineItemPosition::Start { .. }, event_id, txn_id, ..
807            } => {
808                let item = Self::recycle_local_or_create_item(
809                    self.items,
810                    self.meta,
811                    item,
812                    event_id,
813                    txn_id.as_deref(),
814                );
815
816                trace!("Adding new remote timeline item at the start");
817
818                self.items.push_front(item, Some(0));
819            }
820
821            Flow::Remote {
822                position: TimelineItemPosition::At { event_index, .. },
823                event_id,
824                txn_id,
825                ..
826            } => {
827                let item = Self::recycle_local_or_create_item(
828                    self.items,
829                    self.meta,
830                    item,
831                    event_id,
832                    txn_id.as_deref(),
833                );
834
835                let all_remote_events = self.items.all_remote_events();
836                let event_index = *event_index;
837
838                // Look for the closest `timeline_item_index` at the left of `event_index`.
839                let timeline_item_index = all_remote_events
840                    .range(0..=event_index)
841                    .rev()
842                    .find_map(|event_meta| event_meta.timeline_item_index)
843                    // The new `timeline_item_index` is the previous + 1.
844                    .map(|timeline_item_index| timeline_item_index + 1);
845
846                // No index? Look for the closest `timeline_item_index` at the right of
847                // `event_index`.
848                let timeline_item_index = timeline_item_index.or_else(|| {
849                    all_remote_events
850                        .range(event_index + 1..)
851                        .find_map(|event_meta| event_meta.timeline_item_index)
852                });
853
854                // Still no index? Well, it means there is no existing `timeline_item_index`
855                // so we are inserting at the last non-local item position as a fallback.
856                let timeline_item_index = timeline_item_index.unwrap_or_else(|| {
857                    self.items
858                        .iter_remotes_region()
859                        .rev()
860                        .find_map(|(timeline_item_index, timeline_item)| {
861                            timeline_item.as_event().map(|_| timeline_item_index + 1)
862                        })
863                        .unwrap_or_else(|| {
864                            // There is no remote timeline item, so we could insert at the start of
865                            // the remotes region.
866                            self.items.first_remotes_region_index()
867                        })
868                });
869
870                trace!(
871                    ?event_index,
872                    ?timeline_item_index,
873                    "Adding new remote timeline at specific event index"
874                );
875
876                self.items.insert(timeline_item_index, item, Some(event_index));
877            }
878
879            Flow::Remote {
880                position: TimelineItemPosition::End { .. }, event_id, txn_id, ..
881            } => {
882                let item = Self::recycle_local_or_create_item(
883                    self.items,
884                    self.meta,
885                    item,
886                    event_id,
887                    txn_id.as_deref(),
888                );
889
890                // Let's find the latest remote event and insert after it
891                let timeline_item_index = self
892                    .items
893                    .iter_remotes_region()
894                    .rev()
895                    .find_map(|(timeline_item_index, timeline_item)| {
896                        timeline_item.as_event().map(|_| timeline_item_index + 1)
897                    })
898                    .unwrap_or_else(|| {
899                        // There is no remote timeline item, so we could insert at the start of
900                        // the remotes region.
901                        self.items.first_remotes_region_index()
902                    });
903
904                let event_index = self
905                    .items
906                    .all_remote_events()
907                    .last_index()
908                    // The last remote event is necessarily associated to this
909                    // timeline item, see the contract of this method. Let's fallback to a similar
910                    // value as `timeline_item_index` instead of panicking.
911                    .or_else(|| {
912                        error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present");
913
914                        Some(0)
915                    });
916
917                // Try to keep precise insertion semantics here, in this exact order:
918                //
919                // * _push back_ when the new item is inserted after all items (the assumption
920                // being that this is the hot path, because most of the time new events
921                // come from the sync),
922                // * _push front_ when the new item is inserted at index 0,
923                // * _insert_ otherwise.
924
925                if timeline_item_index == self.items.len() {
926                    trace!("Adding new remote timeline item at the back");
927                    self.items.push_back(item, event_index);
928                } else if timeline_item_index == 0 {
929                    trace!("Adding new remote timeline item at the front");
930                    self.items.push_front(item, event_index);
931                } else {
932                    trace!(
933                        timeline_item_index,
934                        "Adding new remote timeline item at specific index"
935                    );
936                    self.items.insert(timeline_item_index, item, event_index);
937                }
938            }
939
940            Flow::Remote {
941                event_id: decrypted_event_id,
942                position: TimelineItemPosition::UpdateAt { timeline_item_index: idx },
943                ..
944            } => {
945                trace!("Updating timeline item at position {idx}");
946
947                // Update all events that replied to this previously encrypted message.
948                Self::maybe_update_responses(self.meta, self.items, decrypted_event_id, &item);
949
950                let internal_id = self.items[*idx].internal_id.clone();
951                self.items.replace(*idx, TimelineItem::new(item, internal_id));
952            }
953        }
954
955        // If we don't have a read marker item, look if we need to add one now.
956        if !self.meta.has_up_to_date_read_marker_item {
957            self.meta.update_read_marker(self.items);
958        }
959    }
960
961    /// Try to recycle a local timeline item for the same event, or create a new
962    /// timeline item for it.
963    ///
964    /// Note: this method doesn't take `&mut self` to avoid a borrow checker
965    /// conflict with `TimelineEventHandler::add_item`.
966    fn recycle_local_or_create_item(
967        items: &mut ObservableItemsTransaction<'_>,
968        meta: &mut TimelineMetadata,
969        mut new_item: EventTimelineItem,
970        event_id: &EventId,
971        transaction_id: Option<&TransactionId>,
972    ) -> Arc<TimelineItem> {
973        // Detect a local timeline item that matches `event_id` or `transaction_id`.
974        if let Some((local_timeline_item_index, local_timeline_item)) = items
975            // Iterate the locals region.
976            .iter_locals_region()
977            // Iterate from the end to the start.
978            .rev()
979            .find_map(|(nth, timeline_item)| {
980                let event_timeline_item = timeline_item.as_event()?;
981
982                if Some(event_id) == event_timeline_item.event_id()
983                    || (transaction_id.is_some()
984                        && transaction_id == event_timeline_item.transaction_id())
985                {
986                    // A duplicate local event timeline item has been found!
987                    Some((nth, event_timeline_item))
988                } else {
989                    // This local event timeline is not the one we are looking for. Continue our
990                    // search.
991                    None
992                }
993            })
994        {
995            trace!(
996                ?event_id,
997                ?transaction_id,
998                ?local_timeline_item_index,
999                "Removing local timeline item"
1000            );
1001
1002            transfer_details(&mut new_item, local_timeline_item);
1003
1004            // Remove the local timeline item.
1005            let recycled = items.remove(local_timeline_item_index);
1006            TimelineItem::new(new_item, recycled.internal_id.clone())
1007        } else {
1008            // We haven't found a matching local item to recycle; create a new item.
1009            meta.new_timeline_item(new_item)
1010        }
1011    }
1012
1013    /// After updating the timeline item `new_item` which id is
1014    /// `target_event_id`, update other items that are responses to this item.
1015    fn maybe_update_responses(
1016        meta: &mut TimelineMetadata,
1017        items: &mut ObservableItemsTransaction<'_>,
1018        target_event_id: &EventId,
1019        new_item: &EventTimelineItem,
1020    ) {
1021        let Some(replies) = meta.replies.get(target_event_id) else {
1022            trace!("item has no replies");
1023            return;
1024        };
1025
1026        for reply_id in replies {
1027            let Some(timeline_item_index) = items
1028                .get_remote_event_by_event_id(reply_id)
1029                .and_then(|meta| meta.timeline_item_index)
1030            else {
1031                warn!(%reply_id, "event not known as an item in the timeline");
1032                continue;
1033            };
1034
1035            let Some(item) = items.get(timeline_item_index) else {
1036                warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
1037                continue;
1038            };
1039
1040            let Some(event_item) = item.as_event() else { continue };
1041            let Some(msglike) = event_item.content.as_msglike() else { continue };
1042            let Some(message) = msglike.as_message() else { continue };
1043            let Some(in_reply_to) = msglike.in_reply_to.as_ref() else { continue };
1044
1045            trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
1046            let in_reply_to = InReplyToDetails {
1047                event_id: in_reply_to.event_id.clone(),
1048                event: TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(
1049                    new_item,
1050                ))),
1051            };
1052
1053            let new_reply_content = TimelineItemContent::MsgLike(
1054                msglike
1055                    .with_in_reply_to(in_reply_to)
1056                    .with_kind(MsgLikeKind::Message(message.clone())),
1057            );
1058            let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
1059            items.replace(timeline_item_index, new_reply_item);
1060        }
1061    }
1062}
1063
1064/// Transfer `TimelineDetails` that weren't available on the original
1065/// item and have been fetched separately (only `reply_to` for
1066/// now) from `old_item` to `item`, given two items for an event
1067/// that was re-received.
1068///
1069/// `old_item` *should* always be a local timeline item usually, but it
1070/// can be a remote timeline item.
1071fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
1072    let TimelineItemContent::MsgLike(new_msglike) = &mut new_item.content else {
1073        return;
1074    };
1075    let TimelineItemContent::MsgLike(old_msglike) = &old_item.content else {
1076        return;
1077    };
1078
1079    let Some(in_reply_to) = &mut new_msglike.in_reply_to else { return };
1080    let Some(old_in_reply_to) = &old_msglike.in_reply_to else { return };
1081
1082    if matches!(&in_reply_to.event, TimelineDetails::Unavailable) {
1083        in_reply_to.event = old_in_reply_to.event.clone();
1084    }
1085}