Skip to main content

matrix_sdk_ui/timeline/controller/
aggregations.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An aggregation manager for the timeline.
16//!
17//! An aggregation is an event that relates to another event: for instance, a
18//! reaction, a poll response, and so on and so forth.
19//!
20//! Because of the sync mechanisms and federation, it can happen that a related
21//! event is received *before* receiving the event it relates to. Those events
22//! must be accounted for, stashed somewhere, and reapplied later, if/when the
23//! related-to event shows up.
24//!
25//! In addition to that, a room's event cache can also decide to move events
26//! around, in its own internal representation (likely because it ran into some
27//! duplicate events). When that happens, a timeline opened on the given room
28//! will see a removal then re-insertion of the given event. If that event was
29//! the target of aggregations, then those aggregations must be re-applied when
30//! the given event is reinserted.
31//!
32//! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33//! by this module will take care of memoizing aggregations, for the entire
34//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35//! caller). Aggregations are saved in memory, and have the same lifetime as
36//! that of a timeline. This makes it possible to apply pending aggregations
37//! to cater for the first use case, and to never lose any aggregations in the
38//! second use case.
39
40use std::{borrow::Cow, collections::HashMap, sync::Arc};
41
42use matrix_sdk::{check_validity_of_replacement_events, deserialized_responses::EncryptionInfo};
43use ruma::{
44    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
45    events::{
46        AnySyncTimelineEvent, beacon_info::BeaconInfoEventContent,
47        poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
48        relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
49    },
50    room_version_rules::RoomVersionRules,
51    serde::Raw,
52};
53use tracing::{error, info, trace, warn};
54
55use super::{ObservableItemsTransaction, rfind_event_by_item_id};
56use crate::timeline::{
57    BeaconInfo, EventTimelineItem, LiveLocationState, MsgLikeContent, MsgLikeKind, PollState,
58    ReactionInfo, ReactionStatus, TimelineEventItemId, TimelineItem, TimelineItemContent,
59    event_item::beacon_info_matches,
60};
61
/// The payload of an edit, discriminated by the kind of content it replaces.
///
/// Each variant wraps the [`Replacement`] carrying the new content.
#[derive(Clone)]
pub(in crate::timeline) enum PendingEditKind {
    /// A replacement for the content of a room message.
    RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
    /// A replacement for the content of an (unstable) poll start event.
    Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
}
67
68impl std::fmt::Debug for PendingEditKind {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match self {
71            Self::RoomMessage(_) => f.debug_struct("RoomMessage").finish_non_exhaustive(),
72            Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
73        }
74    }
75}
76
/// An edit (replacement) of a room message or of a poll, along with the
/// metadata that came with it.
///
/// This is the payload carried by [`AggregationKind::Edit`].
#[derive(Clone, Debug)]
pub(in crate::timeline) struct PendingEdit {
    /// The kind of edit this is.
    pub kind: PendingEditKind,

    /// The raw JSON for the edit.
    pub edit_json: Option<Raw<AnySyncTimelineEvent>>,

    /// The encryption info for this edit.
    pub encryption_info: Option<Arc<EncryptionInfo>>,

    /// If provided, this is the identifier of a remote event item that included
    /// this bundled edit.
    pub bundled_item_owner: Option<OwnedEventId>,
}
92
/// Which kind of aggregation (related event) is this?
#[derive(Clone, Debug)]
pub(crate) enum AggregationKind {
    /// This is a response to a poll.
    PollResponse {
        /// Sender of the poll's response.
        sender: OwnedUserId,
        /// Timestamp at which the response has been sent.
        timestamp: MilliSecondsSinceUnixEpoch,
        /// All the answers to the poll sent by the sender.
        answers: Vec<String>,
    },

    /// This is the marker of the end of a poll.
    PollEnd {
        /// Timestamp at which the poll ends, i.e. all the responses with a
        /// timestamp prior to this one should be taken into account
        /// (and all the responses with a timestamp after this one
        /// should be dropped).
        end_date: MilliSecondsSinceUnixEpoch,
    },

    /// This is a reaction to another event.
    Reaction {
        /// The reaction "key" displayed by the client, often an emoji.
        key: String,
        /// Sender of the reaction.
        sender: OwnedUserId,
        /// Timestamp at which the reaction has been sent.
        timestamp: MilliSecondsSinceUnixEpoch,
        /// The send status of the reaction this is, with handles to abort it if
        /// we can, etc.
        reaction_status: ReactionStatus,
    },

    /// An event has been redacted.
    Redaction {
        /// Whether this aggregation results from the local echo of a redaction.
        /// Local echoes of redactions are applied reversibly whereas remote
        /// echoes of redactions are applied irreversibly.
        is_local: bool,
    },

    /// An event has been edited.
    ///
    /// Note that edits can't be applied in isolation; we need to identify what
    /// the *latest* edit is, based on the event ordering. As such, they're
    /// handled exceptionally in `Aggregation::apply` and
    /// `Aggregation::unapply`, and the callers have the responsibility of
    /// considering all the edits and applying only the right one.
    Edit(PendingEdit),

    /// A location update for a live location sharing session (MSC3489).
    ///
    /// The `location` is added to the target item's [`LiveLocationState`] when
    /// the aggregation is applied, and removed again (by timestamp) when it is
    /// unapplied.
    BeaconUpdate { location: BeaconInfo },

    /// A stop event for a live location sharing session (MSC3489).
    ///
    /// Carries the new (non-live) [`BeaconInfoEventContent`] that should
    /// replace the stored content on the target item, flipping
    /// [`LiveLocationState::is_live`] to `false`.
    ///
    /// Unlike [`AggregationKind::BeaconUpdate`], a beacon stop is not
    /// reversible.
    BeaconStop { content: BeaconInfoEventContent },

    /// An m.rtc.decline event for an m.rtc.notification event.
    CallDeclined {
        /// Sender of the decline.
        sender: OwnedUserId,
    },
}
163
/// An aggregation is an event related to another event (for instance a
/// reaction, a poll's response, etc.).
///
/// It can be either a local or a remote echo.
#[derive(Clone, Debug)]
pub(crate) struct Aggregation {
    /// The kind of aggregation this represents.
    pub kind: AggregationKind,

    /// The own timeline identifier for an aggregation.
    ///
    /// It will be a transaction id when the aggregation is still a local echo,
    /// and it will transition into an event id when the aggregation is a
    /// remote echo (i.e. has been received in a sync response).
    pub own_id: TimelineEventItemId,
}
180
181/// Get the poll state from a given [`TimelineItemContent`].
182fn poll_state_from_item<'a>(
183    event: &'a mut Cow<'_, EventTimelineItem>,
184) -> Result<&'a mut PollState, AggregationError> {
185    let content = event.to_mut().content_mut();
186
187    if let TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(state), .. }) =
188        content
189    {
190        Ok(state)
191    } else {
192        Err(AggregationError::InvalidType {
193            expected: "a poll".to_owned(),
194            actual: content.debug_string().to_owned(),
195        })
196    }
197}
198
199/// Get the [`LiveLocationState`] from a given [`TimelineItemContent`], mutably.
200fn live_location_state_from_item<'a>(
201    event: &'a mut Cow<'_, EventTimelineItem>,
202) -> Result<&'a mut LiveLocationState, AggregationError> {
203    let content = event.to_mut().content_mut();
204
205    if let TimelineItemContent::MsgLike(MsgLikeContent {
206        kind: MsgLikeKind::LiveLocation(state),
207        ..
208    }) = content
209    {
210        Ok(state)
211    } else {
212        Err(AggregationError::InvalidType {
213            expected: "a live location".to_owned(),
214            actual: content.debug_string().to_owned(),
215        })
216    }
217}
218
219/// Gets the mutable list of users that did decline this notification event.
220fn rtc_notification_declinations_from_item<'a>(
221    event: &'a mut Cow<'_, EventTimelineItem>,
222) -> Result<&'a mut Vec<OwnedUserId>, AggregationError> {
223    let content = event.to_mut().content_mut();
224
225    if let TimelineItemContent::RtcNotification { declined_by, .. } = content {
226        Ok(declined_by)
227    } else {
228        Err(AggregationError::InvalidType {
229            expected: "an rtc notification".to_owned(),
230            actual: content.debug_string().to_owned(),
231        })
232    }
233}
234
235impl Aggregation {
236    /// Create a new [`Aggregation`].
237    pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
238        Self { kind, own_id }
239    }
240
241    /// Apply an aggregation in-place to a given [`TimelineItemContent`].
242    ///
243    /// In case of success, returns an enum indicating whether the applied
244    /// aggregation had an effect on the content; if it updated it, then the
245    /// caller has the responsibility to reflect that change.
246    ///
247    /// In case of error, returns an error detailing why the aggregation
248    /// couldn't be applied.
249    fn apply(
250        &self,
251        event: &mut Cow<'_, EventTimelineItem>,
252        rules: &RoomVersionRules,
253    ) -> ApplyAggregationResult {
254        match &self.kind {
255            AggregationKind::PollResponse { sender, timestamp, answers } => {
256                match poll_state_from_item(event) {
257                    Ok(state) => {
258                        state.add_response(sender.clone(), *timestamp, answers.clone());
259                        ApplyAggregationResult::UpdatedItem
260                    }
261                    Err(err) => ApplyAggregationResult::Error(err),
262                }
263            }
264
265            AggregationKind::Redaction { is_local } => {
266                let is_local_redacted =
267                    event.content().is_redacted() && event.unredacted_item.is_some();
268                let is_remote_redacted =
269                    event.content().is_redacted() && event.unredacted_item.is_none();
270                if *is_local && is_local_redacted || !*is_local && is_remote_redacted {
271                    ApplyAggregationResult::LeftItemIntact
272                } else {
273                    let new_item = event.redact(&rules.redaction, *is_local);
274                    *event = Cow::Owned(new_item);
275                    ApplyAggregationResult::UpdatedItem
276                }
277            }
278
279            AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
280                Ok(state) => {
281                    if !state.end(*end_date) {
282                        return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
283                    }
284                    ApplyAggregationResult::UpdatedItem
285                }
286                Err(err) => ApplyAggregationResult::Error(err),
287            },
288
289            AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
290                let Some(reactions) = event.content().reactions() else {
291                    // An item that can't hold any reactions.
292                    return ApplyAggregationResult::LeftItemIntact;
293                };
294
295                let previous_reaction = reactions.get(key).and_then(|by_user| by_user.get(sender));
296
297                // If the reaction was already added to the item, we don't need to add it back.
298                //
299                // Search for a previous reaction that would be equivalent.
300
301                let is_same = previous_reaction.is_some_and(|prev| {
302                    prev.timestamp == *timestamp
303                        && matches!(
304                            (&prev.status, reaction_status),
305                            (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
306                                | (
307                                    ReactionStatus::LocalToRemote(_),
308                                    ReactionStatus::LocalToRemote(_),
309                                )
310                                | (
311                                    ReactionStatus::RemoteToRemote(_),
312                                    ReactionStatus::RemoteToRemote(_),
313                                )
314                        )
315                });
316
317                if is_same {
318                    ApplyAggregationResult::LeftItemIntact
319                } else {
320                    let reactions = event
321                        .to_mut()
322                        .content_mut()
323                        .reactions_mut()
324                        .expect("reactions was Some above");
325
326                    reactions.entry(key.clone()).or_default().insert(
327                        sender.clone(),
328                        ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
329                    );
330
331                    ApplyAggregationResult::UpdatedItem
332                }
333            }
334
335            AggregationKind::Edit(_) => {
336                // Let the caller handle the edit.
337                ApplyAggregationResult::Edit
338            }
339
340            AggregationKind::BeaconUpdate { location } => {
341                match live_location_state_from_item(event) {
342                    Ok(state) => {
343                        state.add_location(location.clone());
344                        ApplyAggregationResult::UpdatedItem
345                    }
346                    Err(err) => ApplyAggregationResult::Error(err),
347                }
348            }
349
350            AggregationKind::BeaconStop { content } => match live_location_state_from_item(event) {
351                Ok(state) => {
352                    state.stop(content.clone());
353                    ApplyAggregationResult::UpdatedItem
354                }
355                Err(err) => ApplyAggregationResult::Error(err),
356            },
357
358            AggregationKind::CallDeclined { sender } => {
359                match rtc_notification_declinations_from_item(event) {
360                    Ok(declinations) => {
361                        if declinations.contains(sender) {
362                            ApplyAggregationResult::LeftItemIntact
363                        } else {
364                            declinations.push(sender.clone());
365                            ApplyAggregationResult::UpdatedItem
366                        }
367                    }
368                    Err(err) => ApplyAggregationResult::Error(err),
369                }
370            }
371        }
372    }
373
374    /// Undo an aggregation in-place to a given [`TimelineItemContent`].
375    ///
376    /// In case of success, returns an enum indicating whether unapplying the
377    /// aggregation had an effect on the content; if it updated it, then the
378    /// caller has the responsibility to reflect that change.
379    ///
380    /// In case of error, returns an error detailing why the aggregation
381    /// couldn't be unapplied.
382    fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
383        match &self.kind {
384            AggregationKind::PollResponse { sender, timestamp, .. } => {
385                let state = match poll_state_from_item(event) {
386                    Ok(state) => state,
387                    Err(err) => return ApplyAggregationResult::Error(err),
388                };
389                state.remove_response(sender, *timestamp);
390                ApplyAggregationResult::UpdatedItem
391            }
392
393            AggregationKind::PollEnd { .. } => {
394                // Assume we can't undo a poll end event at the moment.
395                ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
396            }
397
398            AggregationKind::Redaction { is_local } => {
399                if *is_local {
400                    if event.unredacted_item.is_some() {
401                        // Unapply local redaction.
402                        *event = Cow::Owned(event.unredact());
403                        ApplyAggregationResult::UpdatedItem
404                    } else {
405                        // Event isn't locally redacted. Nothing to do.
406                        ApplyAggregationResult::LeftItemIntact
407                    }
408                } else {
409                    // Remote redactions are not reversible.
410                    ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
411                }
412            }
413
414            AggregationKind::Reaction { key, sender, .. } => {
415                let Some(reactions) = event.content().reactions() else {
416                    // An item that can't hold any reactions.
417                    return ApplyAggregationResult::LeftItemIntact;
418                };
419
420                // We only need to remove the previous reaction if it was there.
421                //
422                // Search for it.
423
424                let had_entry =
425                    reactions.get(key).and_then(|by_user| by_user.get(sender)).is_some();
426
427                if had_entry {
428                    let reactions = event
429                        .to_mut()
430                        .content_mut()
431                        .reactions_mut()
432                        .expect("reactions was some above");
433                    let by_user = reactions.get_mut(key);
434                    if let Some(by_user) = by_user {
435                        by_user.swap_remove(sender);
436                        // If this was the last reaction, remove the entire map for this key.
437                        if by_user.is_empty() {
438                            reactions.swap_remove(key);
439                        }
440                    }
441                    ApplyAggregationResult::UpdatedItem
442                } else {
443                    ApplyAggregationResult::LeftItemIntact
444                }
445            }
446
447            AggregationKind::Edit(_) => {
448                // Let the caller handle the edit.
449                ApplyAggregationResult::Edit
450            }
451
452            AggregationKind::BeaconUpdate { location } => {
453                match live_location_state_from_item(event) {
454                    Ok(state) => {
455                        state.remove_location(location.ts);
456                        ApplyAggregationResult::UpdatedItem
457                    }
458                    Err(err) => ApplyAggregationResult::Error(err),
459                }
460            }
461
462            AggregationKind::BeaconStop { .. } => {
463                // Stopping a live location share is not reversible.
464                ApplyAggregationResult::Error(AggregationError::CantUndoBeaconStop)
465            }
466
467            AggregationKind::CallDeclined { .. } => {
468                // One cannot un-decline a call
469                ApplyAggregationResult::Error(AggregationError::CantUndoRtcDecline)
470            }
471        }
472    }
473}
474
/// Manager for all known existing aggregations to all events in the timeline.
#[derive(Clone, Debug, Default)]
pub(crate) struct Aggregations {
    /// Mapping of a target event to its list of aggregations.
    related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,

    /// Mapping of a related event identifier (i.e. an aggregation's own id) to
    /// its target.
    inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,

    /// A pending beacon-stop aggregation received before the corresponding live
    /// `beacon_info` start item has arrived.
    ///
    /// Keyed by the sender's user ID, so at most one pending stop is kept per
    /// sender. When a live start item is eventually inserted via `add_item`,
    /// we check if the pending stop matches and promote it into
    /// [`Self::related_events`] so that [`Self::apply_all`] can apply it
    /// immediately.
    pending_beacon_stops: HashMap<OwnedUserId, Aggregation>,
}
493
494impl Aggregations {
495    /// Clear all the known aggregations from all the mappings.
496    pub fn clear(&mut self) {
497        self.related_events.clear();
498        self.inverted_map.clear();
499        self.pending_beacon_stops.clear();
500    }
501
    /// Stash a [`AggregationKind::BeaconStop`] that arrived before its target
    /// live `beacon_info` item.
    ///
    /// Only one pending stop is kept per `sender`: stashing another one for
    /// the same sender replaces the previous one.
    ///
    /// The stashed aggregation is later moved into [`Self::related_events`]
    /// (and thus picked up by [`Self::apply_all`]) by
    /// [`Self::promote_pending_beacon_stop`], once the live item shows up.
    pub fn add_pending_beacon_stop(&mut self, sender: OwnedUserId, aggregation: Aggregation) {
        self.pending_beacon_stops.insert(sender, aggregation);
    }
510
511    /// Promote a matching stashed beacon-stop aggregation for `sender` into the
512    /// regular aggregation map, now that the live start item's
513    /// `target_event_id` is known.
514    ///
515    /// The pending stop's content must match the start event's content (except
516    /// for the `live` field) for promotion to occur. If they don't match, the
517    /// pending stop is discarded because it belongs to a different session.
518    ///
519    /// Should be called from `add_item` just before `apply_all`, when inserting
520    /// a live `beacon_info` item.
521    fn promote_pending_beacon_stop(
522        &mut self,
523        sender: &OwnedUserId,
524        target_event_id: OwnedEventId,
525        start_content: &BeaconInfoEventContent,
526    ) {
527        if !start_content.live {
528            return;
529        }
530
531        let Some(stop) = self.pending_beacon_stops.remove(sender) else { return };
532
533        let AggregationKind::BeaconStop { content: stop_content } = &stop.kind else {
534            warn!("pending beacon stop has unexpected aggregation kind");
535            return;
536        };
537
538        if !beacon_info_matches(start_content, stop_content) {
539            trace!("discarding stale pending beacon stop (content mismatch)");
540            return;
541        }
542
543        let target = TimelineEventItemId::EventId(target_event_id);
544        self.add(target, stop);
545    }
546
547    /// Add a given aggregation that relates to the [`TimelineItemContent`]
548    /// identified by the given [`TimelineEventItemId`].
549    pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
550        // If the aggregation is a redaction, it invalidates all the other aggregations;
551        // remove them.
552        if matches!(aggregation.kind, AggregationKind::Redaction { .. }) {
553            for agg in self.related_events.remove(&related_to).unwrap_or_default() {
554                self.inverted_map.remove(&agg.own_id);
555            }
556        }
557
558        // If there was any redaction among the current aggregation, adding a new one
559        // should be a noop.
560        if let Some(previous_aggregations) = self.related_events.get(&related_to)
561            && previous_aggregations
562                .iter()
563                .any(|agg| matches!(agg.kind, AggregationKind::Redaction { .. }))
564        {
565            return;
566        }
567
568        self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
569
570        // We can have 3 different states for the same aggregation in related_events, in
571        // chronological order:
572        //
573        // 1. The local echo with a transaction ID.
574        // 2. The local echo with the event ID returned by the server after sending the
575        //    event.
576        // 3. The remote echo received via sync.
577        //
578        // The transition from states 1 to 2 is handled in `mark_aggregation_as_sent()`.
579        // So here we need to handle the transition from states 2 to 3. We need to
580        // replace the local echo by the remote echo, which might have more data, like
581        // the raw JSON.
582        let related_events = self.related_events.entry(related_to).or_default();
583        if let Some(pos) = related_events.iter().position(|agg| agg.own_id == aggregation.own_id) {
584            related_events.remove(pos);
585        }
586        related_events.push(aggregation);
587    }
588
589    /// Is the given id one for a known aggregation to another event?
590    ///
591    /// If so, unapplies it by replacing the corresponding related item, if
592    /// needs be.
593    ///
594    /// Returns true if an aggregation was found. This doesn't mean
595    /// the underlying item has been updated, if it was missing from the
596    /// timeline for instance.
597    ///
598    /// May return an error if it found an aggregation, but it couldn't be
599    /// properly applied.
600    pub fn try_remove_aggregation(
601        &mut self,
602        aggregation_id: &TimelineEventItemId,
603        items: &mut ObservableItemsTransaction<'_>,
604    ) -> Result<bool, AggregationError> {
605        let Some(found) = self.inverted_map.get(aggregation_id) else { return Ok(false) };
606
607        // Find and remove the aggregation in the other mapping.
608        let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
609            let removed = aggregations
610                .iter()
611                .position(|agg| agg.own_id == *aggregation_id)
612                .map(|idx| aggregations.remove(idx));
613
614            // If this was the last aggregation, remove the entry in the `related_events`
615            // mapping.
616            if aggregations.is_empty() {
617                self.related_events.remove(found);
618            }
619
620            removed
621        } else {
622            None
623        };
624
625        let Some(aggregation) = aggregation else {
626            warn!(
627                "incorrect internal state: {aggregation_id:?} was present in the inverted map, \
628                 not in related-to map."
629            );
630            return Ok(false);
631        };
632
633        if let Some((item_pos, item)) = rfind_event_by_item_id(items, found) {
634            let mut cowed = Cow::Borrowed(&*item);
635            match aggregation.unapply(&mut cowed) {
636                ApplyAggregationResult::UpdatedItem => {
637                    trace!("removed aggregation");
638                    items.replace(
639                        item_pos,
640                        TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
641                    );
642                }
643                ApplyAggregationResult::LeftItemIntact => {}
644                ApplyAggregationResult::Error(err) => {
645                    warn!("error when unapplying aggregation: {err}");
646                }
647                ApplyAggregationResult::Edit => {
648                    // This edit has been removed; try to find another that still applies.
649                    if let Some(aggregations) = self.related_events.get(found) {
650                        if resolve_edits(aggregations, items, &mut cowed) {
651                            items.replace(
652                                item_pos,
653                                TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
654                            );
655                        } else {
656                            // No other edit was found, leave the item as is.
657                            // TODO likely need to change the item to indicate
658                            // it's been un-edited etc.
659                        }
660                    } else {
661                        // No other edits apply.
662                    }
663                }
664            }
665        } else {
666            info!("missing related-to item ({found:?}) for aggregation {aggregation_id:?}");
667        }
668
669        Ok(true)
670    }
671
672    /// Apply all the aggregations to a [`TimelineItemContent`].
673    ///
674    /// If `sender` is provided alongside a remote `item_id`, any
675    /// [`AggregationKind::BeaconStop`] events that arrived out-of-order (i.e.
676    /// before the live `beacon_info` start item) are first promoted from the
677    /// pending-stops stash into the regular aggregation map so they are picked
678    /// up here together with every other pending aggregation for this item.
679    ///
680    /// Will return an error at the first aggregation that couldn't be applied;
681    /// see [`Aggregation::apply`] which explains under which conditions it can
682    /// happen.
683    pub fn apply_all(
684        &mut self,
685        item_id: &TimelineEventItemId,
686        sender: &OwnedUserId,
687        event: &mut Cow<'_, EventTimelineItem>,
688        items: &mut ObservableItemsTransaction<'_>,
689        rules: &RoomVersionRules,
690    ) -> Result<(), AggregationError> {
691        // If a beacon-stop arrived before this live start item, it was stashed
692        // in `pending_beacon_stops` keyed by sender. Promote it into
693        // `related_events` under the now-known start event ID so the loop below
694        // applies it together with any other pending aggregations.
695        //
696        // The promotion verifies that the pending stop's content matches the
697        // start event's content to ensure we don't apply an old stop to a new
698        // session.
699        if let TimelineEventItemId::EventId(event_id) = item_id
700            && let Some(live_location) = event.content().as_live_location_state()
701        {
702            self.promote_pending_beacon_stop(sender, event_id.clone(), &live_location.beacon_info);
703        }
704
705        let Some(aggregations) = self.related_events.get(item_id) else {
706            return Ok(());
707        };
708
709        let mut has_edits = false;
710
711        for a in aggregations {
712            match a.apply(event, rules) {
713                ApplyAggregationResult::Edit => {
714                    has_edits = true;
715                }
716                ApplyAggregationResult::UpdatedItem | ApplyAggregationResult::LeftItemIntact => {}
717                ApplyAggregationResult::Error(err) => return Err(err),
718            }
719        }
720
721        if has_edits {
722            resolve_edits(aggregations, items, event);
723        }
724
725        Ok(())
726    }
727
728    /// Mark a target event as being sent (i.e. it transitions from an local
729    /// transaction id to its remote event id counterpart), by updating the
730    /// internal mappings.
731    pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
732        let from = TimelineEventItemId::TransactionId(txn_id);
733        let to = TimelineEventItemId::EventId(event_id);
734
735        // Update the aggregations in the `related_events` field.
736        if let Some(aggregations) = self.related_events.remove(&from) {
737            // Update the inverted mappings (from aggregation's id, to the new target id).
738            for a in &aggregations {
739                if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
740                    debug_assert_eq!(prev_target, from);
741                    self.inverted_map.insert(a.own_id.clone(), to.clone());
742                }
743            }
744            // Update the direct mapping of target -> aggregations.
745            self.related_events.entry(to).or_default().extend(aggregations);
746        }
747    }
748
749    /// Mark an aggregation event as being sent (i.e. it transitions from an
750    /// local transaction id to its remote event id counterpart), by
751    /// updating the internal mappings.
752    ///
753    /// When an aggregation has been marked as sent, it may need to be reapplied
754    /// to the corresponding [`TimelineItemContent`]; this is why we're also
755    /// passing the context to apply an aggregation here.
756    pub fn mark_aggregation_as_sent(
757        &mut self,
758        txn_id: OwnedTransactionId,
759        event_id: OwnedEventId,
760        items: &mut ObservableItemsTransaction<'_>,
761        rules: &RoomVersionRules,
762    ) -> bool {
763        let from = TimelineEventItemId::TransactionId(txn_id);
764        let to = TimelineEventItemId::EventId(event_id.clone());
765
766        let Some(target) = self.inverted_map.remove(&from) else {
767            return false;
768        };
769
770        if let Some(aggregations) = self.related_events.get_mut(&target)
771            && let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from)
772        {
773            found.own_id = to.clone();
774
775            match &mut found.kind {
776                AggregationKind::PollResponse { .. }
777                | AggregationKind::PollEnd { .. }
778                | AggregationKind::Edit(..)
779                | AggregationKind::BeaconUpdate { .. }
780                | AggregationKind::BeaconStop { .. }
781                | AggregationKind::CallDeclined { .. } => {
782                    // Nothing particular to do.
783                }
784
785                AggregationKind::Redaction { is_local } => {
786                    // Mark the redaction as being remote and apply it (irreversibly).
787                    *is_local = false;
788
789                    let found = found.clone();
790                    find_item_and_apply_aggregation(self, items, &target, found, rules);
791                }
792
793                AggregationKind::Reaction { reaction_status, .. } => {
794                    // Mark the reaction as becoming remote, and signal that update to the
795                    // caller.
796                    *reaction_status = ReactionStatus::RemoteToRemote(event_id);
797
798                    let found = found.clone();
799                    find_item_and_apply_aggregation(self, items, &target, found, rules);
800                }
801            }
802        }
803
804        self.inverted_map.insert(to, target);
805        true
806    }
807
808    /// Returns the id of the event this aggregation relates to, if it's a known
809    /// aggregation.
810    pub fn is_aggregation_of(&self, item: &TimelineEventItemId) -> Option<&TimelineEventItemId> {
811        self.inverted_map.get(item)
812    }
813}
814
815/// Look at all the edits of a given event, and apply the most recent one, if
816/// found.
817///
818/// Returns true if an edit was found and applied, false otherwise.
819fn resolve_edits(
820    aggregations: &[Aggregation],
821    items: &ObservableItemsTransaction<'_>,
822    event: &mut Cow<'_, EventTimelineItem>,
823) -> bool {
824    // A tuple of the best edit, if we have found one and a boolean indicating if
825    // the edit is coming from a local echo. If it's from a local echo, we can't
826    // validate it as we don't have a raw JSON, but this isn't that important as
827    // we're sure we won't send ourselves invalid edits.
828    let mut best_edit: Option<(PendingEdit, bool)> = None;
829    let mut best_edit_pos = None;
830
831    for a in aggregations {
832        if let AggregationKind::Edit(pending_edit) = &a.kind {
833            match &a.own_id {
834                TimelineEventItemId::TransactionId(_) => {
835                    // A local echo is always the most recent edit: use this one.
836                    best_edit = Some((pending_edit.clone(), true));
837                    break;
838                }
839
840                TimelineEventItemId::EventId(event_id) => {
841                    if let Some(best_edit_pos) = &mut best_edit_pos {
842                        // Find the position of the timeline owning the edit: either the bundled
843                        // item owner if this was a bundled edit, or the edit event itself.
844                        let pos = items.position_by_event_id(
845                            pending_edit.bundled_item_owner.as_ref().unwrap_or(event_id),
846                        );
847
848                        if let Some(pos) = pos {
849                            // If the edit is more recent (higher index) than the previous best
850                            // edit we knew about, use this one.
851                            if pos > *best_edit_pos {
852                                best_edit = Some((pending_edit.clone(), false));
853                                *best_edit_pos = pos;
854                                trace!(?best_edit_pos, edit_id = ?a.own_id, "found better edit");
855                            }
856                        } else {
857                            trace!(edit_id = ?a.own_id, "couldn't find timeline meta for edit event");
858
859                            // The edit event isn't in the timeline, so it might be a bundled
860                            // edit. In this case, record it as the best edit if and only if
861                            // there wasn't any other.
862                            if best_edit.is_none() {
863                                best_edit = Some((pending_edit.clone(), false));
864                                trace!(?best_edit_pos, edit_id = ?a.own_id, "found bundled edit");
865                            }
866                        }
867                    } else {
868                        // There wasn't any best edit yet, so record this one as being it, with
869                        // its position.
870                        best_edit = Some((pending_edit.clone(), false));
871                        best_edit_pos = items.position_by_event_id(event_id);
872                        trace!(?best_edit_pos, edit_id = ?a.own_id, "first best edit");
873                    }
874                }
875            }
876        }
877    }
878
879    if let Some((edit, is_local_echo)) = best_edit {
880        edit_item(event, edit, is_local_echo)
881    } else {
882        false
883    }
884}
885
886/// Apply the selected edit to the given EventTimelineItem.
887///
888/// Returns true if the edit was applied, false otherwise (because the edit and
889/// original timeline item types didn't match, for instance).
890fn edit_item(
891    item: &mut Cow<'_, EventTimelineItem>,
892    edit: PendingEdit,
893    is_local_echo: bool,
894) -> bool {
895    // We can receive edits from a local echo, i.e. the edit wasn't yet received
896    // from the homeserver.
897    //
898    // Before we send an edit we check that the event is allowed to be edited and
899    // that the replacement content is allowed.
900    //
901    // We don't have yet a full JSON of the event, so we can't do the validation
902    // here.
903    if !is_local_echo {
904        let Some(original_json) = item.original_json() else {
905            error!("The original event does not have the JSON field set.");
906            return false;
907        };
908
909        let Some(edit_json) = &edit.edit_json else {
910            error!(
911                "The replacement event of a remotely received edit does not have the JSON field set."
912            );
913            return false;
914        };
915
916        match check_validity_of_replacement_events(
917            original_json,
918            item.encryption_info(),
919            edit_json,
920            edit.encryption_info.as_deref(),
921        ) {
922            Ok(content) => content,
923            Err(e) => {
924                warn!("Event wasn't replaced due to the replacement event being invalid: {e}");
925                return false;
926            }
927        }
928    }
929
930    let TimelineItemContent::MsgLike(content) = item.content() else {
931        info!("Edit of message event applies to {:?}, discarding", item.content().debug_string());
932        return false;
933    };
934
935    let PendingEdit { kind: edit_kind, edit_json, encryption_info, bundled_item_owner: _ } = edit;
936
937    match (edit_kind, content) {
938        (
939            PendingEditKind::RoomMessage(replacement),
940            MsgLikeContent { kind: MsgLikeKind::Message(msg), .. },
941        ) => {
942            // First combination: it's a message edit for a message. Good.
943            let mut new_msg = msg.clone();
944            new_msg.apply_edit(replacement.new_content);
945
946            let new_item = item.with_content_and_latest_edit(
947                TimelineItemContent::MsgLike(content.with_kind(MsgLikeKind::Message(new_msg))),
948                edit_json,
949            );
950            *item = Cow::Owned(new_item);
951        }
952
953        (
954            PendingEditKind::Poll(replacement),
955            MsgLikeContent { kind: MsgLikeKind::Poll(poll_state), .. },
956        ) => {
957            // Second combination: it's a poll edit for a poll. Good.
958            if let Some(new_poll_state) = poll_state.edit(replacement.new_content) {
959                let new_item = item.with_content_and_latest_edit(
960                    TimelineItemContent::MsgLike(
961                        content.with_kind(MsgLikeKind::Poll(new_poll_state)),
962                    ),
963                    edit_json,
964                );
965                *item = Cow::Owned(new_item);
966            } else {
967                // The poll has ended, so we can't edit it anymore.
968                return false;
969            }
970        }
971
972        (edit_kind, _) => {
973            // Invalid combination.
974            info!(
975                content = item.content().debug_string(),
976                edit = format!("{:?}", edit_kind),
977                "Mismatch between edit type and content type",
978            );
979            return false;
980        }
981    }
982
983    if let Some(encryption_info) = encryption_info {
984        *item = Cow::Owned(item.with_encryption_info(Some(encryption_info)));
985    }
986
987    true
988}
989
990/// Find an item identified by the target identifier, and apply the aggregation
991/// onto it.
992///
993/// Returns the updated [`EventTimelineItem`] if the aggregation was applied, or
994/// `None` otherwise.
995pub(crate) fn find_item_and_apply_aggregation(
996    aggregations: &Aggregations,
997    items: &mut ObservableItemsTransaction<'_>,
998    target: &TimelineEventItemId,
999    aggregation: Aggregation,
1000    rules: &RoomVersionRules,
1001) -> Option<EventTimelineItem> {
1002    let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
1003        trace!("couldn't find aggregation's target {target:?}");
1004        return None;
1005    };
1006
1007    let mut cowed = Cow::Borrowed(&*event_item);
1008    match aggregation.apply(&mut cowed, rules) {
1009        ApplyAggregationResult::UpdatedItem => {
1010            trace!("applied aggregation");
1011            let new_event_item = cowed.into_owned();
1012            let new_item =
1013                TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
1014            items.replace(idx, new_item);
1015            Some(new_event_item)
1016        }
1017        ApplyAggregationResult::Edit => {
1018            if let Some(aggregations) = aggregations.related_events.get(target)
1019                && resolve_edits(aggregations, items, &mut cowed)
1020            {
1021                let new_event_item = cowed.into_owned();
1022                let new_item =
1023                    TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
1024                items.replace(idx, new_item);
1025                return Some(new_event_item);
1026            }
1027            None
1028        }
1029        ApplyAggregationResult::LeftItemIntact => {
1030            trace!("applying the aggregation had no effect");
1031            None
1032        }
1033        ApplyAggregationResult::Error(err) => {
1034            warn!("error when applying aggregation: {err}");
1035            None
1036        }
1037    }
1038}
1039
1040/// The result of applying (or unapplying) an aggregation onto a timeline item.
1041enum ApplyAggregationResult {
1042    /// The passed `Cow<EventTimelineItem>` has been cloned and updated.
1043    UpdatedItem,
1044
1045    /// An edit must be included in the edit set and resolved later, using the
1046    /// relative position of the edits.
1047    Edit,
1048
1049    /// The item hasn't been modified after applying the aggregation, because it
1050    /// was likely already applied prior to this.
1051    LeftItemIntact,
1052
1053    /// An error happened while applying the aggregation.
1054    Error(AggregationError),
1055}
1056
1057#[derive(Debug, thiserror::Error)]
1058pub(crate) enum AggregationError {
1059    #[error("trying to end a poll twice")]
1060    PollAlreadyEnded,
1061
1062    #[error("a poll end can't be unapplied")]
1063    CantUndoPollEnd,
1064
1065    #[error("a redaction can't be unapplied")]
1066    CantUndoRedaction,
1067
1068    #[error("a beacon stop can't be unapplied")]
1069    CantUndoBeaconStop,
1070
1071    #[error("a call decline can't be unapplied")]
1072    CantUndoRtcDecline,
1073
1074    #[error(
1075        "trying to apply an aggregation of one type to an invalid target: \
1076         expected {expected}, actual {actual}"
1077    )]
1078    InvalidType { expected: String, actual: String },
1079}