matrix_sdk_ui/timeline/controller/
aggregations.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An aggregation manager for the timeline.
16//!
17//! An aggregation is an event that relates to another event: for instance, a
18//! reaction, a poll response, and so on and so forth.
19//!
20//! Because of the sync mechanisms and federation, it can happen that a related
21//! event is received *before* receiving the event it relates to. Those events
22//! must be accounted for, stashed somewhere, and reapplied later, if/when the
23//! related-to event shows up.
24//!
25//! In addition to that, a room's event cache can also decide to move events
26//! around, in its own internal representation (likely because it ran into some
27//! duplicate events). When that happens, a timeline opened on the given room
28//! will see a removal then re-insertion of the given event. If that event was
29//! the target of aggregations, then those aggregations must be re-applied when
30//! the given event is reinserted.
31//!
32//! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33//! by this module will take care of memoizing aggregations, for the entire
34//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35//! caller). Aggregations are saved in memory, and have the same lifetime as
36//! that of a timeline. This makes it possible to apply pending aggregations
37//! to cater for the first use case, and to never lose any aggregations in the
38//! second use case.
39
40use std::{borrow::Cow, collections::HashMap, sync::Arc};
41
42use as_variant::as_variant;
43use matrix_sdk::deserialized_responses::EncryptionInfo;
44use ruma::{
45    events::{
46        poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
47        relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
48        AnySyncTimelineEvent,
49    },
50    serde::Raw,
51    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId,
52};
53use tracing::{info, trace, warn};
54
55use super::{rfind_event_by_item_id, ObservableItemsTransaction};
56use crate::timeline::{
57    EventTimelineItem, MsgLikeContent, MsgLikeKind, PollState, ReactionInfo, ReactionStatus,
58    TimelineEventItemId, TimelineItem, TimelineItemContent,
59};
60
61#[derive(Clone)]
62pub(in crate::timeline) enum PendingEditKind {
63    RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
64    Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
65}
66
67impl std::fmt::Debug for PendingEditKind {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        match self {
70            Self::RoomMessage(_) => f.debug_struct("RoomMessage").finish_non_exhaustive(),
71            Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
72        }
73    }
74}
75
76#[derive(Clone, Debug)]
77pub(in crate::timeline) struct PendingEdit {
78    /// The kind of edit this is.
79    pub kind: PendingEditKind,
80
81    /// The raw JSON for the edit.
82    pub edit_json: Option<Raw<AnySyncTimelineEvent>>,
83
84    /// The encryption info for this edit.
85    pub encryption_info: Option<Arc<EncryptionInfo>>,
86}
87
88/// Which kind of aggregation (related event) is this?
89#[derive(Clone, Debug)]
90pub(crate) enum AggregationKind {
91    /// This is a response to a poll.
92    PollResponse {
93        /// Sender of the poll's response.
94        sender: OwnedUserId,
95        /// Timestamp at which the response has beens ent.
96        timestamp: MilliSecondsSinceUnixEpoch,
97        /// All the answers to the poll sent by the sender.
98        answers: Vec<String>,
99    },
100
101    /// This is the marker of the end of a poll.
102    PollEnd {
103        /// Timestamp at which the poll ends, i.e. all the responses with a
104        /// timestamp prior to this one should be taken into account
105        /// (and all the responses with a timestamp after this one
106        /// should be dropped).
107        end_date: MilliSecondsSinceUnixEpoch,
108    },
109
110    /// This is a reaction to another event.
111    Reaction {
112        /// The reaction "key" displayed by the client, often an emoji.
113        key: String,
114        /// Sender of the reaction.
115        sender: OwnedUserId,
116        /// Timestamp at which the reaction has been sent.
117        timestamp: MilliSecondsSinceUnixEpoch,
118        /// The send status of the reaction this is, with handles to abort it if
119        /// we can, etc.
120        reaction_status: ReactionStatus,
121    },
122
123    /// An event has been redacted.
124    Redaction,
125
126    /// An event has been edited.
127    ///
128    /// Note that edits can't be applied in isolation; we need to identify what
129    /// the *latest* edit is, based on the event ordering. As such, they're
130    /// handled exceptionally in `Aggregation::apply` and
131    /// `Aggregation::unapply`, and the callers have the responsibility of
132    /// considering all the edits and applying only the right one.
133    Edit(PendingEdit),
134}
135
136/// An aggregation is an event related to another event (for instance a
137/// reaction, a poll's response, etc.).
138///
139/// It can be either a local or a remote echo.
140#[derive(Clone, Debug)]
141pub(crate) struct Aggregation {
142    /// The kind of aggregation this represents.
143    pub kind: AggregationKind,
144
145    /// The own timeline identifier for an aggregation.
146    ///
147    /// It will be a transaction id when the aggregation is still a local echo,
148    /// and it will transition into an event id when the aggregation is a
149    /// remote echo (i.e. has been received in a sync response):
150    pub own_id: TimelineEventItemId,
151}
152
153/// Get the poll state from a given [`TimelineItemContent`].
154fn poll_state_from_item<'a>(
155    event: &'a mut Cow<'_, EventTimelineItem>,
156) -> Result<&'a mut PollState, AggregationError> {
157    if event.content().is_poll() {
158        // It was a poll! Now return the state as mutable.
159        let state = as_variant!(
160            event.to_mut().content_mut(),
161            TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(s), ..}) => s
162        )
163        .expect("it was a poll just above");
164        Ok(state)
165    } else {
166        Err(AggregationError::InvalidType {
167            expected: "a poll".to_owned(),
168            actual: event.content().debug_string().to_owned(),
169        })
170    }
171}
172
173impl Aggregation {
174    /// Create a new [`Aggregation`].
175    pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
176        Self { kind, own_id }
177    }
178
179    /// Apply an aggregation in-place to a given [`TimelineItemContent`].
180    ///
181    /// In case of success, returns an enum indicating whether the applied
182    /// aggregation had an effect on the content; if it updated it, then the
183    /// caller has the responsibility to reflect that change.
184    ///
185    /// In case of error, returns an error detailing why the aggregation
186    /// couldn't be applied.
187    fn apply(
188        &self,
189        event: &mut Cow<'_, EventTimelineItem>,
190        room_version: &RoomVersionId,
191    ) -> ApplyAggregationResult {
192        match &self.kind {
193            AggregationKind::PollResponse { sender, timestamp, answers } => {
194                match poll_state_from_item(event) {
195                    Ok(state) => {
196                        state.add_response(sender.clone(), *timestamp, answers.clone());
197                        ApplyAggregationResult::UpdatedItem
198                    }
199                    Err(err) => ApplyAggregationResult::Error(err),
200                }
201            }
202
203            AggregationKind::Redaction => {
204                if event.content().is_redacted() {
205                    ApplyAggregationResult::LeftItemIntact
206                } else {
207                    let new_item = event.redact(room_version);
208                    *event = Cow::Owned(new_item);
209                    ApplyAggregationResult::UpdatedItem
210                }
211            }
212
213            AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
214                Ok(state) => {
215                    if !state.end(*end_date) {
216                        return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
217                    }
218                    ApplyAggregationResult::UpdatedItem
219                }
220                Err(err) => ApplyAggregationResult::Error(err),
221            },
222
223            AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
224                let Some(reactions) = event.content().reactions() else {
225                    // An item that can't hold any reactions.
226                    return ApplyAggregationResult::LeftItemIntact;
227                };
228
229                let previous_reaction = reactions.get(key).and_then(|by_user| by_user.get(sender));
230
231                // If the reaction was already added to the item, we don't need to add it back.
232                //
233                // Search for a previous reaction that would be equivalent.
234
235                let is_same = previous_reaction.is_some_and(|prev| {
236                    prev.timestamp == *timestamp
237                        && matches!(
238                            (&prev.status, reaction_status),
239                            (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
240                                | (
241                                    ReactionStatus::LocalToRemote(_),
242                                    ReactionStatus::LocalToRemote(_),
243                                )
244                                | (
245                                    ReactionStatus::RemoteToRemote(_),
246                                    ReactionStatus::RemoteToRemote(_),
247                                )
248                        )
249                });
250
251                if is_same {
252                    ApplyAggregationResult::LeftItemIntact
253                } else {
254                    let reactions = event
255                        .to_mut()
256                        .content_mut()
257                        .reactions_mut()
258                        .expect("reactions was Some above");
259
260                    reactions.entry(key.clone()).or_default().insert(
261                        sender.clone(),
262                        ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
263                    );
264
265                    ApplyAggregationResult::UpdatedItem
266                }
267            }
268
269            AggregationKind::Edit(_) => {
270                // Let the caller handle the edit.
271                ApplyAggregationResult::Edit
272            }
273        }
274    }
275
276    /// Undo an aggregation in-place to a given [`TimelineItemContent`].
277    ///
278    /// In case of success, returns an enum indicating whether unapplying the
279    /// aggregation had an effect on the content; if it updated it, then the
280    /// caller has the responsibility to reflect that change.
281    ///
282    /// In case of error, returns an error detailing why the aggregation
283    /// couldn't be unapplied.
284    fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
285        match &self.kind {
286            AggregationKind::PollResponse { sender, timestamp, .. } => {
287                let state = match poll_state_from_item(event) {
288                    Ok(state) => state,
289                    Err(err) => return ApplyAggregationResult::Error(err),
290                };
291                state.remove_response(sender, *timestamp);
292                ApplyAggregationResult::UpdatedItem
293            }
294
295            AggregationKind::PollEnd { .. } => {
296                // Assume we can't undo a poll end event at the moment.
297                ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
298            }
299
300            AggregationKind::Redaction => {
301                // Redactions are not reversible.
302                ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
303            }
304
305            AggregationKind::Reaction { key, sender, .. } => {
306                let Some(reactions) = event.content().reactions() else {
307                    // An item that can't hold any reactions.
308                    return ApplyAggregationResult::LeftItemIntact;
309                };
310
311                // We only need to remove the previous reaction if it was there.
312                //
313                // Search for it.
314
315                let had_entry =
316                    reactions.get(key).and_then(|by_user| by_user.get(sender)).is_some();
317
318                if had_entry {
319                    let reactions = event
320                        .to_mut()
321                        .content_mut()
322                        .reactions_mut()
323                        .expect("reactions was some above");
324                    let by_user = reactions.get_mut(key);
325                    if let Some(by_user) = by_user {
326                        by_user.swap_remove(sender);
327                        // If this was the last reaction, remove the entire map for this key.
328                        if by_user.is_empty() {
329                            reactions.swap_remove(key);
330                        }
331                    }
332                    ApplyAggregationResult::UpdatedItem
333                } else {
334                    ApplyAggregationResult::LeftItemIntact
335                }
336            }
337
338            AggregationKind::Edit(_) => {
339                // Let the caller handle the edit.
340                ApplyAggregationResult::Edit
341            }
342        }
343    }
344}
345
346/// Manager for all known existing aggregations to all events in the timeline.
347#[derive(Clone, Debug, Default)]
348pub(crate) struct Aggregations {
349    /// Mapping of a target event to its list of aggregations.
350    related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,
351
352    /// Mapping of a related event identifier to its target.
353    inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
354}
355
356impl Aggregations {
357    /// Clear all the known aggregations from all the mappings.
358    pub fn clear(&mut self) {
359        self.related_events.clear();
360        self.inverted_map.clear();
361    }
362
363    /// Add a given aggregation that relates to the [`TimelineItemContent`]
364    /// identified by the given [`TimelineEventItemId`].
365    pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
366        // If the aggregation is a redaction, it invalidates all the other aggregations;
367        // remove them.
368        if matches!(aggregation.kind, AggregationKind::Redaction) {
369            for agg in self.related_events.remove(&related_to).unwrap_or_default() {
370                self.inverted_map.remove(&agg.own_id);
371            }
372        }
373
374        // If there was any redaction among the current aggregation, adding a new one
375        // should be a noop.
376        if let Some(previous_aggregations) = self.related_events.get(&related_to) {
377            if previous_aggregations
378                .iter()
379                .any(|agg| matches!(agg.kind, AggregationKind::Redaction))
380            {
381                return;
382            }
383        }
384
385        self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
386        self.related_events.entry(related_to).or_default().push(aggregation);
387    }
388
389    /// Is the given id one for a known aggregation to another event?
390    ///
391    /// If so, unapplies it by replacing the corresponding related item, if
392    /// needs be.
393    ///
394    /// Returns true if an aggregation was found. This doesn't mean
395    /// the underlying item has been updated, if it was missing from the
396    /// timeline for instance.
397    ///
398    /// May return an error if it found an aggregation, but it couldn't be
399    /// properly applied.
400    pub fn try_remove_aggregation(
401        &mut self,
402        aggregation_id: &TimelineEventItemId,
403        items: &mut ObservableItemsTransaction<'_>,
404    ) -> Result<bool, AggregationError> {
405        let Some(found) = self.inverted_map.get(aggregation_id) else { return Ok(false) };
406
407        // Find and remove the aggregation in the other mapping.
408        let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
409            let removed = aggregations
410                .iter()
411                .position(|agg| agg.own_id == *aggregation_id)
412                .map(|idx| aggregations.remove(idx));
413
414            // If this was the last aggregation, remove the entry in the `related_events`
415            // mapping.
416            if aggregations.is_empty() {
417                self.related_events.remove(found);
418            }
419
420            removed
421        } else {
422            None
423        };
424
425        let Some(aggregation) = aggregation else {
426            warn!("incorrect internal state: {aggregation_id:?} was present in the inverted map, not in related-to map.");
427            return Ok(false);
428        };
429
430        if let Some((item_pos, item)) = rfind_event_by_item_id(items, found) {
431            let mut cowed = Cow::Borrowed(&*item);
432            match aggregation.unapply(&mut cowed) {
433                ApplyAggregationResult::UpdatedItem => {
434                    trace!("removed aggregation");
435                    items.replace(
436                        item_pos,
437                        TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
438                    );
439                }
440                ApplyAggregationResult::LeftItemIntact => {}
441                ApplyAggregationResult::Error(err) => {
442                    warn!("error when unapplying aggregation: {err}");
443                }
444                ApplyAggregationResult::Edit => {
445                    // This edit has been removed; try to find another that still applies.
446                    if let Some(aggregations) = self.related_events.get(found) {
447                        if resolve_edits(aggregations, items, &mut cowed) {
448                            items.replace(
449                                item_pos,
450                                TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
451                            );
452                        } else {
453                            // No other edit was found, leave the item as is.
454                            // TODO likely need to change the item to indicate
455                            // it's been un-edited etc.
456                        }
457                    } else {
458                        // No other edits apply.
459                    }
460                }
461            }
462        } else {
463            info!("missing related-to item ({found:?}) for aggregation {aggregation_id:?}");
464        }
465
466        Ok(true)
467    }
468
469    /// Apply all the aggregations to a [`TimelineItemContent`].
470    ///
471    /// Will return an error at the first aggregation that couldn't be applied;
472    /// see [`Aggregation::apply`] which explains under which conditions it can
473    /// happen.
474    ///
475    /// Returns a boolean indicating whether at least one aggregation was
476    /// applied.
477    pub fn apply_all(
478        &self,
479        item_id: &TimelineEventItemId,
480        event: &mut Cow<'_, EventTimelineItem>,
481        items: &mut ObservableItemsTransaction<'_>,
482        room_version: &RoomVersionId,
483    ) -> Result<(), AggregationError> {
484        let Some(aggregations) = self.related_events.get(item_id) else {
485            return Ok(());
486        };
487
488        let mut has_edits = false;
489
490        for a in aggregations {
491            match a.apply(event, room_version) {
492                ApplyAggregationResult::Edit => {
493                    has_edits = true;
494                }
495                ApplyAggregationResult::UpdatedItem | ApplyAggregationResult::LeftItemIntact => {}
496                ApplyAggregationResult::Error(err) => return Err(err),
497            }
498        }
499
500        if has_edits {
501            resolve_edits(aggregations, items, event);
502        }
503
504        Ok(())
505    }
506
507    /// Mark a target event as being sent (i.e. it transitions from an local
508    /// transaction id to its remote event id counterpart), by updating the
509    /// internal mappings.
510    pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
511        let from = TimelineEventItemId::TransactionId(txn_id);
512        let to = TimelineEventItemId::EventId(event_id);
513
514        // Update the aggregations in the `related_events` field.
515        if let Some(aggregations) = self.related_events.remove(&from) {
516            // Update the inverted mappings (from aggregation's id, to the new target id).
517            for a in &aggregations {
518                if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
519                    debug_assert_eq!(prev_target, from);
520                    self.inverted_map.insert(a.own_id.clone(), to.clone());
521                }
522            }
523            // Update the direct mapping of target -> aggregations.
524            self.related_events.entry(to).or_default().extend(aggregations);
525        }
526    }
527
528    /// Mark an aggregation event as being sent (i.e. it transitions from an
529    /// local transaction id to its remote event id counterpart), by
530    /// updating the internal mappings.
531    ///
532    /// When an aggregation has been marked as sent, it may need to be reapplied
533    /// to the corresponding [`TimelineItemContent`]; this is why we're also
534    /// passing the context to apply an aggregation here.
535    pub fn mark_aggregation_as_sent(
536        &mut self,
537        txn_id: OwnedTransactionId,
538        event_id: OwnedEventId,
539        items: &mut ObservableItemsTransaction<'_>,
540        room_version: &RoomVersionId,
541    ) -> bool {
542        let from = TimelineEventItemId::TransactionId(txn_id);
543        let to = TimelineEventItemId::EventId(event_id.clone());
544
545        let Some(target) = self.inverted_map.remove(&from) else {
546            return false;
547        };
548
549        if let Some(aggregations) = self.related_events.get_mut(&target) {
550            if let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from) {
551                found.own_id = to.clone();
552
553                match &mut found.kind {
554                    AggregationKind::PollResponse { .. }
555                    | AggregationKind::PollEnd { .. }
556                    | AggregationKind::Edit(..)
557                    | AggregationKind::Redaction => {
558                        // Nothing particular to do.
559                    }
560
561                    AggregationKind::Reaction { reaction_status, .. } => {
562                        // Mark the reaction as becoming remote, and signal that update to the
563                        // caller.
564                        *reaction_status = ReactionStatus::RemoteToRemote(event_id);
565
566                        let found = found.clone();
567                        find_item_and_apply_aggregation(self, items, &target, found, room_version);
568                    }
569                }
570            }
571        }
572
573        self.inverted_map.insert(to, target);
574        true
575    }
576}
577
578/// Look at all the edits of a given event, and apply the most recent one, if
579/// found.
580///
581/// Returns true if an edit was found and applied, false otherwise.
582fn resolve_edits(
583    aggregations: &[Aggregation],
584    items: &ObservableItemsTransaction<'_>,
585    event: &mut Cow<'_, EventTimelineItem>,
586) -> bool {
587    let mut best_edit: Option<PendingEdit> = None;
588    let mut best_edit_pos = None;
589
590    for a in aggregations {
591        if let AggregationKind::Edit(pending_edit) = &a.kind {
592            match &a.own_id {
593                TimelineEventItemId::TransactionId(_) => {
594                    // A local echo is always the most recent edit: use this one.
595                    best_edit = Some(pending_edit.clone());
596                    break;
597                }
598
599                TimelineEventItemId::EventId(event_id) => {
600                    if let Some(best_edit_pos) = &mut best_edit_pos {
601                        let pos = items.position_by_event_id(event_id);
602                        if let Some(pos) = pos {
603                            // If the edit is more recent (higher index) than the previous best
604                            // edit we knew about, use this one.
605                            if pos > *best_edit_pos {
606                                best_edit = Some(pending_edit.clone());
607                                *best_edit_pos = pos;
608                                trace!(?best_edit_pos, edit_id = ?a.own_id, "found better edit");
609                            }
610                        } else {
611                            trace!(edit_id = ?a.own_id, "couldn't find timeline meta for edit event");
612
613                            // The edit event isn't in the timeline, so it might be a bundled
614                            // edit. In this case, record it as the best edit if and only if
615                            // there wasn't any other.
616                            if best_edit.is_none() {
617                                best_edit = Some(pending_edit.clone());
618                                trace!(?best_edit_pos, edit_id = ?a.own_id, "found bundled edit");
619                            }
620                        }
621                    } else {
622                        // There wasn't any best edit yet, so record this one as being it, with
623                        // its position.
624                        best_edit = Some(pending_edit.clone());
625                        best_edit_pos = items.position_by_event_id(event_id);
626                        trace!(?best_edit_pos, edit_id = ?a.own_id, "first best edit");
627                    }
628                }
629            }
630        }
631    }
632
633    if let Some(edit) = best_edit {
634        edit_item(event, edit)
635    } else {
636        false
637    }
638}
639
640/// Apply the selected edit to the given EventTimelineItem.
641fn edit_item(item: &mut Cow<'_, EventTimelineItem>, edit: PendingEdit) -> bool {
642    let PendingEdit { kind: edit_kind, edit_json, encryption_info } = edit;
643
644    if let Some(event_json) = &edit_json {
645        let Some(edit_sender) = event_json.get_field::<OwnedUserId>("sender").ok().flatten() else {
646            info!("edit event didn't have a sender; likely a malformed event");
647            return false;
648        };
649
650        if edit_sender != item.sender() {
651            info!(
652                original_sender = %item.sender(),
653                %edit_sender,
654                "Edit event applies to another user's timeline item, discarding"
655            );
656            return false;
657        }
658    }
659
660    let TimelineItemContent::MsgLike(content) = item.content() else {
661        info!("Edit of message event applies to {:?}, discarding", item.content().debug_string());
662        return false;
663    };
664
665    match (edit_kind, content) {
666        (
667            PendingEditKind::RoomMessage(replacement),
668            MsgLikeContent { kind: MsgLikeKind::Message(msg), .. },
669        ) => {
670            // First combination: it's a message edit for a message. Good.
671            let mut new_msg = msg.clone();
672            new_msg.apply_edit(replacement.new_content);
673
674            let new_item = item.with_content_and_latest_edit(
675                TimelineItemContent::MsgLike(content.with_kind(MsgLikeKind::Message(new_msg))),
676                edit_json,
677            );
678            *item = Cow::Owned(new_item);
679        }
680
681        (
682            PendingEditKind::Poll(replacement),
683            MsgLikeContent { kind: MsgLikeKind::Poll(poll_state), .. },
684        ) => {
685            // Second combination: it's a poll edit for a poll. Good.
686            if let Some(new_poll_state) = poll_state.edit(replacement.new_content) {
687                let new_item = item.with_content_and_latest_edit(
688                    TimelineItemContent::MsgLike(
689                        content.with_kind(MsgLikeKind::Poll(new_poll_state)),
690                    ),
691                    edit_json,
692                );
693                *item = Cow::Owned(new_item);
694            } else {
695                // The poll has ended, so we can't edit it anymore.
696                return false;
697            }
698        }
699
700        (edit_kind, _) => {
701            // Invalid combination.
702            info!(
703                content = item.content().debug_string(),
704                edit = format!("{:?}", edit_kind),
705                "Mismatch between edit type and content type",
706            );
707            return false;
708        }
709    }
710
711    if let Some(encryption_info) = encryption_info {
712        *item = Cow::Owned(item.with_encryption_info(Some(encryption_info)));
713    }
714
715    true
716}
717
718/// Find an item identified by the target identifier, and apply the aggregation
719/// onto it.
720///
721/// Returns the updated [`EventTimelineItem`] if the aggregation was applied, or
722/// `None` otherwise.
723pub(crate) fn find_item_and_apply_aggregation(
724    aggregations: &Aggregations,
725    items: &mut ObservableItemsTransaction<'_>,
726    target: &TimelineEventItemId,
727    aggregation: Aggregation,
728    room_version: &RoomVersionId,
729) -> Option<EventTimelineItem> {
730    let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
731        trace!("couldn't find aggregation's target {target:?}");
732        return None;
733    };
734
735    let mut cowed = Cow::Borrowed(&*event_item);
736    match aggregation.apply(&mut cowed, room_version) {
737        ApplyAggregationResult::UpdatedItem => {
738            trace!("applied aggregation");
739            let new_event_item = cowed.into_owned();
740            let new_item =
741                TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
742            items.replace(idx, new_item);
743            Some(new_event_item)
744        }
745        ApplyAggregationResult::Edit => {
746            if let Some(aggregations) = aggregations.related_events.get(target) {
747                if resolve_edits(aggregations, items, &mut cowed) {
748                    let new_event_item = cowed.into_owned();
749                    let new_item = TimelineItem::new(
750                        new_event_item.clone(),
751                        event_item.internal_id.to_owned(),
752                    );
753                    items.replace(idx, new_item);
754                    return Some(new_event_item);
755                }
756            }
757            None
758        }
759        ApplyAggregationResult::LeftItemIntact => {
760            trace!("applying the aggregation had no effect");
761            None
762        }
763        ApplyAggregationResult::Error(err) => {
764            warn!("error when applying aggregation: {err}");
765            None
766        }
767    }
768}
769
770/// The result of applying (or unapplying) an aggregation onto a timeline item.
771enum ApplyAggregationResult {
772    /// The passed `Cow<EventTimelineItem>` has been cloned and updated.
773    UpdatedItem,
774
775    /// An edit must be included in the edit set and resolved later, using the
776    /// relative position of the edits.
777    Edit,
778
779    /// The item hasn't been modified after applying the aggregation, because it
780    /// was likely already applied prior to this.
781    LeftItemIntact,
782
783    /// An error happened while applying the aggregation.
784    Error(AggregationError),
785}
786
787#[derive(Debug, thiserror::Error)]
788pub(crate) enum AggregationError {
789    #[error("trying to end a poll twice")]
790    PollAlreadyEnded,
791
792    #[error("a poll end can't be unapplied")]
793    CantUndoPollEnd,
794
795    #[error("a redaction can't be unapplied")]
796    CantUndoRedaction,
797
798    #[error("trying to apply an aggregation of one type to an invalid target: expected {expected}, actual {actual}")]
799    InvalidType { expected: String, actual: String },
800}