matrix_sdk_ui/timeline/controller/
aggregations.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An aggregation manager for the timeline.
16//!
17//! An aggregation is an event that relates to another event: for instance, a
18//! reaction, a poll response, and so on and so forth.
19//!
20//! Because of the sync mechanisms and federation, it can happen that a related
21//! event is received *before* receiving the event it relates to. Those events
22//! must be accounted for, stashed somewhere, and reapplied later, if/when the
23//! related-to event shows up.
24//!
25//! In addition to that, a room's event cache can also decide to move events
26//! around, in its own internal representation (likely because it ran into some
27//! duplicate events). When that happens, a timeline opened on the given room
28//! will see a removal then re-insertion of the given event. If that event was
29//! the target of aggregations, then those aggregations must be re-applied when
30//! the given event is reinserted.
31//!
32//! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33//! by this module will take care of memoizing aggregations, for the entire
34//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35//! caller). Aggregations are saved in memory, and have the same lifetime as
36//! that of a timeline. This makes it possible to apply pending aggregations
37//! to cater for the first use case, and to never lose any aggregations in the
38//! second use case.
39
40use std::{borrow::Cow, collections::HashMap, sync::Arc};
41
42use as_variant::as_variant;
43use matrix_sdk::deserialized_responses::EncryptionInfo;
44use ruma::{
45    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
46    events::{
47        AnySyncTimelineEvent,
48        poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
49        relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
50    },
51    room_version_rules::RoomVersionRules,
52    serde::Raw,
53};
54use tracing::{info, trace, warn};
55
56use super::{ObservableItemsTransaction, rfind_event_by_item_id};
57use crate::timeline::{
58    EventTimelineItem, MsgLikeContent, MsgLikeKind, PollState, ReactionInfo, ReactionStatus,
59    TimelineEventItemId, TimelineItem, TimelineItemContent,
60};
61
62#[derive(Clone)]
63pub(in crate::timeline) enum PendingEditKind {
64    RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
65    Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
66}
67
68impl std::fmt::Debug for PendingEditKind {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match self {
71            Self::RoomMessage(_) => f.debug_struct("RoomMessage").finish_non_exhaustive(),
72            Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
73        }
74    }
75}
76
77#[derive(Clone, Debug)]
78pub(in crate::timeline) struct PendingEdit {
79    /// The kind of edit this is.
80    pub kind: PendingEditKind,
81
82    /// The raw JSON for the edit.
83    pub edit_json: Option<Raw<AnySyncTimelineEvent>>,
84
85    /// The encryption info for this edit.
86    pub encryption_info: Option<Arc<EncryptionInfo>>,
87
88    /// If provided, this is the identifier of a remote event item that included
89    /// this bundled edit.
90    pub bundled_item_owner: Option<OwnedEventId>,
91}
92
93/// Which kind of aggregation (related event) is this?
94#[derive(Clone, Debug)]
95pub(crate) enum AggregationKind {
96    /// This is a response to a poll.
97    PollResponse {
98        /// Sender of the poll's response.
99        sender: OwnedUserId,
100        /// Timestamp at which the response has beens ent.
101        timestamp: MilliSecondsSinceUnixEpoch,
102        /// All the answers to the poll sent by the sender.
103        answers: Vec<String>,
104    },
105
106    /// This is the marker of the end of a poll.
107    PollEnd {
108        /// Timestamp at which the poll ends, i.e. all the responses with a
109        /// timestamp prior to this one should be taken into account
110        /// (and all the responses with a timestamp after this one
111        /// should be dropped).
112        end_date: MilliSecondsSinceUnixEpoch,
113    },
114
115    /// This is a reaction to another event.
116    Reaction {
117        /// The reaction "key" displayed by the client, often an emoji.
118        key: String,
119        /// Sender of the reaction.
120        sender: OwnedUserId,
121        /// Timestamp at which the reaction has been sent.
122        timestamp: MilliSecondsSinceUnixEpoch,
123        /// The send status of the reaction this is, with handles to abort it if
124        /// we can, etc.
125        reaction_status: ReactionStatus,
126    },
127
128    /// An event has been redacted.
129    Redaction,
130
131    /// An event has been edited.
132    ///
133    /// Note that edits can't be applied in isolation; we need to identify what
134    /// the *latest* edit is, based on the event ordering. As such, they're
135    /// handled exceptionally in `Aggregation::apply` and
136    /// `Aggregation::unapply`, and the callers have the responsibility of
137    /// considering all the edits and applying only the right one.
138    Edit(PendingEdit),
139}
140
141/// An aggregation is an event related to another event (for instance a
142/// reaction, a poll's response, etc.).
143///
144/// It can be either a local or a remote echo.
145#[derive(Clone, Debug)]
146pub(crate) struct Aggregation {
147    /// The kind of aggregation this represents.
148    pub kind: AggregationKind,
149
150    /// The own timeline identifier for an aggregation.
151    ///
152    /// It will be a transaction id when the aggregation is still a local echo,
153    /// and it will transition into an event id when the aggregation is a
154    /// remote echo (i.e. has been received in a sync response):
155    pub own_id: TimelineEventItemId,
156}
157
158/// Get the poll state from a given [`TimelineItemContent`].
159fn poll_state_from_item<'a>(
160    event: &'a mut Cow<'_, EventTimelineItem>,
161) -> Result<&'a mut PollState, AggregationError> {
162    if event.content().is_poll() {
163        // It was a poll! Now return the state as mutable.
164        let state = as_variant!(
165            event.to_mut().content_mut(),
166            TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(s), ..}) => s
167        )
168        .expect("it was a poll just above");
169        Ok(state)
170    } else {
171        Err(AggregationError::InvalidType {
172            expected: "a poll".to_owned(),
173            actual: event.content().debug_string().to_owned(),
174        })
175    }
176}
177
178impl Aggregation {
179    /// Create a new [`Aggregation`].
180    pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
181        Self { kind, own_id }
182    }
183
184    /// Apply an aggregation in-place to a given [`TimelineItemContent`].
185    ///
186    /// In case of success, returns an enum indicating whether the applied
187    /// aggregation had an effect on the content; if it updated it, then the
188    /// caller has the responsibility to reflect that change.
189    ///
190    /// In case of error, returns an error detailing why the aggregation
191    /// couldn't be applied.
192    fn apply(
193        &self,
194        event: &mut Cow<'_, EventTimelineItem>,
195        rules: &RoomVersionRules,
196    ) -> ApplyAggregationResult {
197        match &self.kind {
198            AggregationKind::PollResponse { sender, timestamp, answers } => {
199                match poll_state_from_item(event) {
200                    Ok(state) => {
201                        state.add_response(sender.clone(), *timestamp, answers.clone());
202                        ApplyAggregationResult::UpdatedItem
203                    }
204                    Err(err) => ApplyAggregationResult::Error(err),
205                }
206            }
207
208            AggregationKind::Redaction => {
209                if event.content().is_redacted() {
210                    ApplyAggregationResult::LeftItemIntact
211                } else {
212                    let new_item = event.redact(&rules.redaction);
213                    *event = Cow::Owned(new_item);
214                    ApplyAggregationResult::UpdatedItem
215                }
216            }
217
218            AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
219                Ok(state) => {
220                    if !state.end(*end_date) {
221                        return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
222                    }
223                    ApplyAggregationResult::UpdatedItem
224                }
225                Err(err) => ApplyAggregationResult::Error(err),
226            },
227
228            AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
229                let Some(reactions) = event.content().reactions() else {
230                    // An item that can't hold any reactions.
231                    return ApplyAggregationResult::LeftItemIntact;
232                };
233
234                let previous_reaction = reactions.get(key).and_then(|by_user| by_user.get(sender));
235
236                // If the reaction was already added to the item, we don't need to add it back.
237                //
238                // Search for a previous reaction that would be equivalent.
239
240                let is_same = previous_reaction.is_some_and(|prev| {
241                    prev.timestamp == *timestamp
242                        && matches!(
243                            (&prev.status, reaction_status),
244                            (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
245                                | (
246                                    ReactionStatus::LocalToRemote(_),
247                                    ReactionStatus::LocalToRemote(_),
248                                )
249                                | (
250                                    ReactionStatus::RemoteToRemote(_),
251                                    ReactionStatus::RemoteToRemote(_),
252                                )
253                        )
254                });
255
256                if is_same {
257                    ApplyAggregationResult::LeftItemIntact
258                } else {
259                    let reactions = event
260                        .to_mut()
261                        .content_mut()
262                        .reactions_mut()
263                        .expect("reactions was Some above");
264
265                    reactions.entry(key.clone()).or_default().insert(
266                        sender.clone(),
267                        ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
268                    );
269
270                    ApplyAggregationResult::UpdatedItem
271                }
272            }
273
274            AggregationKind::Edit(_) => {
275                // Let the caller handle the edit.
276                ApplyAggregationResult::Edit
277            }
278        }
279    }
280
281    /// Undo an aggregation in-place to a given [`TimelineItemContent`].
282    ///
283    /// In case of success, returns an enum indicating whether unapplying the
284    /// aggregation had an effect on the content; if it updated it, then the
285    /// caller has the responsibility to reflect that change.
286    ///
287    /// In case of error, returns an error detailing why the aggregation
288    /// couldn't be unapplied.
289    fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
290        match &self.kind {
291            AggregationKind::PollResponse { sender, timestamp, .. } => {
292                let state = match poll_state_from_item(event) {
293                    Ok(state) => state,
294                    Err(err) => return ApplyAggregationResult::Error(err),
295                };
296                state.remove_response(sender, *timestamp);
297                ApplyAggregationResult::UpdatedItem
298            }
299
300            AggregationKind::PollEnd { .. } => {
301                // Assume we can't undo a poll end event at the moment.
302                ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
303            }
304
305            AggregationKind::Redaction => {
306                // Redactions are not reversible.
307                ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
308            }
309
310            AggregationKind::Reaction { key, sender, .. } => {
311                let Some(reactions) = event.content().reactions() else {
312                    // An item that can't hold any reactions.
313                    return ApplyAggregationResult::LeftItemIntact;
314                };
315
316                // We only need to remove the previous reaction if it was there.
317                //
318                // Search for it.
319
320                let had_entry =
321                    reactions.get(key).and_then(|by_user| by_user.get(sender)).is_some();
322
323                if had_entry {
324                    let reactions = event
325                        .to_mut()
326                        .content_mut()
327                        .reactions_mut()
328                        .expect("reactions was some above");
329                    let by_user = reactions.get_mut(key);
330                    if let Some(by_user) = by_user {
331                        by_user.swap_remove(sender);
332                        // If this was the last reaction, remove the entire map for this key.
333                        if by_user.is_empty() {
334                            reactions.swap_remove(key);
335                        }
336                    }
337                    ApplyAggregationResult::UpdatedItem
338                } else {
339                    ApplyAggregationResult::LeftItemIntact
340                }
341            }
342
343            AggregationKind::Edit(_) => {
344                // Let the caller handle the edit.
345                ApplyAggregationResult::Edit
346            }
347        }
348    }
349}
350
351/// Manager for all known existing aggregations to all events in the timeline.
352#[derive(Clone, Debug, Default)]
353pub(crate) struct Aggregations {
354    /// Mapping of a target event to its list of aggregations.
355    related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,
356
357    /// Mapping of a related event identifier to its target.
358    inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
359}
360
361impl Aggregations {
362    /// Clear all the known aggregations from all the mappings.
363    pub fn clear(&mut self) {
364        self.related_events.clear();
365        self.inverted_map.clear();
366    }
367
368    /// Add a given aggregation that relates to the [`TimelineItemContent`]
369    /// identified by the given [`TimelineEventItemId`].
370    pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
371        // If the aggregation is a redaction, it invalidates all the other aggregations;
372        // remove them.
373        if matches!(aggregation.kind, AggregationKind::Redaction) {
374            for agg in self.related_events.remove(&related_to).unwrap_or_default() {
375                self.inverted_map.remove(&agg.own_id);
376            }
377        }
378
379        // If there was any redaction among the current aggregation, adding a new one
380        // should be a noop.
381        if let Some(previous_aggregations) = self.related_events.get(&related_to)
382            && previous_aggregations
383                .iter()
384                .any(|agg| matches!(agg.kind, AggregationKind::Redaction))
385        {
386            return;
387        }
388
389        self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
390        self.related_events.entry(related_to).or_default().push(aggregation);
391    }
392
393    /// Is the given id one for a known aggregation to another event?
394    ///
395    /// If so, unapplies it by replacing the corresponding related item, if
396    /// needs be.
397    ///
398    /// Returns true if an aggregation was found. This doesn't mean
399    /// the underlying item has been updated, if it was missing from the
400    /// timeline for instance.
401    ///
402    /// May return an error if it found an aggregation, but it couldn't be
403    /// properly applied.
404    pub fn try_remove_aggregation(
405        &mut self,
406        aggregation_id: &TimelineEventItemId,
407        items: &mut ObservableItemsTransaction<'_>,
408    ) -> Result<bool, AggregationError> {
409        let Some(found) = self.inverted_map.get(aggregation_id) else { return Ok(false) };
410
411        // Find and remove the aggregation in the other mapping.
412        let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
413            let removed = aggregations
414                .iter()
415                .position(|agg| agg.own_id == *aggregation_id)
416                .map(|idx| aggregations.remove(idx));
417
418            // If this was the last aggregation, remove the entry in the `related_events`
419            // mapping.
420            if aggregations.is_empty() {
421                self.related_events.remove(found);
422            }
423
424            removed
425        } else {
426            None
427        };
428
429        let Some(aggregation) = aggregation else {
430            warn!(
431                "incorrect internal state: {aggregation_id:?} was present in the inverted map, \
432                 not in related-to map."
433            );
434            return Ok(false);
435        };
436
437        if let Some((item_pos, item)) = rfind_event_by_item_id(items, found) {
438            let mut cowed = Cow::Borrowed(&*item);
439            match aggregation.unapply(&mut cowed) {
440                ApplyAggregationResult::UpdatedItem => {
441                    trace!("removed aggregation");
442                    items.replace(
443                        item_pos,
444                        TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
445                    );
446                }
447                ApplyAggregationResult::LeftItemIntact => {}
448                ApplyAggregationResult::Error(err) => {
449                    warn!("error when unapplying aggregation: {err}");
450                }
451                ApplyAggregationResult::Edit => {
452                    // This edit has been removed; try to find another that still applies.
453                    if let Some(aggregations) = self.related_events.get(found) {
454                        if resolve_edits(aggregations, items, &mut cowed) {
455                            items.replace(
456                                item_pos,
457                                TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
458                            );
459                        } else {
460                            // No other edit was found, leave the item as is.
461                            // TODO likely need to change the item to indicate
462                            // it's been un-edited etc.
463                        }
464                    } else {
465                        // No other edits apply.
466                    }
467                }
468            }
469        } else {
470            info!("missing related-to item ({found:?}) for aggregation {aggregation_id:?}");
471        }
472
473        Ok(true)
474    }
475
476    /// Apply all the aggregations to a [`TimelineItemContent`].
477    ///
478    /// Will return an error at the first aggregation that couldn't be applied;
479    /// see [`Aggregation::apply`] which explains under which conditions it can
480    /// happen.
481    ///
482    /// Returns a boolean indicating whether at least one aggregation was
483    /// applied.
484    pub fn apply_all(
485        &self,
486        item_id: &TimelineEventItemId,
487        event: &mut Cow<'_, EventTimelineItem>,
488        items: &mut ObservableItemsTransaction<'_>,
489        rules: &RoomVersionRules,
490    ) -> Result<(), AggregationError> {
491        let Some(aggregations) = self.related_events.get(item_id) else {
492            return Ok(());
493        };
494
495        let mut has_edits = false;
496
497        for a in aggregations {
498            match a.apply(event, rules) {
499                ApplyAggregationResult::Edit => {
500                    has_edits = true;
501                }
502                ApplyAggregationResult::UpdatedItem | ApplyAggregationResult::LeftItemIntact => {}
503                ApplyAggregationResult::Error(err) => return Err(err),
504            }
505        }
506
507        if has_edits {
508            resolve_edits(aggregations, items, event);
509        }
510
511        Ok(())
512    }
513
514    /// Mark a target event as being sent (i.e. it transitions from an local
515    /// transaction id to its remote event id counterpart), by updating the
516    /// internal mappings.
517    pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
518        let from = TimelineEventItemId::TransactionId(txn_id);
519        let to = TimelineEventItemId::EventId(event_id);
520
521        // Update the aggregations in the `related_events` field.
522        if let Some(aggregations) = self.related_events.remove(&from) {
523            // Update the inverted mappings (from aggregation's id, to the new target id).
524            for a in &aggregations {
525                if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
526                    debug_assert_eq!(prev_target, from);
527                    self.inverted_map.insert(a.own_id.clone(), to.clone());
528                }
529            }
530            // Update the direct mapping of target -> aggregations.
531            self.related_events.entry(to).or_default().extend(aggregations);
532        }
533    }
534
535    /// Mark an aggregation event as being sent (i.e. it transitions from an
536    /// local transaction id to its remote event id counterpart), by
537    /// updating the internal mappings.
538    ///
539    /// When an aggregation has been marked as sent, it may need to be reapplied
540    /// to the corresponding [`TimelineItemContent`]; this is why we're also
541    /// passing the context to apply an aggregation here.
542    pub fn mark_aggregation_as_sent(
543        &mut self,
544        txn_id: OwnedTransactionId,
545        event_id: OwnedEventId,
546        items: &mut ObservableItemsTransaction<'_>,
547        rules: &RoomVersionRules,
548    ) -> bool {
549        let from = TimelineEventItemId::TransactionId(txn_id);
550        let to = TimelineEventItemId::EventId(event_id.clone());
551
552        let Some(target) = self.inverted_map.remove(&from) else {
553            return false;
554        };
555
556        if let Some(aggregations) = self.related_events.get_mut(&target)
557            && let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from)
558        {
559            found.own_id = to.clone();
560
561            match &mut found.kind {
562                AggregationKind::PollResponse { .. }
563                | AggregationKind::PollEnd { .. }
564                | AggregationKind::Edit(..)
565                | AggregationKind::Redaction => {
566                    // Nothing particular to do.
567                }
568
569                AggregationKind::Reaction { reaction_status, .. } => {
570                    // Mark the reaction as becoming remote, and signal that update to the
571                    // caller.
572                    *reaction_status = ReactionStatus::RemoteToRemote(event_id);
573
574                    let found = found.clone();
575                    find_item_and_apply_aggregation(self, items, &target, found, rules);
576                }
577            }
578        }
579
580        self.inverted_map.insert(to, target);
581        true
582    }
583}
584
585/// Look at all the edits of a given event, and apply the most recent one, if
586/// found.
587///
588/// Returns true if an edit was found and applied, false otherwise.
589fn resolve_edits(
590    aggregations: &[Aggregation],
591    items: &ObservableItemsTransaction<'_>,
592    event: &mut Cow<'_, EventTimelineItem>,
593) -> bool {
594    let mut best_edit: Option<PendingEdit> = None;
595    let mut best_edit_pos = None;
596
597    for a in aggregations {
598        if let AggregationKind::Edit(pending_edit) = &a.kind {
599            match &a.own_id {
600                TimelineEventItemId::TransactionId(_) => {
601                    // A local echo is always the most recent edit: use this one.
602                    best_edit = Some(pending_edit.clone());
603                    break;
604                }
605
606                TimelineEventItemId::EventId(event_id) => {
607                    if let Some(best_edit_pos) = &mut best_edit_pos {
608                        // Find the position of the timeline owning the edit: either the bundled
609                        // item owner if this was a bundled edit, or the edit event itself.
610                        let pos = items.position_by_event_id(
611                            pending_edit.bundled_item_owner.as_ref().unwrap_or(event_id),
612                        );
613
614                        if let Some(pos) = pos {
615                            // If the edit is more recent (higher index) than the previous best
616                            // edit we knew about, use this one.
617                            if pos > *best_edit_pos {
618                                best_edit = Some(pending_edit.clone());
619                                *best_edit_pos = pos;
620                                trace!(?best_edit_pos, edit_id = ?a.own_id, "found better edit");
621                            }
622                        } else {
623                            trace!(edit_id = ?a.own_id, "couldn't find timeline meta for edit event");
624
625                            // The edit event isn't in the timeline, so it might be a bundled
626                            // edit. In this case, record it as the best edit if and only if
627                            // there wasn't any other.
628                            if best_edit.is_none() {
629                                best_edit = Some(pending_edit.clone());
630                                trace!(?best_edit_pos, edit_id = ?a.own_id, "found bundled edit");
631                            }
632                        }
633                    } else {
634                        // There wasn't any best edit yet, so record this one as being it, with
635                        // its position.
636                        best_edit = Some(pending_edit.clone());
637                        best_edit_pos = items.position_by_event_id(event_id);
638                        trace!(?best_edit_pos, edit_id = ?a.own_id, "first best edit");
639                    }
640                }
641            }
642        }
643    }
644
645    if let Some(edit) = best_edit { edit_item(event, edit) } else { false }
646}
647
648/// Apply the selected edit to the given EventTimelineItem.
649///
650/// Returns true if the edit was applied, false otherwise (because the edit and
651/// original timeline item types didn't match, for instance).
652fn edit_item(item: &mut Cow<'_, EventTimelineItem>, edit: PendingEdit) -> bool {
653    let PendingEdit { kind: edit_kind, edit_json, encryption_info, bundled_item_owner: _ } = edit;
654
655    if let Some(event_json) = &edit_json {
656        let Some(edit_sender) = event_json.get_field::<OwnedUserId>("sender").ok().flatten() else {
657            info!("edit event didn't have a sender; likely a malformed event");
658            return false;
659        };
660
661        if edit_sender != item.sender() {
662            info!(
663                original_sender = %item.sender(),
664                %edit_sender,
665                "Edit event applies to another user's timeline item, discarding"
666            );
667            return false;
668        }
669    }
670
671    let TimelineItemContent::MsgLike(content) = item.content() else {
672        info!("Edit of message event applies to {:?}, discarding", item.content().debug_string());
673        return false;
674    };
675
676    match (edit_kind, content) {
677        (
678            PendingEditKind::RoomMessage(replacement),
679            MsgLikeContent { kind: MsgLikeKind::Message(msg), .. },
680        ) => {
681            // First combination: it's a message edit for a message. Good.
682            let mut new_msg = msg.clone();
683            new_msg.apply_edit(replacement.new_content);
684
685            let new_item = item.with_content_and_latest_edit(
686                TimelineItemContent::MsgLike(content.with_kind(MsgLikeKind::Message(new_msg))),
687                edit_json,
688            );
689            *item = Cow::Owned(new_item);
690        }
691
692        (
693            PendingEditKind::Poll(replacement),
694            MsgLikeContent { kind: MsgLikeKind::Poll(poll_state), .. },
695        ) => {
696            // Second combination: it's a poll edit for a poll. Good.
697            if let Some(new_poll_state) = poll_state.edit(replacement.new_content) {
698                let new_item = item.with_content_and_latest_edit(
699                    TimelineItemContent::MsgLike(
700                        content.with_kind(MsgLikeKind::Poll(new_poll_state)),
701                    ),
702                    edit_json,
703                );
704                *item = Cow::Owned(new_item);
705            } else {
706                // The poll has ended, so we can't edit it anymore.
707                return false;
708            }
709        }
710
711        (edit_kind, _) => {
712            // Invalid combination.
713            info!(
714                content = item.content().debug_string(),
715                edit = format!("{:?}", edit_kind),
716                "Mismatch between edit type and content type",
717            );
718            return false;
719        }
720    }
721
722    if let Some(encryption_info) = encryption_info {
723        *item = Cow::Owned(item.with_encryption_info(Some(encryption_info)));
724    }
725
726    true
727}
728
729/// Find an item identified by the target identifier, and apply the aggregation
730/// onto it.
731///
732/// Returns the updated [`EventTimelineItem`] if the aggregation was applied, or
733/// `None` otherwise.
734pub(crate) fn find_item_and_apply_aggregation(
735    aggregations: &Aggregations,
736    items: &mut ObservableItemsTransaction<'_>,
737    target: &TimelineEventItemId,
738    aggregation: Aggregation,
739    rules: &RoomVersionRules,
740) -> Option<EventTimelineItem> {
741    let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
742        trace!("couldn't find aggregation's target {target:?}");
743        return None;
744    };
745
746    let mut cowed = Cow::Borrowed(&*event_item);
747    match aggregation.apply(&mut cowed, rules) {
748        ApplyAggregationResult::UpdatedItem => {
749            trace!("applied aggregation");
750            let new_event_item = cowed.into_owned();
751            let new_item =
752                TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
753            items.replace(idx, new_item);
754            Some(new_event_item)
755        }
756        ApplyAggregationResult::Edit => {
757            if let Some(aggregations) = aggregations.related_events.get(target)
758                && resolve_edits(aggregations, items, &mut cowed)
759            {
760                let new_event_item = cowed.into_owned();
761                let new_item =
762                    TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
763                items.replace(idx, new_item);
764                return Some(new_event_item);
765            }
766            None
767        }
768        ApplyAggregationResult::LeftItemIntact => {
769            trace!("applying the aggregation had no effect");
770            None
771        }
772        ApplyAggregationResult::Error(err) => {
773            warn!("error when applying aggregation: {err}");
774            None
775        }
776    }
777}
778
779/// The result of applying (or unapplying) an aggregation onto a timeline item.
780enum ApplyAggregationResult {
781    /// The passed `Cow<EventTimelineItem>` has been cloned and updated.
782    UpdatedItem,
783
784    /// An edit must be included in the edit set and resolved later, using the
785    /// relative position of the edits.
786    Edit,
787
788    /// The item hasn't been modified after applying the aggregation, because it
789    /// was likely already applied prior to this.
790    LeftItemIntact,
791
792    /// An error happened while applying the aggregation.
793    Error(AggregationError),
794}
795
796#[derive(Debug, thiserror::Error)]
797pub(crate) enum AggregationError {
798    #[error("trying to end a poll twice")]
799    PollAlreadyEnded,
800
801    #[error("a poll end can't be unapplied")]
802    CantUndoPollEnd,
803
804    #[error("a redaction can't be unapplied")]
805    CantUndoRedaction,
806
807    #[error(
808        "trying to apply an aggregation of one type to an invalid target: \
809         expected {expected}, actual {actual}"
810    )]
811    InvalidType { expected: String, actual: String },
812}