matrix_sdk_ui/timeline/
event_handler.rs

1// Copyright 2022 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{borrow::Cow, sync::Arc};
16
17use as_variant::as_variant;
18use indexmap::IndexMap;
19use matrix_sdk::{
20    crypto::types::events::UtdCause,
21    deserialized_responses::{EncryptionInfo, UnableToDecryptInfo},
22    send_queue::SendHandle,
23};
24use ruma::{
25    events::{
26        poll::unstable_start::{
27            NewUnstablePollStartEventContentWithoutRelation, UnstablePollStartEventContent,
28        },
29        receipt::Receipt,
30        relation::Replacement,
31        room::message::{
32            Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation,
33        },
34        AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
35        AnySyncTimelineEvent, EventContent, FullStateEventContent, MessageLikeEventType,
36        StateEventType, SyncStateEvent,
37    },
38    serde::Raw,
39    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
40    TransactionId,
41};
42use tracing::{debug, error, field::debug, instrument, trace, warn};
43
44use super::{
45    controller::{
46        find_item_and_apply_aggregation, Aggregation, AggregationKind, ObservableItemsTransaction,
47        PendingEditKind, TimelineMetadata, TimelineStateTransaction,
48    },
49    date_dividers::DateDividerAdjuster,
50    event_item::{
51        AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind,
52        LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem,
53        TimelineEventItemId,
54    },
55    traits::RoomDataProvider,
56    EmbeddedEvent, EncryptedMessage, EventTimelineItem, InReplyToDetails, MsgLikeContent,
57    MsgLikeKind, OtherState, ReactionStatus, Sticker, ThreadSummary, TimelineDetails, TimelineItem,
58    TimelineItemContent,
59};
60use crate::timeline::controller::aggregations::PendingEdit;
61
62/// When adding an event, useful information related to the source of the event.
63pub(super) enum Flow {
64    /// The event was locally created.
65    Local {
66        /// The transaction id we've used in requests associated to this event.
67        txn_id: OwnedTransactionId,
68
69        /// A handle to manipulate this event.
70        send_handle: Option<SendHandle>,
71    },
72
73    /// The event has been received from a remote source (sync, pagination,
74    /// etc.). This can be a "remote echo".
75    Remote {
76        /// The event identifier as returned by the server.
77        event_id: OwnedEventId,
78        /// The transaction id we might have used, if we're the sender of the
79        /// event.
80        txn_id: Option<OwnedTransactionId>,
81        /// The raw serialized JSON event.
82        raw_event: Raw<AnySyncTimelineEvent>,
83        /// Where should this be added in the timeline.
84        position: TimelineItemPosition,
85        /// Information about the encryption for this event.
86        encryption_info: Option<Arc<EncryptionInfo>>,
87    },
88}
89
90impl Flow {
91    /// Returns the [`TimelineEventItemId`] associated to this future item.
92    pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId {
93        match self {
94            Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()),
95            Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()),
96        }
97    }
98
99    /// If the flow is remote, returns the associated full raw event.
100    pub(crate) fn raw_event(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
101        as_variant!(self, Flow::Remote { raw_event, .. } => raw_event)
102    }
103}
104
105pub(super) struct TimelineEventContext {
106    pub(super) sender: OwnedUserId,
107    pub(super) sender_profile: Option<Profile>,
108    /// The event's `origin_server_ts` field (or creation time for local echo).
109    pub(super) timestamp: MilliSecondsSinceUnixEpoch,
110    pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
111    pub(super) is_highlighted: bool,
112    pub(super) flow: Flow,
113
114    /// If the event represents a new item, should it be added to the timeline?
115    ///
116    /// This controls whether a new timeline *may* be added. If the update kind
117    /// is about an update to an existing timeline item (redaction, edit,
118    /// reaction, etc.), it's always handled by default.
119    pub(super) should_add_new_items: bool,
120}
121
122/// Which kind of aggregation (i.e. modification of a related event) are we
123/// going to handle?
124#[derive(Clone, Debug)]
125pub(super) enum HandleAggregationKind {
126    /// Adding a reaction to the related event.
127    Reaction { key: String },
128
129    /// Redacting (removing) the related event.
130    Redaction,
131
132    /// Editing (replacing) the related event with another one.
133    Edit { replacement: Replacement<RoomMessageEventContentWithoutRelation> },
134
135    /// Responding to the related poll event.
136    PollResponse { answers: Vec<String> },
137
138    /// Editing a related poll event's description.
139    PollEdit { replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation> },
140
141    /// Ending a related poll.
142    PollEnd,
143}
144
145/// An action that we want to cause on the timeline.
146#[derive(Clone, Debug)]
147#[allow(clippy::large_enum_variant)]
148pub(super) enum TimelineAction {
149    /// Add a new timeline item.
150    ///
151    /// This enqueues adding a new item to the timeline (i.e. push to the items
152    /// array in its state). The item may be filtered out, and thus not
153    /// added later.
154    AddItem {
155        /// The content of the item we want to add.
156        content: TimelineItemContent,
157    },
158
159    /// Handle an aggregation to another event.
160    ///
161    /// The event the aggregation is related to might not be included in the
162    /// timeline, in which case it will be stashed somewhere, until we see
163    /// the related event.
164    HandleAggregation {
165        /// To which other event does this aggregation apply to?
166        related_event: OwnedEventId,
167        /// What kind of aggregation are we handling here?
168        kind: HandleAggregationKind,
169    },
170}
171
172impl TimelineAction {
173    /// Create a new [`TimelineEventKind::AddItem`].
174    fn add_item(content: TimelineItemContent) -> Self {
175        Self::AddItem { content }
176    }
177
178    /// Create a new [`TimelineAction`] from a given remote event.
179    ///
180    /// The return value may be `None` if handling the event (be it a new item
181    /// or an aggregation) is not supported for this event type.
182    #[allow(clippy::too_many_arguments)]
183    pub async fn from_event<P: RoomDataProvider>(
184        event: AnySyncTimelineEvent,
185        raw_event: &Raw<AnySyncTimelineEvent>,
186        room_data_provider: &P,
187        unable_to_decrypt_info: Option<UnableToDecryptInfo>,
188        meta: &TimelineMetadata,
189        in_reply_to: Option<InReplyToDetails>,
190        thread_root: Option<OwnedEventId>,
191        thread_summary: Option<ThreadSummary>,
192    ) -> Option<Self> {
193        let room_version = room_data_provider.room_version();
194
195        let redacted_message_or_none = |event_type: MessageLikeEventType| {
196            (event_type != MessageLikeEventType::Reaction)
197                .then_some(TimelineItemContent::MsgLike(MsgLikeContent::redacted()))
198        };
199
200        Some(match event {
201            AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
202                if let Some(redacts) = ev.redacts(&room_version).map(ToOwned::to_owned) {
203                    Self::HandleAggregation {
204                        related_event: redacts,
205                        kind: HandleAggregationKind::Redaction,
206                    }
207                } else {
208                    Self::add_item(redacted_message_or_none(ev.event_type())?)
209                }
210            }
211
212            AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
213                Some(AnyMessageLikeEventContent::RoomEncrypted(content)) => {
214                    // An event which is still encrypted.
215                    if let Some(unable_to_decrypt_info) = unable_to_decrypt_info {
216                        let utd_cause = UtdCause::determine(
217                            raw_event,
218                            room_data_provider.crypto_context_info().await,
219                            &unable_to_decrypt_info,
220                        );
221
222                        // Let the hook know that we ran into an unable-to-decrypt that is added to
223                        // the timeline.
224                        if let Some(hook) = meta.unable_to_decrypt_hook.as_ref() {
225                            hook.on_utd(
226                                ev.event_id(),
227                                utd_cause,
228                                ev.origin_server_ts(),
229                                ev.sender(),
230                            )
231                            .await;
232                        }
233
234                        Self::add_item(TimelineItemContent::MsgLike(
235                            MsgLikeContent::unable_to_decrypt(EncryptedMessage::from_content(
236                                content, utd_cause,
237                            )),
238                        ))
239                    } else {
240                        // If we get here, it means that some part of the code has created a
241                        // `TimelineEvent` containing an `m.room.encrypted` event without
242                        // decrypting it. Possibly this means that encryption has not been
243                        // configured. We treat it the same as any other message-like event.
244                        return Self::from_content(
245                            AnyMessageLikeEventContent::RoomEncrypted(content),
246                            in_reply_to,
247                            thread_root,
248                            thread_summary,
249                        );
250                    }
251                }
252
253                Some(content) => {
254                    return Self::from_content(content, in_reply_to, thread_root, thread_summary);
255                }
256
257                None => Self::add_item(redacted_message_or_none(ev.event_type())?),
258            },
259
260            AnySyncTimelineEvent::State(ev) => match ev {
261                AnySyncStateEvent::RoomMember(ev) => match ev {
262                    SyncStateEvent::Original(ev) => {
263                        Self::add_item(TimelineItemContent::room_member(
264                            ev.state_key,
265                            FullStateEventContent::Original {
266                                content: ev.content,
267                                prev_content: ev.unsigned.prev_content,
268                            },
269                            ev.sender,
270                        ))
271                    }
272                    SyncStateEvent::Redacted(ev) => {
273                        Self::add_item(TimelineItemContent::room_member(
274                            ev.state_key,
275                            FullStateEventContent::Redacted(ev.content),
276                            ev.sender,
277                        ))
278                    }
279                },
280                ev => Self::add_item(TimelineItemContent::OtherState(OtherState {
281                    state_key: ev.state_key().to_owned(),
282                    content: AnyOtherFullStateEventContent::with_event_content(ev.content()),
283                })),
284            },
285        })
286    }
287
288    /// Create a new [`TimelineAction`] from a given event's content.
289    ///
290    /// This is applicable to both remote event (as this is called from
291    /// [`TimelineAction::from_event`]) or local events (for which we only have
292    /// the content).
293    ///
294    /// The return value may be `None` if handling the event (be it a new item
295    /// or an aggregation) is not supported for this event type.
296    pub(super) fn from_content(
297        content: AnyMessageLikeEventContent,
298        in_reply_to: Option<InReplyToDetails>,
299        thread_root: Option<OwnedEventId>,
300        thread_summary: Option<ThreadSummary>,
301    ) -> Option<Self> {
302        Some(match content {
303            AnyMessageLikeEventContent::Reaction(c) => {
304                // This is a reaction to a message.
305                Self::HandleAggregation {
306                    related_event: c.relates_to.event_id.clone(),
307                    kind: HandleAggregationKind::Reaction { key: c.relates_to.key },
308                }
309            }
310
311            AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
312                relates_to: Some(Relation::Replacement(re)),
313                ..
314            }) => Self::HandleAggregation {
315                related_event: re.event_id.clone(),
316                kind: HandleAggregationKind::Edit { replacement: re },
317            },
318
319            AnyMessageLikeEventContent::UnstablePollStart(
320                UnstablePollStartEventContent::Replacement(re),
321            ) => Self::HandleAggregation {
322                related_event: re.relates_to.event_id.clone(),
323                kind: HandleAggregationKind::PollEdit { replacement: re.relates_to },
324            },
325
326            AnyMessageLikeEventContent::UnstablePollResponse(c) => Self::HandleAggregation {
327                related_event: c.relates_to.event_id,
328                kind: HandleAggregationKind::PollResponse { answers: c.poll_response.answers },
329            },
330
331            AnyMessageLikeEventContent::UnstablePollEnd(c) => Self::HandleAggregation {
332                related_event: c.relates_to.event_id,
333                kind: HandleAggregationKind::PollEnd,
334            },
335
336            AnyMessageLikeEventContent::CallInvite(_) => {
337                Self::add_item(TimelineItemContent::CallInvite)
338            }
339
340            AnyMessageLikeEventContent::CallNotify(_) => {
341                Self::add_item(TimelineItemContent::CallNotify)
342            }
343
344            AnyMessageLikeEventContent::Sticker(content) => {
345                Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent {
346                    kind: MsgLikeKind::Sticker(Sticker { content }),
347                    reactions: Default::default(),
348                    thread_root,
349                    in_reply_to,
350                    thread_summary,
351                }))
352            }
353
354            AnyMessageLikeEventContent::UnstablePollStart(UnstablePollStartEventContent::New(
355                c,
356            )) => {
357                let poll_state = PollState::new(c);
358
359                Self::AddItem {
360                    content: TimelineItemContent::MsgLike(MsgLikeContent {
361                        kind: MsgLikeKind::Poll(poll_state),
362                        reactions: Default::default(),
363                        thread_root,
364                        in_reply_to,
365                        thread_summary,
366                    }),
367                }
368            }
369
370            AnyMessageLikeEventContent::RoomMessage(msg) => Self::AddItem {
371                content: TimelineItemContent::message(
372                    msg.msgtype,
373                    msg.mentions,
374                    Default::default(),
375                    thread_root,
376                    in_reply_to,
377                    thread_summary,
378                ),
379            },
380
381            _ => {
382                debug!(
383                    "Ignoring message-like event of type `{}`, not supported (yet)",
384                    content.event_type()
385                );
386                return None;
387            }
388        })
389    }
390
391    pub(super) fn failed_to_parse(event: FailedToParseEvent, error: serde_json::Error) -> Self {
392        let error = Arc::new(error);
393        match event {
394            FailedToParseEvent::State { event_type, state_key } => {
395                Self::add_item(TimelineItemContent::FailedToParseState {
396                    event_type,
397                    state_key,
398                    error,
399                })
400            }
401            FailedToParseEvent::MsgLike(event_type) => {
402                Self::add_item(TimelineItemContent::FailedToParseMessageLike { event_type, error })
403            }
404        }
405    }
406}
407
408#[derive(Debug)]
409pub(super) enum FailedToParseEvent {
410    MsgLike(MessageLikeEventType),
411    State { event_type: StateEventType, state_key: String },
412}
413
414/// The position at which to perform an update of the timeline with events.
415#[derive(Clone, Copy, Debug)]
416pub(super) enum TimelineItemPosition {
417    /// One or more items are prepended to the timeline (i.e. they're the
418    /// oldest).
419    Start {
420        /// The origin of the new item(s).
421        origin: RemoteEventOrigin,
422    },
423
424    /// One or more items are appended to the timeline (i.e. they're the most
425    /// recent).
426    End {
427        /// The origin of the new item(s).
428        origin: RemoteEventOrigin,
429    },
430
431    /// One item is inserted to the timeline.
432    At {
433        /// Where to insert the remote event.
434        event_index: usize,
435
436        /// The origin of the new item.
437        origin: RemoteEventOrigin,
438    },
439
440    /// A single item is updated.
441    ///
442    /// This can happen for instance after a UTD has been successfully
443    /// decrypted, or when it's been redacted at the source.
444    UpdateAt {
445        /// The index of the **timeline item**.
446        timeline_item_index: usize,
447    },
448}
449
450/// Whether an item was removed or not.
451pub(super) type RemovedItem = bool;
452
453/// Data necessary to update the timeline, given a single event to handle.
454///
455/// Bundles together a few things that are needed throughout the different
456/// stages of handling an event (figuring out whether it should update an
457/// existing timeline item, transforming that item or creating a new one,
458/// updating the reactive Vec).
459pub(super) struct TimelineEventHandler<'a, 'o> {
460    items: &'a mut ObservableItemsTransaction<'o>,
461    meta: &'a mut TimelineMetadata,
462    ctx: TimelineEventContext,
463}
464
465impl<'a, 'o> TimelineEventHandler<'a, 'o> {
466    pub(super) fn new(
467        state: &'a mut TimelineStateTransaction<'o>,
468        ctx: TimelineEventContext,
469    ) -> Self {
470        let TimelineStateTransaction { items, meta, .. } = state;
471        Self { items, meta, ctx }
472    }
473
474    /// Handle an event.
475    ///
476    /// Returns the number of timeline updates that were made.
477    ///
478    /// `raw_event` is only needed to determine the cause of any UTDs,
479    /// so if we know this is not a UTD it can be None.
480    #[instrument(skip_all, fields(txn_id, event_id, position))]
481    pub(super) async fn handle_event(
482        mut self,
483        date_divider_adjuster: &mut DateDividerAdjuster,
484        timeline_action: TimelineAction,
485    ) -> RemovedItem {
486        let span = tracing::Span::current();
487
488        date_divider_adjuster.mark_used();
489
490        match &self.ctx.flow {
491            Flow::Local { txn_id, .. } => {
492                span.record("txn_id", debug(txn_id));
493                debug!("Handling local event");
494            }
495
496            Flow::Remote { event_id, txn_id, position, .. } => {
497                span.record("event_id", debug(event_id));
498                span.record("position", debug(position));
499                if let Some(txn_id) = txn_id {
500                    span.record("txn_id", debug(txn_id));
501                }
502                trace!("Handling remote event");
503            }
504        };
505
506        let mut added_item = false;
507
508        match timeline_action {
509            TimelineAction::AddItem { content } => {
510                if self.ctx.should_add_new_items {
511                    self.add_item(content);
512                    added_item = true;
513                }
514            }
515
516            TimelineAction::HandleAggregation { related_event, kind } => match kind {
517                HandleAggregationKind::Reaction { key } => {
518                    self.handle_reaction(related_event, key);
519                }
520                HandleAggregationKind::Redaction => {
521                    self.handle_redaction(related_event);
522                }
523                HandleAggregationKind::Edit { replacement } => {
524                    self.handle_edit(
525                        replacement.event_id.clone(),
526                        PendingEditKind::RoomMessage(replacement),
527                    );
528                }
529                HandleAggregationKind::PollResponse { answers } => {
530                    self.handle_poll_response(related_event, answers);
531                }
532                HandleAggregationKind::PollEdit { replacement } => {
533                    self.handle_edit(
534                        replacement.event_id.clone(),
535                        PendingEditKind::Poll(replacement),
536                    );
537                }
538                HandleAggregationKind::PollEnd => {
539                    self.handle_poll_end(related_event);
540                }
541            },
542        }
543
544        let mut removed_item = false;
545
546        if !added_item {
547            trace!("No new item added");
548
549            if let Flow::Remote {
550                position: TimelineItemPosition::UpdateAt { timeline_item_index },
551                ..
552            } = self.ctx.flow
553            {
554                // If add was not called, that means the UTD event is one that
555                // wouldn't normally be visible. Remove it.
556                trace!("Removing UTD that was successfully retried");
557                self.items.remove(timeline_item_index);
558                removed_item = true;
559            }
560        }
561
562        removed_item
563    }
564
565    #[instrument(skip(self, edit_kind))]
566    fn handle_edit(&mut self, edited_event_id: OwnedEventId, edit_kind: PendingEditKind) {
567        let target = TimelineEventItemId::EventId(edited_event_id.clone());
568
569        let encryption_info =
570            as_variant!(&self.ctx.flow, Flow::Remote { encryption_info, .. } => encryption_info.clone()).flatten();
571        let aggregation = Aggregation::new(
572            self.ctx.flow.timeline_item_id(),
573            AggregationKind::Edit(PendingEdit {
574                kind: edit_kind,
575                edit_json: self.ctx.flow.raw_event().cloned(),
576                encryption_info,
577            }),
578        );
579
580        self.meta.aggregations.add(target.clone(), aggregation.clone());
581
582        if let Some(new_item) = find_item_and_apply_aggregation(
583            &self.meta.aggregations,
584            self.items,
585            &target,
586            aggregation,
587            &self.meta.room_version,
588        ) {
589            // Update all events that replied to this message with the edited content.
590            Self::maybe_update_responses(self.meta, self.items, &edited_event_id, &new_item);
591        }
592    }
593
594    /// Apply a reaction to a *remote* event.
595    ///
596    /// Reactions to local events are applied in
597    /// [`crate::timeline::TimelineController::handle_local_echo`].
598    #[instrument(skip(self))]
599    fn handle_reaction(&mut self, relates_to: OwnedEventId, reaction_key: String) {
600        let target = TimelineEventItemId::EventId(relates_to);
601
602        // Add the aggregation to the manager.
603        let reaction_status = match &self.ctx.flow {
604            Flow::Local { send_handle, .. } => {
605                // This is a local echo for a reaction to a remote event.
606                ReactionStatus::LocalToRemote(send_handle.clone())
607            }
608            Flow::Remote { event_id, .. } => {
609                // This is the remote echo for a reaction to a remote event.
610                ReactionStatus::RemoteToRemote(event_id.clone())
611            }
612        };
613
614        let aggregation = Aggregation::new(
615            self.ctx.flow.timeline_item_id(),
616            AggregationKind::Reaction {
617                key: reaction_key,
618                sender: self.ctx.sender.clone(),
619                timestamp: self.ctx.timestamp,
620                reaction_status,
621            },
622        );
623
624        self.meta.aggregations.add(target.clone(), aggregation.clone());
625        find_item_and_apply_aggregation(
626            &self.meta.aggregations,
627            self.items,
628            &target,
629            aggregation,
630            &self.meta.room_version,
631        );
632    }
633
634    fn handle_poll_response(&mut self, poll_event_id: OwnedEventId, answers: Vec<String>) {
635        let target = TimelineEventItemId::EventId(poll_event_id);
636        let aggregation = Aggregation::new(
637            self.ctx.flow.timeline_item_id(),
638            AggregationKind::PollResponse {
639                sender: self.ctx.sender.clone(),
640                timestamp: self.ctx.timestamp,
641                answers,
642            },
643        );
644        self.meta.aggregations.add(target.clone(), aggregation.clone());
645        find_item_and_apply_aggregation(
646            &self.meta.aggregations,
647            self.items,
648            &target,
649            aggregation,
650            &self.meta.room_version,
651        );
652    }
653
654    fn handle_poll_end(&mut self, poll_event_id: OwnedEventId) {
655        let target = TimelineEventItemId::EventId(poll_event_id);
656        let aggregation = Aggregation::new(
657            self.ctx.flow.timeline_item_id(),
658            AggregationKind::PollEnd { end_date: self.ctx.timestamp },
659        );
660        self.meta.aggregations.add(target.clone(), aggregation.clone());
661        find_item_and_apply_aggregation(
662            &self.meta.aggregations,
663            self.items,
664            &target,
665            aggregation,
666            &self.meta.room_version,
667        );
668    }
669
670    /// Looks for the redacted event in all the timeline event items, and
671    /// redacts it.
672    ///
673    /// This assumes the redacted event was present in the timeline in the first
674    /// place; it will warn if the redacted event has not been found.
675    #[instrument(skip_all, fields(redacts_event_id = ?redacted))]
676    fn handle_redaction(&mut self, redacted: OwnedEventId) {
677        // TODO: Apply local redaction of PollResponse and PollEnd events.
678        // https://github.com/matrix-org/matrix-rust-sdk/pull/2381#issuecomment-1689647825
679
680        // If it's an aggregation that's being redacted, handle it here.
681        if self.handle_aggregation_redaction(redacted.clone()) {
682            // When we have raw timeline items, we should not return here anymore, as we
683            // might need to redact the raw item as well.
684            return;
685        }
686
687        let target = TimelineEventItemId::EventId(redacted.clone());
688        let aggregation =
689            Aggregation::new(self.ctx.flow.timeline_item_id(), AggregationKind::Redaction);
690        self.meta.aggregations.add(target.clone(), aggregation.clone());
691
692        if let Some(new_item) = find_item_and_apply_aggregation(
693            &self.meta.aggregations,
694            self.items,
695            &target,
696            aggregation,
697            &self.meta.room_version,
698        ) {
699            // Look for any timeline event that's a reply to the redacted event, and redact
700            // the replied-to event there as well.
701            Self::maybe_update_responses(self.meta, self.items, &redacted, &new_item);
702        }
703    }
704
705    /// Attempts to redact an aggregation (e.g. a reaction, a poll response,
706    /// etc.).
707    ///
708    /// Returns true if it's succeeded.
709    #[instrument(skip_all, fields(redacts = ?aggregation_id))]
710    fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool {
711        let aggregation_id = TimelineEventItemId::EventId(aggregation_id);
712
713        match self.meta.aggregations.try_remove_aggregation(&aggregation_id, self.items) {
714            Ok(val) => val,
715            // This wasn't a known aggregation that was redacted.
716            Err(err) => {
717                warn!("error while attempting to remove aggregation: {err}");
718                // It could find an aggregation but didn't properly unapply it.
719                true
720            }
721        }
722    }
723
724    /// Add a new event item in the timeline.
725    ///
726    /// # Safety
727    ///
728    /// This method is not marked as unsafe **but** it manipulates
729    /// [`ObservableItemsTransaction::all_remote_events`]. 2 rules **must** be
730    /// respected:
731    ///
732    /// 1. the remote event of the item being added **must** be present in
733    ///    `all_remote_events`,
734    /// 2. the lastly added or updated remote event must be associated to the
735    ///    timeline item being added here.
736    fn add_item(&mut self, content: TimelineItemContent) {
737        let sender = self.ctx.sender.to_owned();
738        let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());
739        let timestamp = self.ctx.timestamp;
740
741        let kind: EventTimelineItemKind = match &self.ctx.flow {
742            Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
743                send_state: EventSendState::NotSentYet,
744                transaction_id: txn_id.to_owned(),
745                send_handle: send_handle.clone(),
746            }
747            .into(),
748
749            Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
750                let origin = match *position {
751                    TimelineItemPosition::Start { origin }
752                    | TimelineItemPosition::End { origin }
753                    | TimelineItemPosition::At { origin, .. } => origin,
754
755                    // For updates, reuse the origin of the encrypted event.
756                    TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self.items[idx]
757                        .as_event()
758                        .and_then(|ev| Some(ev.as_remote()?.origin))
759                        .unwrap_or_else(|| {
760                            error!("Tried to update a local event");
761                            RemoteEventOrigin::Unknown
762                        }),
763                };
764
765                RemoteEventTimelineItem {
766                    event_id: event_id.clone(),
767                    transaction_id: txn_id.clone(),
768                    read_receipts: self.ctx.read_receipts.clone(),
769                    is_own: self.ctx.sender == self.meta.own_user_id,
770                    is_highlighted: self.ctx.is_highlighted,
771                    encryption_info: encryption_info.clone(),
772                    original_json: Some(raw_event.clone()),
773                    latest_edit_json: None,
774                    origin,
775                }
776                .into()
777            }
778        };
779
780        let is_room_encrypted = self.meta.is_room_encrypted;
781
782        let item = EventTimelineItem::new(
783            sender,
784            sender_profile,
785            timestamp,
786            content,
787            kind,
788            is_room_encrypted,
789        );
790
791        // Apply any pending or stashed aggregations.
792        let mut cowed = Cow::Owned(item);
793        if let Err(err) = self.meta.aggregations.apply_all(
794            &self.ctx.flow.timeline_item_id(),
795            &mut cowed,
796            self.items,
797            &self.meta.room_version,
798        ) {
799            warn!("discarding aggregations: {err}");
800        }
801        let item = cowed.into_owned();
802
803        match &self.ctx.flow {
804            Flow::Local { .. } => {
805                trace!("Adding new local timeline item");
806
807                let item = self.meta.new_timeline_item(item);
808
809                self.items.push_local(item);
810            }
811
812            Flow::Remote {
813                position: TimelineItemPosition::Start { .. }, event_id, txn_id, ..
814            } => {
815                let item = Self::recycle_local_or_create_item(
816                    self.items,
817                    self.meta,
818                    item,
819                    event_id,
820                    txn_id.as_deref(),
821                );
822
823                trace!("Adding new remote timeline item at the start");
824
825                self.items.push_front(item, Some(0));
826            }
827
828            Flow::Remote {
829                position: TimelineItemPosition::At { event_index, .. },
830                event_id,
831                txn_id,
832                ..
833            } => {
834                let item = Self::recycle_local_or_create_item(
835                    self.items,
836                    self.meta,
837                    item,
838                    event_id,
839                    txn_id.as_deref(),
840                );
841
842                let all_remote_events = self.items.all_remote_events();
843                let event_index = *event_index;
844
845                // Look for the closest `timeline_item_index` at the left of `event_index`.
846                let timeline_item_index = all_remote_events
847                    .range(0..=event_index)
848                    .rev()
849                    .find_map(|event_meta| event_meta.timeline_item_index)
850                    // The new `timeline_item_index` is the previous + 1.
851                    .map(|timeline_item_index| timeline_item_index + 1);
852
853                // No index? Look for the closest `timeline_item_index` at the right of
854                // `event_index`.
855                let timeline_item_index = timeline_item_index.or_else(|| {
856                    all_remote_events
857                        .range(event_index + 1..)
858                        .find_map(|event_meta| event_meta.timeline_item_index)
859                });
860
861                // Still no index? Well, it means there is no existing `timeline_item_index`
862                // so we are inserting at the last non-local item position as a fallback.
863                let timeline_item_index = timeline_item_index.unwrap_or_else(|| {
864                    self.items
865                        .iter_remotes_region()
866                        .rev()
867                        .find_map(|(timeline_item_index, timeline_item)| {
868                            timeline_item.as_event().map(|_| timeline_item_index + 1)
869                        })
870                        .unwrap_or_else(|| {
871                            // There is no remote timeline item, so we could insert at the start of
872                            // the remotes region.
873                            self.items.first_remotes_region_index()
874                        })
875                });
876
877                trace!(
878                    ?event_index,
879                    ?timeline_item_index,
880                    "Adding new remote timeline at specific event index"
881                );
882
883                self.items.insert(timeline_item_index, item, Some(event_index));
884            }
885
886            Flow::Remote {
887                position: TimelineItemPosition::End { .. }, event_id, txn_id, ..
888            } => {
889                let item = Self::recycle_local_or_create_item(
890                    self.items,
891                    self.meta,
892                    item,
893                    event_id,
894                    txn_id.as_deref(),
895                );
896
897                // Let's find the latest remote event and insert after it
898                let timeline_item_index = self
899                    .items
900                    .iter_remotes_region()
901                    .rev()
902                    .find_map(|(timeline_item_index, timeline_item)| {
903                        timeline_item.as_event().map(|_| timeline_item_index + 1)
904                    })
905                    .unwrap_or_else(|| {
906                        // There is no remote timeline item, so we could insert at the start of
907                        // the remotes region.
908                        self.items.first_remotes_region_index()
909                    });
910
911                let event_index = self
912                    .items
913                    .all_remote_events()
914                    .last_index()
915                    // The last remote event is necessarily associated to this
916                    // timeline item, see the contract of this method. Let's fallback to a similar
917                    // value as `timeline_item_index` instead of panicking.
918                    .or_else(|| {
919                        error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present");
920
921                        Some(0)
922                    });
923
924                // Try to keep precise insertion semantics here, in this exact order:
925                //
926                // * _push back_ when the new item is inserted after all items (the assumption
927                // being that this is the hot path, because most of the time new events
928                // come from the sync),
929                // * _push front_ when the new item is inserted at index 0,
930                // * _insert_ otherwise.
931
932                if timeline_item_index == self.items.len() {
933                    trace!("Adding new remote timeline item at the back");
934                    self.items.push_back(item, event_index);
935                } else if timeline_item_index == 0 {
936                    trace!("Adding new remote timeline item at the front");
937                    self.items.push_front(item, event_index);
938                } else {
939                    trace!(
940                        timeline_item_index,
941                        "Adding new remote timeline item at specific index"
942                    );
943                    self.items.insert(timeline_item_index, item, event_index);
944                }
945            }
946
947            Flow::Remote {
948                event_id: decrypted_event_id,
949                position: TimelineItemPosition::UpdateAt { timeline_item_index: idx },
950                ..
951            } => {
952                trace!("Updating timeline item at position {idx}");
953
954                // Update all events that replied to this previously encrypted message.
955                Self::maybe_update_responses(self.meta, self.items, decrypted_event_id, &item);
956
957                let internal_id = self.items[*idx].internal_id.clone();
958                self.items.replace(*idx, TimelineItem::new(item, internal_id));
959            }
960        }
961
962        // If we don't have a read marker item, look if we need to add one now.
963        if !self.meta.has_up_to_date_read_marker_item {
964            self.meta.update_read_marker(self.items);
965        }
966    }
967
968    /// Try to recycle a local timeline item for the same event, or create a new
969    /// timeline item for it.
970    ///
971    /// Note: this method doesn't take `&mut self` to avoid a borrow checker
972    /// conflict with `TimelineEventHandler::add_item`.
973    fn recycle_local_or_create_item(
974        items: &mut ObservableItemsTransaction<'_>,
975        meta: &mut TimelineMetadata,
976        mut new_item: EventTimelineItem,
977        event_id: &EventId,
978        transaction_id: Option<&TransactionId>,
979    ) -> Arc<TimelineItem> {
980        // Detect a local timeline item that matches `event_id` or `transaction_id`.
981        if let Some((local_timeline_item_index, local_timeline_item)) = items
982            // Iterate the locals region.
983            .iter_locals_region()
984            // Iterate from the end to the start.
985            .rev()
986            .find_map(|(nth, timeline_item)| {
987                let event_timeline_item = timeline_item.as_event()?;
988
989                if Some(event_id) == event_timeline_item.event_id()
990                    || (transaction_id.is_some()
991                        && transaction_id == event_timeline_item.transaction_id())
992                {
993                    // A duplicate local event timeline item has been found!
994                    Some((nth, event_timeline_item))
995                } else {
996                    // This local event timeline is not the one we are looking for. Continue our
997                    // search.
998                    None
999                }
1000            })
1001        {
1002            trace!(
1003                ?event_id,
1004                ?transaction_id,
1005                ?local_timeline_item_index,
1006                "Removing local timeline item"
1007            );
1008
1009            transfer_details(&mut new_item, local_timeline_item);
1010
1011            // Remove the local timeline item.
1012            let recycled = items.remove(local_timeline_item_index);
1013            TimelineItem::new(new_item, recycled.internal_id.clone())
1014        } else {
1015            // We haven't found a matching local item to recycle; create a new item.
1016            meta.new_timeline_item(new_item)
1017        }
1018    }
1019
1020    /// After updating the timeline item `new_item` which id is
1021    /// `target_event_id`, update other items that are responses to this item.
1022    fn maybe_update_responses(
1023        meta: &mut TimelineMetadata,
1024        items: &mut ObservableItemsTransaction<'_>,
1025        target_event_id: &EventId,
1026        new_item: &EventTimelineItem,
1027    ) {
1028        let Some(replies) = meta.replies.get(target_event_id) else {
1029            trace!("item has no replies");
1030            return;
1031        };
1032
1033        for reply_id in replies {
1034            let Some(timeline_item_index) = items
1035                .get_remote_event_by_event_id(reply_id)
1036                .and_then(|meta| meta.timeline_item_index)
1037            else {
1038                warn!(%reply_id, "event not known as an item in the timeline");
1039                continue;
1040            };
1041
1042            let Some(item) = items.get(timeline_item_index) else {
1043                warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
1044                continue;
1045            };
1046
1047            let Some(event_item) = item.as_event() else { continue };
1048            let Some(msglike) = event_item.content.as_msglike() else { continue };
1049            let Some(message) = msglike.as_message() else { continue };
1050            let Some(in_reply_to) = msglike.in_reply_to.as_ref() else { continue };
1051
1052            trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
1053            let in_reply_to = InReplyToDetails {
1054                event_id: in_reply_to.event_id.clone(),
1055                event: TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(
1056                    new_item,
1057                ))),
1058            };
1059
1060            let new_reply_content = TimelineItemContent::MsgLike(
1061                msglike
1062                    .with_in_reply_to(in_reply_to)
1063                    .with_kind(MsgLikeKind::Message(message.clone())),
1064            );
1065            let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
1066            items.replace(timeline_item_index, new_reply_item);
1067        }
1068    }
1069}
1070
1071/// Transfer `TimelineDetails` that weren't available on the original
1072/// item and have been fetched separately (only `reply_to` for
1073/// now) from `old_item` to `item`, given two items for an event
1074/// that was re-received.
1075///
1076/// `old_item` *should* always be a local timeline item usually, but it
1077/// can be a remote timeline item.
1078fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
1079    let TimelineItemContent::MsgLike(new_msglike) = &mut new_item.content else {
1080        return;
1081    };
1082    let TimelineItemContent::MsgLike(old_msglike) = &old_item.content else {
1083        return;
1084    };
1085
1086    let Some(in_reply_to) = &mut new_msglike.in_reply_to else { return };
1087    let Some(old_in_reply_to) = &old_msglike.in_reply_to else { return };
1088
1089    if matches!(&in_reply_to.event, TimelineDetails::Unavailable) {
1090        in_reply_to.event = old_in_reply_to.event.clone();
1091    }
1092}