Skip to main content

matrix_sdk_ui/timeline/
event_handler.rs

1// Copyright 2022 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{borrow::Cow, sync::Arc};
16
17use as_variant::as_variant;
18use indexmap::IndexMap;
19use matrix_sdk::{
20    deserialized_responses::{EncryptionInfo, UnableToDecryptInfo},
21    send_queue::SendHandle,
22};
23use matrix_sdk_base::crypto::types::events::UtdCause;
24use ruma::{
25    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
26    TransactionId,
27    events::{
28        AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
29        AnySyncTimelineEvent, MessageLikeEventContent, MessageLikeEventType,
30        StateEventContentChange, StateEventType, SyncStateEvent,
31        beacon_info::BeaconInfoEventContent,
32        poll::unstable_start::{
33            NewUnstablePollStartEventContentWithoutRelation, UnstablePollStartEventContent,
34        },
35        receipt::Receipt,
36        relation::Replacement,
37        room::message::{
38            Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation,
39        },
40    },
41    serde::Raw,
42};
43use tracing::{debug, error, field::debug, instrument, trace, warn};
44
45use super::{
46    BeaconInfo, EmbeddedEvent, EncryptedMessage, EventTimelineItem, InReplyToDetails,
47    LiveLocationState, MsgLikeContent, MsgLikeKind, OtherState, ReactionStatus, Sticker,
48    ThreadSummary, TimelineDetails, TimelineItem, TimelineItemContent,
49    controller::{
50        Aggregation, AggregationKind, ObservableItemsTransaction, PendingEditKind,
51        TimelineMetadata, TimelineStateTransaction, find_item_and_apply_aggregation,
52    },
53    date_dividers::DateDividerAdjuster,
54    event_item::{
55        AnyOtherStateEventContentChange, EventSendState, EventTimelineItemKind,
56        LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem,
57        TimelineEventItemId,
58    },
59    traits::RoomDataProvider,
60};
61use crate::{
62    timeline::{
63        TimelineUniqueId, controller::aggregations::PendingEdit, event_item::OtherMessageLike,
64    },
65    unable_to_decrypt_hook::UtdHookManager,
66};
67
68/// When adding an event, useful information related to the source of the event.
69pub(super) enum Flow {
70    /// The event was locally created.
71    Local {
72        /// The transaction id we've used in requests associated to this event.
73        txn_id: OwnedTransactionId,
74
75        /// A handle to manipulate this event.
76        send_handle: Option<SendHandle>,
77    },
78
79    /// The event has been received from a remote source (sync, pagination,
80    /// etc.). This can be a "remote echo".
81    Remote {
82        /// The event identifier as returned by the server.
83        event_id: OwnedEventId,
84        /// The transaction id we might have used, if we're the sender of the
85        /// event.
86        txn_id: Option<OwnedTransactionId>,
87        /// The raw serialized JSON event.
88        raw_event: Raw<AnySyncTimelineEvent>,
89        /// Where should this be added in the timeline.
90        position: TimelineItemPosition,
91        /// Information about the encryption for this event.
92        encryption_info: Option<Arc<EncryptionInfo>>,
93    },
94}
95
96impl Flow {
97    /// Returns the [`TimelineEventItemId`] associated to this future item.
98    pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId {
99        match self {
100            Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()),
101            Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()),
102        }
103    }
104
105    /// If the flow is remote, returns the associated full raw event.
106    pub(crate) fn raw_event(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
107        as_variant!(self, Flow::Remote { raw_event, .. } => raw_event)
108    }
109}
110
111pub(super) struct TimelineEventContext {
112    pub(super) sender: OwnedUserId,
113    pub(super) sender_profile: Option<Profile>,
114    /// If the keys used to decrypt this event were shared-on-invite as part of
115    /// an [MSC4268] key bundle, the user ID of the forwarder.
116    ///
117    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
118    pub(super) forwarder: Option<OwnedUserId>,
119    /// If the keys used to decrypt this event were shared-on-invite as part of
120    /// an [MSC4268] key bundle, the forwarder's profile.
121    ///
122    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
123    pub(super) forwarder_profile: Option<Profile>,
124    /// The event's `origin_server_ts` field (or creation time for local echo).
125    pub(super) timestamp: MilliSecondsSinceUnixEpoch,
126    pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
127    pub(super) is_highlighted: bool,
128    pub(super) flow: Flow,
129
130    /// If the event represents a new item, should it be added to the timeline?
131    ///
132    /// This controls whether a new timeline *may* be added. If the update kind
133    /// is about an update to an existing timeline item (redaction, edit,
134    /// reaction, etc.), it's always handled by default.
135    pub(super) should_add_new_items: bool,
136}
137
138/// Which kind of aggregation (i.e. modification of a related event) are we
139/// going to handle?
140#[derive(Clone, Debug)]
141pub(super) enum HandleAggregationKind {
142    /// Adding a reaction to the related event.
143    Reaction { key: String },
144
145    /// Redacting (removing) the related event.
146    Redaction,
147
148    /// Editing (replacing) the related event with another one.
149    Edit { replacement: Replacement<RoomMessageEventContentWithoutRelation> },
150
151    /// Responding to the related poll event.
152    PollResponse { answers: Vec<String> },
153
154    /// Editing a related poll event's description.
155    PollEdit { replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation> },
156
157    /// Ending a related poll.
158    PollEnd,
159
160    /// A location update for a live location sharing session (MSC3489).
161    BeaconUpdate { location: BeaconInfo },
162
163    /// A stop event for a live location sharing session (MSC3489).
164    ///
165    /// Sent when the user stops sharing their location. Unlike [`BeaconUpdate`]
166    /// this does not carry a `relates_to` event ID; instead the target live
167    /// item is found by matching the sender.
168    BeaconStop { content: BeaconInfoEventContent },
169
170    /// A decline for an `m.rtc.notification` call.
171    CallDeclined,
172}
173
174impl HandleAggregationKind {
175    /// Returns a small string describing this aggregation, for debug purposes.
176    pub fn debug_string(&self) -> &'static str {
177        match self {
178            HandleAggregationKind::Reaction { .. } => "a reaction",
179            HandleAggregationKind::Redaction => "a redaction",
180            HandleAggregationKind::Edit { .. } => "an edit",
181            HandleAggregationKind::PollResponse { .. } => "a poll response",
182            HandleAggregationKind::PollEdit { .. } => "a poll edit",
183            HandleAggregationKind::PollEnd => "a poll end",
184            HandleAggregationKind::BeaconUpdate { .. } => "a beacon location update",
185            HandleAggregationKind::BeaconStop { .. } => "a beacon stop",
186            HandleAggregationKind::CallDeclined => "a call decline",
187        }
188    }
189}
190
191/// An action that we want to cause on the timeline.
192#[derive(Clone, Debug)]
193#[allow(clippy::large_enum_variant)]
194pub(super) enum TimelineAction {
195    /// Add a new timeline item.
196    ///
197    /// This enqueues adding a new item to the timeline (i.e. push to the items
198    /// array in its state). The item may be filtered out, and thus not
199    /// added later.
200    AddItem {
201        /// The content of the item we want to add.
202        content: TimelineItemContent,
203    },
204
205    /// Handle an aggregation to another event.
206    ///
207    /// The event the aggregation is related to might not be included in the
208    /// timeline, in which case it will be stashed somewhere, until we see
209    /// the related event.
210    HandleAggregation {
211        /// To which other event does this aggregation apply to?
212        related_event: OwnedEventId,
213        /// What kind of aggregation are we handling here?
214        kind: HandleAggregationKind,
215    },
216}
217
218impl TimelineAction {
219    /// Create a new [`TimelineEventKind::AddItem`].
220    fn add_item(content: TimelineItemContent) -> Self {
221        Self::AddItem { content }
222    }
223
224    /// Create a new [`TimelineAction`] from a given remote event.
225    ///
226    /// The return value may be `None` if the event was a redacted reaction.
227    #[allow(clippy::too_many_arguments)]
228    pub async fn from_event<P: RoomDataProvider>(
229        event: AnySyncTimelineEvent,
230        raw_event: &Raw<AnySyncTimelineEvent>,
231        room_data_provider: &P,
232        unable_to_decrypt: Option<(UnableToDecryptInfo, Option<&Arc<UtdHookManager>>)>,
233        in_reply_to: Option<InReplyToDetails>,
234        thread_root: Option<OwnedEventId>,
235        thread_summary: Option<ThreadSummary>,
236    ) -> Option<Self> {
237        let redaction_rules = room_data_provider.room_version_rules().redaction;
238
239        let redacted_message_or_none = |event_type: MessageLikeEventType| {
240            (event_type != MessageLikeEventType::Reaction)
241                .then_some(TimelineItemContent::MsgLike(MsgLikeContent::redacted()))
242        };
243
244        Some(match event {
245            AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
246                if let Some(redacts) = ev.redacts(&redaction_rules).map(ToOwned::to_owned) {
247                    Self::HandleAggregation {
248                        related_event: redacts,
249                        kind: HandleAggregationKind::Redaction,
250                    }
251                } else {
252                    Self::add_item(redacted_message_or_none(ev.event_type())?)
253                }
254            }
255
256            AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
257                Some(AnyMessageLikeEventContent::RoomEncrypted(content)) => {
258                    // An event which is still encrypted.
259                    if let Some((unable_to_decrypt_info, unable_to_decrypt_hook_manager)) =
260                        unable_to_decrypt
261                    {
262                        let utd_cause = UtdCause::determine(
263                            raw_event,
264                            room_data_provider.crypto_context_info().await,
265                            &unable_to_decrypt_info,
266                        );
267
268                        // Let the hook know that we ran into an unable-to-decrypt that is added to
269                        // the timeline.
270                        if let Some(hook) = unable_to_decrypt_hook_manager {
271                            hook.on_utd(
272                                ev.event_id(),
273                                utd_cause,
274                                ev.origin_server_ts(),
275                                ev.sender(),
276                            )
277                            .await;
278                        }
279
280                        Self::add_item(TimelineItemContent::MsgLike(
281                            MsgLikeContent::unable_to_decrypt(EncryptedMessage::from_content(
282                                content, utd_cause,
283                            )),
284                        ))
285                    } else {
286                        // If we get here, it means that some part of the code has created a
287                        // `TimelineEvent` containing an `m.room.encrypted` event without
288                        // decrypting it. Possibly this means that encryption has not been
289                        // configured. We treat it the same as any other message-like event.
290                        Self::from_content(
291                            AnyMessageLikeEventContent::RoomEncrypted(content),
292                            in_reply_to,
293                            thread_root,
294                            thread_summary,
295                        )
296                    }
297                }
298
299                Some(content) => {
300                    Self::from_content(content, in_reply_to, thread_root, thread_summary)
301                }
302
303                None => Self::add_item(redacted_message_or_none(ev.event_type())?),
304            },
305
306            AnySyncTimelineEvent::State(ev) => match ev {
307                AnySyncStateEvent::RoomMember(ev) => match ev {
308                    SyncStateEvent::Original(ev) => {
309                        Self::add_item(TimelineItemContent::room_member(
310                            ev.state_key,
311                            StateEventContentChange::Original {
312                                content: ev.content,
313                                prev_content: ev.unsigned.prev_content,
314                            },
315                            ev.sender,
316                        ))
317                    }
318                    SyncStateEvent::Redacted(ev) => {
319                        Self::add_item(TimelineItemContent::room_member(
320                            ev.state_key,
321                            StateEventContentChange::Redacted(ev.content),
322                            ev.sender,
323                        ))
324                    }
325                },
326                AnySyncStateEvent::BeaconInfo(ev) => match ev {
327                    SyncStateEvent::Original(ev) => {
328                        // Check the `live` field directly, not `is_live()` which
329                        // considers timeout. We want to create a timeline item for any
330                        // beacon_info that was started as live, regardless of whether
331                        // the timeout has since expired.
332                        if ev.content.live {
333                            Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent {
334                                kind: MsgLikeKind::LiveLocation(LiveLocationState::new(ev.content)),
335                                reactions: Default::default(),
336                                thread_root: None,
337                                in_reply_to: None,
338                                thread_summary: None,
339                            }))
340                        } else {
341                            // A non-live beacon_info is a stop event: it should update the
342                            // existing live item from the same sender rather than creating a
343                            // new timeline item.
344                            Self::HandleAggregation {
345                                // There is no explicit relates_to on a beacon_info state event;
346                                // the target is identified by sender in handle_beacon_stop.
347                                related_event: ev.event_id,
348                                kind: HandleAggregationKind::BeaconStop { content: ev.content },
349                            }
350                        }
351                    }
352                    SyncStateEvent::Redacted(_) => {
353                        Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent::redacted()))
354                    }
355                },
356                ev => Self::add_item(TimelineItemContent::OtherState(OtherState {
357                    state_key: ev.state_key().to_owned(),
358                    content: AnyOtherStateEventContentChange::with_event_content(
359                        ev.content_change(),
360                    ),
361                })),
362            },
363        })
364    }
365
366    /// Create a new [`TimelineAction`] from a given event's content.
367    ///
368    /// This is applicable to both remote event (as this is called from
369    /// [`TimelineAction::from_event`]) or local events (for which we only have
370    /// the content).
371    ///
372    /// The return value may be `None` if handling the event (be it a new item
373    /// or an aggregation) is not supported for this event type.
374    pub(super) fn from_content(
375        content: AnyMessageLikeEventContent,
376        in_reply_to: Option<InReplyToDetails>,
377        thread_root: Option<OwnedEventId>,
378        thread_summary: Option<ThreadSummary>,
379    ) -> Self {
380        match content {
381            AnyMessageLikeEventContent::Reaction(c) => {
382                // This is a reaction to a message.
383                Self::HandleAggregation {
384                    related_event: c.relates_to.event_id.clone(),
385                    kind: HandleAggregationKind::Reaction { key: c.relates_to.key },
386                }
387            }
388
389            AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
390                relates_to: Some(Relation::Replacement(re)),
391                ..
392            }) => Self::HandleAggregation {
393                related_event: re.event_id.clone(),
394                kind: HandleAggregationKind::Edit { replacement: re },
395            },
396
397            AnyMessageLikeEventContent::UnstablePollStart(
398                UnstablePollStartEventContent::Replacement(re),
399            ) => Self::HandleAggregation {
400                related_event: re.relates_to.event_id.clone(),
401                kind: HandleAggregationKind::PollEdit { replacement: re.relates_to },
402            },
403
404            AnyMessageLikeEventContent::UnstablePollResponse(c) => Self::HandleAggregation {
405                related_event: c.relates_to.event_id,
406                kind: HandleAggregationKind::PollResponse { answers: c.poll_response.answers },
407            },
408
409            AnyMessageLikeEventContent::UnstablePollEnd(c) => Self::HandleAggregation {
410                related_event: c.relates_to.event_id,
411                kind: HandleAggregationKind::PollEnd,
412            },
413
414            AnyMessageLikeEventContent::CallInvite(_) => {
415                Self::add_item(TimelineItemContent::CallInvite)
416            }
417
418            AnyMessageLikeEventContent::RtcNotification(c) => {
419                Self::add_item(TimelineItemContent::RtcNotification {
420                    call_intent: c.call_intent,
421                    declined_by: Vec::new(),
422                })
423            }
424
425            AnyMessageLikeEventContent::RtcDecline(c) => Self::HandleAggregation {
426                related_event: c.relates_to.event_id,
427                kind: HandleAggregationKind::CallDeclined,
428            },
429
430            AnyMessageLikeEventContent::Sticker(content) => {
431                Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent {
432                    kind: MsgLikeKind::Sticker(Sticker { content }),
433                    reactions: Default::default(),
434                    thread_root,
435                    in_reply_to,
436                    thread_summary,
437                }))
438            }
439
440            AnyMessageLikeEventContent::UnstablePollStart(UnstablePollStartEventContent::New(
441                c,
442            )) => {
443                let poll_state = PollState::new(c.poll_start, c.text);
444
445                Self::AddItem {
446                    content: TimelineItemContent::MsgLike(MsgLikeContent {
447                        kind: MsgLikeKind::Poll(poll_state),
448                        reactions: Default::default(),
449                        thread_root,
450                        in_reply_to,
451                        thread_summary,
452                    }),
453                }
454            }
455
456            AnyMessageLikeEventContent::RoomMessage(msg) => Self::AddItem {
457                content: TimelineItemContent::message(
458                    msg.msgtype,
459                    msg.mentions,
460                    Default::default(),
461                    thread_root,
462                    in_reply_to,
463                    thread_summary,
464                ),
465            },
466
467            AnyMessageLikeEventContent::Beacon(content) => Self::HandleAggregation {
468                related_event: content.relates_to.event_id,
469                kind: HandleAggregationKind::BeaconUpdate {
470                    location: BeaconInfo {
471                        geo_uri: content.location.uri,
472                        ts: content.ts,
473                        description: content.location.description,
474                        encryption_info: None, // Filled in later from the event context.
475                    },
476                },
477            },
478
479            event => {
480                let other = OtherMessageLike { event_type: event.event_type() };
481
482                Self::AddItem {
483                    content: TimelineItemContent::MsgLike(MsgLikeContent {
484                        kind: MsgLikeKind::Other(other),
485                        reactions: Default::default(),
486                        thread_root,
487                        in_reply_to,
488                        thread_summary,
489                    }),
490                }
491            }
492        }
493    }
494
495    pub(super) fn failed_to_parse(event: FailedToParseEvent, error: serde_json::Error) -> Self {
496        let error = Arc::new(error);
497        match event {
498            FailedToParseEvent::State { event_type, state_key } => {
499                Self::add_item(TimelineItemContent::FailedToParseState {
500                    event_type,
501                    state_key,
502                    error,
503                })
504            }
505            FailedToParseEvent::MsgLike(event_type) => {
506                Self::add_item(TimelineItemContent::FailedToParseMessageLike { event_type, error })
507            }
508        }
509    }
510}
511
512#[derive(Debug)]
513pub(super) enum FailedToParseEvent {
514    MsgLike(MessageLikeEventType),
515    State { event_type: StateEventType, state_key: String },
516}
517
518/// The position at which to perform an update of the timeline with events.
519#[derive(Clone, Copy, Debug)]
520pub(super) enum TimelineItemPosition {
521    /// One or more items are prepended to the timeline (i.e. they're the
522    /// oldest).
523    Start {
524        /// The origin of the new item(s).
525        origin: RemoteEventOrigin,
526    },
527
528    /// One or more items are appended to the timeline (i.e. they're the most
529    /// recent).
530    End {
531        /// The origin of the new item(s).
532        origin: RemoteEventOrigin,
533    },
534
535    /// One item is inserted to the timeline.
536    At {
537        /// Where to insert the remote event.
538        event_index: usize,
539
540        /// The origin of the new item.
541        origin: RemoteEventOrigin,
542    },
543
544    /// A single item is updated.
545    ///
546    /// This can happen for instance after a UTD has been successfully
547    /// decrypted, or when it's been redacted at the source.
548    UpdateAt {
549        /// The index of the **timeline item**.
550        timeline_item_index: usize,
551    },
552}
553
554/// Whether an item was removed or not.
555pub(super) type RemovedItem = bool;
556
557/// Data necessary to update the timeline, given a single event to handle.
558///
559/// Bundles together a few things that are needed throughout the different
560/// stages of handling an event (figuring out whether it should update an
561/// existing timeline item, transforming that item or creating a new one,
562/// updating the reactive Vec).
563pub(super) struct TimelineEventHandler<'a, 'o> {
564    items: &'a mut ObservableItemsTransaction<'o>,
565    meta: &'a mut TimelineMetadata,
566    ctx: TimelineEventContext,
567}
568
569impl<'a, 'o> TimelineEventHandler<'a, 'o> {
570    pub(super) fn new<P: RoomDataProvider>(
571        state: &'a mut TimelineStateTransaction<'o, P>,
572        ctx: TimelineEventContext,
573    ) -> Self {
574        let TimelineStateTransaction { items, meta, .. } = state;
575        Self { items, meta, ctx }
576    }
577
578    /// Handle an event.
579    ///
580    /// Returns if an item was added to the timeline due to the new timeline
581    /// action. Items might not be added to the timeline for various reasons,
582    /// some common ones are if the item:
583    ///     - Contains an unsupported event type.
584    ///     - Is an edit or a redaction.
585    ///     - Contains a local echo turning into a remote echo.
586    ///     - Contains a message that is already in the timeline but was now
587    ///       decrypted.
588    ///
589    /// `raw_event` is only needed to determine the cause of any UTDs,
590    /// so if we know this is not a UTD it can be None.
591    #[instrument(skip_all, fields(txn_id, event_id, position))]
592    pub(super) async fn handle_event(
593        mut self,
594        date_divider_adjuster: &mut DateDividerAdjuster,
595        timeline_action: TimelineAction,
596        recycled_timeline_id: Option<TimelineUniqueId>,
597    ) -> bool {
598        let span = tracing::Span::current();
599
600        date_divider_adjuster.mark_used();
601
602        match &self.ctx.flow {
603            Flow::Local { txn_id, .. } => {
604                span.record("txn_id", debug(txn_id));
605                debug!("Handling local event");
606            }
607
608            Flow::Remote { event_id, txn_id, position, .. } => {
609                span.record("event_id", debug(event_id));
610                span.record("position", debug(position));
611                if let Some(txn_id) = txn_id {
612                    span.record("txn_id", debug(txn_id));
613                }
614                trace!("Handling remote event");
615            }
616        }
617
618        let mut added_item = false;
619
620        match timeline_action {
621            TimelineAction::AddItem { content } => {
622                if self.ctx.should_add_new_items {
623                    self.add_item(content, recycled_timeline_id);
624                    added_item = true;
625                }
626            }
627
628            TimelineAction::HandleAggregation { related_event, kind } => match kind {
629                HandleAggregationKind::Reaction { key } => {
630                    self.handle_reaction(related_event, key);
631                }
632                HandleAggregationKind::Redaction => {
633                    self.handle_redaction(related_event);
634                }
635                HandleAggregationKind::Edit { replacement } => {
636                    self.handle_edit(
637                        replacement.event_id.clone(),
638                        PendingEditKind::RoomMessage(replacement),
639                    );
640                }
641                HandleAggregationKind::PollResponse { answers } => {
642                    self.handle_poll_response(related_event, answers);
643                }
644                HandleAggregationKind::PollEdit { replacement } => {
645                    self.handle_edit(
646                        replacement.event_id.clone(),
647                        PendingEditKind::Poll(replacement),
648                    );
649                }
650                HandleAggregationKind::PollEnd => {
651                    self.handle_poll_end(related_event);
652                }
653                HandleAggregationKind::BeaconUpdate { mut location } => {
654                    // Propagate the encryption info from the event context into
655                    // the beacon location update so it can be inspected later
656                    // (e.g. for shield state computation).
657                    let encryption_info = as_variant!(
658                        &self.ctx.flow,
659                        Flow::Remote { encryption_info, .. } => encryption_info.clone()
660                    )
661                    .flatten();
662                    location.encryption_info = encryption_info;
663
664                    self.handle_beacon_update(related_event, location);
665                }
666                HandleAggregationKind::BeaconStop { content } => {
667                    self.handle_beacon_stop(content);
668                }
669                HandleAggregationKind::CallDeclined => {
670                    self.handle_call_declined(related_event);
671                }
672            },
673        }
674
675        added_item
676    }
677
678    #[instrument(skip(self, edit_kind))]
679    fn handle_edit(&mut self, edited_event_id: OwnedEventId, edit_kind: PendingEditKind) {
680        let target = TimelineEventItemId::EventId(edited_event_id.clone());
681
682        let encryption_info =
683            as_variant!(&self.ctx.flow, Flow::Remote { encryption_info, .. } => encryption_info.clone()).flatten();
684        let aggregation = Aggregation::new(
685            self.ctx.flow.timeline_item_id(),
686            AggregationKind::Edit(PendingEdit {
687                kind: edit_kind,
688                edit_json: self.ctx.flow.raw_event().cloned(),
689                encryption_info,
690                bundled_item_owner: None,
691            }),
692        );
693
694        self.meta.aggregations.add(target.clone(), aggregation.clone());
695
696        if let Some(new_item) = find_item_and_apply_aggregation(
697            &self.meta.aggregations,
698            self.items,
699            &target,
700            aggregation,
701            &self.meta.room_version_rules,
702        ) {
703            // Update all events that replied to this message with the edited content.
704            Self::maybe_update_responses(
705                self.meta,
706                self.items,
707                &edited_event_id,
708                EmbeddedEvent::from_timeline_item(&new_item),
709            );
710        }
711    }
712
713    /// Apply a reaction to a *remote* event.
714    ///
715    /// Reactions to local events are applied in
716    /// [`crate::timeline::TimelineController::handle_local_echo`].
717    #[instrument(skip(self))]
718    fn handle_reaction(&mut self, relates_to: OwnedEventId, reaction_key: String) {
719        let target = TimelineEventItemId::EventId(relates_to);
720
721        // Add the aggregation to the manager.
722        let reaction_status = match &self.ctx.flow {
723            Flow::Local { send_handle, .. } => {
724                // This is a local echo for a reaction to a remote event.
725                ReactionStatus::LocalToRemote(send_handle.clone())
726            }
727            Flow::Remote { event_id, .. } => {
728                // This is the remote echo for a reaction to a remote event.
729                ReactionStatus::RemoteToRemote(event_id.clone())
730            }
731        };
732
733        let aggregation = Aggregation::new(
734            self.ctx.flow.timeline_item_id(),
735            AggregationKind::Reaction {
736                key: reaction_key,
737                sender: self.ctx.sender.clone(),
738                timestamp: self.ctx.timestamp,
739                reaction_status,
740            },
741        );
742
743        self.meta.aggregations.add(target.clone(), aggregation.clone());
744        find_item_and_apply_aggregation(
745            &self.meta.aggregations,
746            self.items,
747            &target,
748            aggregation,
749            &self.meta.room_version_rules,
750        );
751    }
752
753    fn handle_poll_response(&mut self, poll_event_id: OwnedEventId, answers: Vec<String>) {
754        let target = TimelineEventItemId::EventId(poll_event_id);
755        let aggregation = Aggregation::new(
756            self.ctx.flow.timeline_item_id(),
757            AggregationKind::PollResponse {
758                sender: self.ctx.sender.clone(),
759                timestamp: self.ctx.timestamp,
760                answers,
761            },
762        );
763        self.meta.aggregations.add(target.clone(), aggregation.clone());
764        find_item_and_apply_aggregation(
765            &self.meta.aggregations,
766            self.items,
767            &target,
768            aggregation,
769            &self.meta.room_version_rules,
770        );
771    }
772
773    fn handle_poll_end(&mut self, poll_event_id: OwnedEventId) {
774        let target = TimelineEventItemId::EventId(poll_event_id);
775        let aggregation = Aggregation::new(
776            self.ctx.flow.timeline_item_id(),
777            AggregationKind::PollEnd { end_date: self.ctx.timestamp },
778        );
779        self.meta.aggregations.add(target.clone(), aggregation.clone());
780        find_item_and_apply_aggregation(
781            &self.meta.aggregations,
782            self.items,
783            &target,
784            aggregation,
785            &self.meta.room_version_rules,
786        );
787    }
788
789    /// Handle a stop `beacon_info` state event by finding the existing live
790    /// `LiveLocation` timeline item from the same sender and updating it via
791    /// the aggregation system.
792    ///
793    /// The stop event's content must match the start item's content (except for
794    /// the `live` field) to ensure we apply the stop to the correct session.
795    #[instrument(skip(self, content))]
796    fn handle_beacon_stop(&mut self, content: BeaconInfoEventContent) {
797        let sender = &self.ctx.sender;
798
799        // Find the live start item by sender and matching content.
800        let target_event_id = super::algorithms::rfind_event_item(self.items, |item| {
801            item.sender() == sender
802                && item.content().as_live_location_state().is_some_and(|s| s.matches_stop(&content))
803        })
804        .and_then(|(_, event_item)| event_item.inner.event_id().map(ToOwned::to_owned));
805
806        let aggregation = Aggregation::new(
807            self.ctx.flow.timeline_item_id(),
808            AggregationKind::BeaconStop { content },
809        );
810
811        let Some(target_event_id) = target_event_id else {
812            // The live start item hasn't arrived yet (or the content doesn't match).
813            // Stash the stop so it can be applied when the matching start item arrives.
814            trace!(
815                "no matching live beacon_info item found for {sender}; \
816                 stashing stop event to apply when the start item arrives"
817            );
818            self.meta.aggregations.add_pending_beacon_stop(sender.clone(), aggregation);
819            return;
820        };
821
822        let target = TimelineEventItemId::EventId(target_event_id);
823        self.meta.aggregations.add(target.clone(), aggregation.clone());
824        find_item_and_apply_aggregation(
825            &self.meta.aggregations,
826            self.items,
827            &target,
828            aggregation,
829            &self.meta.room_version_rules,
830        );
831    }
832
833    /// Handle a location update from a beacon event aggregating onto the
834    /// related `beacon_info` state event's timeline item.
835    #[instrument(skip(self, location))]
836    fn handle_beacon_update(&mut self, beacon_info_event_id: OwnedEventId, location: BeaconInfo) {
837        let target = TimelineEventItemId::EventId(beacon_info_event_id);
838        let aggregation = Aggregation::new(
839            self.ctx.flow.timeline_item_id(),
840            AggregationKind::BeaconUpdate { location },
841        );
842        self.meta.aggregations.add(target.clone(), aggregation.clone());
843        find_item_and_apply_aggregation(
844            &self.meta.aggregations,
845            self.items,
846            &target,
847            aggregation,
848            &self.meta.room_version_rules,
849        );
850    }
851
852    /// Looks for the redacted event in all the timeline event items, and
853    /// redacts it.
854    ///
855    /// This assumes the redacted event was present in the timeline in the first
856    /// place; it will warn if the redacted event has not been found.
857    #[instrument(skip_all, fields(redacts_event_id = ?redacted))]
858    fn handle_redaction(&mut self, redacted: OwnedEventId) {
859        // TODO: Apply local redaction of PollResponse and PollEnd events.
860        // https://github.com/matrix-org/matrix-rust-sdk/pull/2381#issuecomment-1689647825
861
862        // If it's an aggregation that's being redacted, handle it here.
863        if self.handle_aggregation_redaction(redacted.clone()) {
864            // When we have raw timeline items, we should not return here anymore, as we
865            // might need to redact the raw item as well.
866            return;
867        }
868
869        let target = TimelineEventItemId::EventId(redacted.clone());
870        let aggregation = Aggregation::new(
871            self.ctx.flow.timeline_item_id(),
872            AggregationKind::Redaction {
873                is_local: false, // We can only get here for remote echoes of redactions.
874            },
875        );
876        self.meta.aggregations.add(target.clone(), aggregation.clone());
877
878        find_item_and_apply_aggregation(
879            &self.meta.aggregations,
880            self.items,
881            &target,
882            aggregation,
883            &self.meta.room_version_rules,
884        );
885
886        // Even if the redacted event wasn't in the timeline, we can always update
887        // responses with a placeholder "redacted" embedded item.
888        let embedded_event = EmbeddedEvent {
889            content: TimelineItemContent::MsgLike(MsgLikeContent::redacted()),
890            sender: self.ctx.sender.clone(),
891            sender_profile: TimelineDetails::from_initial_value(self.ctx.sender_profile.clone()),
892            timestamp: self.ctx.timestamp,
893            identifier: TimelineEventItemId::EventId(redacted.clone()),
894        };
895
896        Self::maybe_update_responses(self.meta, self.items, &redacted, embedded_event);
897    }
898
899    /// Attempts to redact an aggregation (e.g. a reaction, a poll response,
900    /// etc.).
901    ///
902    /// Returns true if it's succeeded.
903    #[instrument(skip_all, fields(redacts = ?aggregation_id))]
904    fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool {
905        let aggregation_id = TimelineEventItemId::EventId(aggregation_id);
906
907        match self.meta.aggregations.try_remove_aggregation(&aggregation_id, self.items) {
908            Ok(val) => val,
909            // This wasn't a known aggregation that was redacted.
910            Err(err) => {
911                warn!("error while attempting to remove aggregation: {err}");
912                // It could find an aggregation but didn't properly unapply it.
913                true
914            }
915        }
916    }
917
918    /// Handle a call decline event by updating the related call notification
919    /// event and adding the new decliner to the list via the manager.
920    fn handle_call_declined(&mut self, notification_event_id: OwnedEventId) {
921        let target = TimelineEventItemId::EventId(notification_event_id);
922        let aggregation = Aggregation::new(
923            self.ctx.flow.timeline_item_id(),
924            AggregationKind::CallDeclined { sender: self.ctx.sender.clone() },
925        );
926        self.meta.aggregations.add(target.clone(), aggregation.clone());
927        find_item_and_apply_aggregation(
928            &self.meta.aggregations,
929            self.items,
930            &target,
931            aggregation,
932            &self.meta.room_version_rules,
933        );
934    }
935
936    /// Add a new event item in the timeline.
937    ///
938    /// # Safety
939    ///
940    /// This method is not marked as unsafe **but** it manipulates
941    /// [`ObservableItemsTransaction::all_remote_events`]. 2 rules **must** be
942    /// respected:
943    ///
944    /// 1. the remote event of the item being added **must** be present in
945    ///    `all_remote_events`,
946    /// 2. the lastly added or updated remote event must be associated to the
947    ///    timeline item being added here.
948    fn add_item(
949        &mut self,
950        content: TimelineItemContent,
951        recycled_timeline_id: Option<TimelineUniqueId>,
952    ) {
953        let sender = self.ctx.sender.to_owned();
954        let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());
955
956        let forwarder = self.ctx.forwarder.to_owned();
957        let forwarder_profile = self
958            .ctx
959            .forwarder
960            .as_ref()
961            .map(|_| TimelineDetails::from_initial_value(self.ctx.forwarder_profile.clone()));
962
963        let timestamp = self.ctx.timestamp;
964
965        let kind: EventTimelineItemKind = match &self.ctx.flow {
966            Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
967                send_state: EventSendState::NotSentYet { progress: None },
968                transaction_id: txn_id.to_owned(),
969                send_handle: send_handle.clone(),
970            }
971            .into(),
972
973            Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
974                let origin = match *position {
975                    TimelineItemPosition::Start { origin }
976                    | TimelineItemPosition::End { origin }
977                    | TimelineItemPosition::At { origin, .. } => origin,
978
979                    // For updates, reuse the origin of the encrypted event.
980                    TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self.items[idx]
981                        .as_event()
982                        .and_then(|ev| Some(ev.as_remote()?.origin))
983                        .unwrap_or_else(|| {
984                            error!("Tried to update a local event");
985                            RemoteEventOrigin::Unknown
986                        }),
987                };
988
989                RemoteEventTimelineItem {
990                    event_id: event_id.clone(),
991                    transaction_id: txn_id.clone(),
992                    read_receipts: self.ctx.read_receipts.clone(),
993                    is_own: self.ctx.sender == self.meta.own_user_id,
994                    is_highlighted: self.ctx.is_highlighted,
995                    encryption_info: encryption_info.clone(),
996                    original_json: Some(raw_event.clone()),
997                    latest_edit_json: None,
998                    origin,
999                }
1000                .into()
1001            }
1002        };
1003
1004        let is_room_encrypted = self.meta.is_room_encrypted;
1005
1006        let item = EventTimelineItem::new(
1007            sender,
1008            sender_profile,
1009            forwarder,
1010            forwarder_profile,
1011            timestamp,
1012            content,
1013            kind,
1014            is_room_encrypted,
1015        );
1016
1017        // Apply any pending or stashed aggregations.
1018        let mut cowed = Cow::Owned(item);
1019        if let Err(err) = self.meta.aggregations.apply_all(
1020            &self.ctx.flow.timeline_item_id(),
1021            &self.ctx.sender,
1022            &mut cowed,
1023            self.items,
1024            &self.meta.room_version_rules,
1025        ) {
1026            warn!("discarding aggregations: {err}");
1027        }
1028        let item = cowed.into_owned();
1029
1030        match &self.ctx.flow {
1031            Flow::Local { .. } => {
1032                trace!("Adding new local timeline item");
1033
1034                let item = self.meta.new_timeline_item_with_internal_id(item, recycled_timeline_id);
1035
1036                self.items.push_local(item);
1037            }
1038
1039            Flow::Remote {
1040                position: TimelineItemPosition::Start { .. }, event_id, txn_id, ..
1041            } => {
1042                let item = Self::recycle_local_or_create_item(
1043                    self.items,
1044                    self.meta,
1045                    item,
1046                    event_id,
1047                    txn_id.as_deref(),
1048                    recycled_timeline_id,
1049                );
1050
1051                trace!("Adding new remote timeline item at the start");
1052
1053                self.items.push_front(item, Some(0));
1054            }
1055
1056            Flow::Remote {
1057                position: TimelineItemPosition::At { event_index, .. },
1058                event_id,
1059                txn_id,
1060                ..
1061            } => {
1062                let item = Self::recycle_local_or_create_item(
1063                    self.items,
1064                    self.meta,
1065                    item,
1066                    event_id,
1067                    txn_id.as_deref(),
1068                    recycled_timeline_id,
1069                );
1070
1071                let all_remote_events = self.items.all_remote_events();
1072                let event_index = *event_index;
1073
1074                // Look for the closest `timeline_item_index` at the left of `event_index`.
1075                let timeline_item_index = all_remote_events
1076                    .range(0..=event_index)
1077                    .rev()
1078                    .find_map(|event_meta| event_meta.timeline_item_index)
1079                    // The new `timeline_item_index` is the previous + 1.
1080                    .map(|timeline_item_index| timeline_item_index + 1);
1081
1082                // No index? Look for the closest `timeline_item_index` at the right of
1083                // `event_index`.
1084                let timeline_item_index = timeline_item_index.or_else(|| {
1085                    all_remote_events
1086                        .range(event_index + 1..)
1087                        .find_map(|event_meta| event_meta.timeline_item_index)
1088                });
1089
1090                // Still no index? Well, it means there is no existing `timeline_item_index`
1091                // so we are inserting at the last non-local item position as a fallback.
1092                let timeline_item_index = timeline_item_index.unwrap_or_else(|| {
1093                    self.items
1094                        .iter_remotes_region()
1095                        .rev()
1096                        .find_map(|(timeline_item_index, timeline_item)| {
1097                            timeline_item.as_event().map(|_| timeline_item_index + 1)
1098                        })
1099                        .unwrap_or_else(|| {
1100                            // There is no remote timeline item, so we could insert at the start of
1101                            // the remotes region.
1102                            self.items.first_remotes_region_index()
1103                        })
1104                });
1105
1106                trace!(
1107                    ?event_index,
1108                    ?timeline_item_index,
1109                    "Adding new remote timeline at specific event index"
1110                );
1111
1112                self.items.insert(timeline_item_index, item, Some(event_index));
1113            }
1114
1115            Flow::Remote {
1116                position: TimelineItemPosition::End { .. }, event_id, txn_id, ..
1117            } => {
1118                let item = Self::recycle_local_or_create_item(
1119                    self.items,
1120                    self.meta,
1121                    item,
1122                    event_id,
1123                    txn_id.as_deref(),
1124                    recycled_timeline_id,
1125                );
1126
1127                // Let's find the latest remote event and insert after it
1128                let timeline_item_index = self
1129                    .items
1130                    .iter_remotes_region()
1131                    .rev()
1132                    .find_map(|(timeline_item_index, timeline_item)| {
1133                        timeline_item.as_event().map(|_| timeline_item_index + 1)
1134                    })
1135                    .unwrap_or_else(|| {
1136                        // There is no remote timeline item, so we could insert at the start of
1137                        // the remotes region.
1138                        self.items.first_remotes_region_index()
1139                    });
1140
1141                let event_index = self
1142                    .items
1143                    .all_remote_events()
1144                    .last_index()
1145                    // The last remote event is necessarily associated to this
1146                    // timeline item, see the contract of this method. Let's fallback to a similar
1147                    // value as `timeline_item_index` instead of panicking.
1148                    .or_else(|| {
1149                        error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present");
1150
1151                        Some(0)
1152                    });
1153
1154                // Try to keep precise insertion semantics here, in this exact order:
1155                //
1156                // * _push back_ when the new item is inserted after all items (the assumption
1157                // being that this is the hot path, because most of the time new events
1158                // come from the sync),
1159                // * _push front_ when the new item is inserted at index 0,
1160                // * _insert_ otherwise.
1161
1162                if timeline_item_index == self.items.len() {
1163                    trace!("Adding new remote timeline item at the back");
1164                    self.items.push_back(item, event_index);
1165                } else if timeline_item_index == 0 {
1166                    trace!("Adding new remote timeline item at the front");
1167                    self.items.push_front(item, event_index);
1168                } else {
1169                    trace!(
1170                        timeline_item_index,
1171                        "Adding new remote timeline item at specific index"
1172                    );
1173                    self.items.insert(timeline_item_index, item, event_index);
1174                }
1175            }
1176
1177            Flow::Remote {
1178                event_id: decrypted_event_id,
1179                position: TimelineItemPosition::UpdateAt { timeline_item_index: idx },
1180                ..
1181            } => {
1182                trace!("Updating timeline item at position {idx}");
1183
1184                // Update all events that replied to this previously encrypted message.
1185                Self::maybe_update_responses(
1186                    self.meta,
1187                    self.items,
1188                    decrypted_event_id,
1189                    EmbeddedEvent::from_timeline_item(&item),
1190                );
1191
1192                let internal_id = self.items[*idx].internal_id.clone();
1193                self.items.replace(*idx, TimelineItem::new(item, internal_id));
1194            }
1195        }
1196
1197        // If we don't have a read marker item, look if we need to add one now.
1198        if !self.meta.has_up_to_date_read_marker_item {
1199            self.meta.update_read_marker(self.items);
1200        }
1201    }
1202
1203    /// Try to recycle a local timeline item for the same event, or create a new
1204    /// timeline item for it.
1205    ///
1206    /// Note: this method doesn't take `&mut self` to avoid a borrow checker
1207    /// conflict with `TimelineEventHandler::add_item`.
1208    fn recycle_local_or_create_item(
1209        items: &mut ObservableItemsTransaction<'_>,
1210        meta: &mut TimelineMetadata,
1211        mut new_item: EventTimelineItem,
1212        event_id: &EventId,
1213        transaction_id: Option<&TransactionId>,
1214        recycled_timeline_id: Option<TimelineUniqueId>,
1215    ) -> Arc<TimelineItem> {
1216        // Detect a local timeline item that matches `event_id` or `transaction_id`.
1217        if let Some((local_timeline_item_index, local_timeline_item)) = items
1218            // Iterate the locals region.
1219            .iter_locals_region()
1220            // Iterate from the end to the start.
1221            .rev()
1222            .find_map(|(nth, timeline_item)| {
1223                let event_timeline_item = timeline_item.as_event()?;
1224
1225                if Some(event_id) == event_timeline_item.event_id()
1226                    || (transaction_id.is_some()
1227                        && transaction_id == event_timeline_item.transaction_id())
1228                {
1229                    // A duplicate local event timeline item has been found!
1230                    Some((nth, event_timeline_item))
1231                } else {
1232                    // This local event timeline is not the one we are looking for. Continue our
1233                    // search.
1234                    None
1235                }
1236            })
1237        {
1238            trace!(
1239                ?event_id,
1240                ?transaction_id,
1241                ?local_timeline_item_index,
1242                "Removing local timeline item"
1243            );
1244
1245            transfer_details(&mut new_item, local_timeline_item);
1246
1247            // Remove the local timeline item.
1248            let recycled = items.remove(local_timeline_item_index);
1249            TimelineItem::new(new_item, recycled.internal_id.clone())
1250        } else {
1251            // We haven't found a matching local item to recycle; create a new item.
1252            meta.new_timeline_item_with_internal_id(new_item, recycled_timeline_id)
1253        }
1254    }
1255
1256    /// After updating the timeline item `new_item` which id is
1257    /// `target_event_id`, update other items that are responses to this item.
1258    fn maybe_update_responses(
1259        meta: &mut TimelineMetadata,
1260        items: &mut ObservableItemsTransaction<'_>,
1261        target_event_id: &EventId,
1262        new_embedded_event: EmbeddedEvent,
1263    ) {
1264        let Some(replies) = meta.replies.get(target_event_id) else {
1265            trace!("item has no replies");
1266            return;
1267        };
1268
1269        for reply_id in replies {
1270            let Some(timeline_item_index) = items
1271                .get_remote_event_by_event_id(reply_id)
1272                .and_then(|meta| meta.timeline_item_index)
1273            else {
1274                warn!(%reply_id, "event not known as an item in the timeline");
1275                continue;
1276            };
1277
1278            let Some(item) = items.get(timeline_item_index) else {
1279                warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
1280                continue;
1281            };
1282
1283            let Some(event_item) = item.as_event() else { continue };
1284            let Some(msglike) = event_item.content.as_msglike() else { continue };
1285            let Some(message) = msglike.as_message() else { continue };
1286            let Some(in_reply_to) = msglike.in_reply_to.as_ref() else { continue };
1287
1288            trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
1289            let in_reply_to = InReplyToDetails {
1290                event_id: in_reply_to.event_id.clone(),
1291                event: TimelineDetails::Ready(Box::new(new_embedded_event.clone())),
1292            };
1293
1294            let new_reply_content = TimelineItemContent::MsgLike(
1295                msglike
1296                    .with_in_reply_to(in_reply_to)
1297                    .with_kind(MsgLikeKind::Message(message.clone())),
1298            );
1299            let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
1300            items.replace(timeline_item_index, new_reply_item);
1301        }
1302    }
1303}
1304
1305/// Transfer `TimelineDetails` that weren't available on the original
1306/// item and have been fetched separately (only `reply_to` for
1307/// now) from `old_item` to `item`, given two items for an event
1308/// that was re-received.
1309///
1310/// `old_item` *should* always be a local timeline item usually, but it
1311/// can be a remote timeline item.
1312fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
1313    let TimelineItemContent::MsgLike(new_msglike) = &mut new_item.content else {
1314        return;
1315    };
1316    let TimelineItemContent::MsgLike(old_msglike) = &old_item.content else {
1317        return;
1318    };
1319
1320    let Some(in_reply_to) = &mut new_msglike.in_reply_to else { return };
1321    let Some(old_in_reply_to) = &old_msglike.in_reply_to else { return };
1322
1323    if matches!(&in_reply_to.event, TimelineDetails::Unavailable) {
1324        in_reply_to.event = old_in_reply_to.event.clone();
1325    }
1326}