matrix_sdk_ui/timeline/controller/aggregations.rs
1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An aggregation manager for the timeline.
16//!
17//! An aggregation is an event that relates to another event: for instance, a
18//! reaction, a poll response, and so on and so forth.
19//!
20//! Because of the sync mechanisms and federation, it can happen that a related
21//! event is received *before* receiving the event it relates to. Those events
22//! must be accounted for, stashed somewhere, and reapplied later, if/when the
23//! related-to event shows up.
24//!
25//! In addition to that, a room's event cache can also decide to move events
26//! around, in its own internal representation (likely because it ran into some
27//! duplicate events). When that happens, a timeline opened on the given room
28//! will see a removal then re-insertion of the given event. If that event was
29//! the target of aggregations, then those aggregations must be re-applied when
30//! the given event is reinserted.
31//!
32//! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33//! by this module will take care of memoizing aggregations, for the entire
34//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35//! caller). Aggregations are saved in memory, and have the same lifetime as
36//! that of a timeline. This makes it possible to apply pending aggregations
37//! to cater for the first use case, and to never lose any aggregations in the
38//! second use case.
39
40use std::{borrow::Cow, collections::HashMap, sync::Arc};
41
42use matrix_sdk::{check_validity_of_replacement_events, deserialized_responses::EncryptionInfo};
43use ruma::{
44 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
45 events::{
46 AnySyncTimelineEvent, beacon_info::BeaconInfoEventContent,
47 poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
48 relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
49 },
50 room_version_rules::RoomVersionRules,
51 serde::Raw,
52};
53use tracing::{error, info, trace, warn};
54
55use super::{ObservableItemsTransaction, rfind_event_by_item_id};
56use crate::timeline::{
57 BeaconInfo, EventTimelineItem, LiveLocationState, MsgLikeContent, MsgLikeKind, PollState,
58 ReactionInfo, ReactionStatus, TimelineEventItemId, TimelineItem, TimelineItemContent,
59 event_item::beacon_info_matches,
60};
61
62#[derive(Clone)]
63pub(in crate::timeline) enum PendingEditKind {
64 RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
65 Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
66}
67
68impl std::fmt::Debug for PendingEditKind {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 Self::RoomMessage(_) => f.debug_struct("RoomMessage").finish_non_exhaustive(),
72 Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
73 }
74 }
75}
76
77#[derive(Clone, Debug)]
78pub(in crate::timeline) struct PendingEdit {
79 /// The kind of edit this is.
80 pub kind: PendingEditKind,
81
82 /// The raw JSON for the edit.
83 pub edit_json: Option<Raw<AnySyncTimelineEvent>>,
84
85 /// The encryption info for this edit.
86 pub encryption_info: Option<Arc<EncryptionInfo>>,
87
88 /// If provided, this is the identifier of a remote event item that included
89 /// this bundled edit.
90 pub bundled_item_owner: Option<OwnedEventId>,
91}
92
93/// Which kind of aggregation (related event) is this?
94#[derive(Clone, Debug)]
95pub(crate) enum AggregationKind {
96 /// This is a response to a poll.
97 PollResponse {
98 /// Sender of the poll's response.
99 sender: OwnedUserId,
100 /// Timestamp at which the response has beens ent.
101 timestamp: MilliSecondsSinceUnixEpoch,
102 /// All the answers to the poll sent by the sender.
103 answers: Vec<String>,
104 },
105
106 /// This is the marker of the end of a poll.
107 PollEnd {
108 /// Timestamp at which the poll ends, i.e. all the responses with a
109 /// timestamp prior to this one should be taken into account
110 /// (and all the responses with a timestamp after this one
111 /// should be dropped).
112 end_date: MilliSecondsSinceUnixEpoch,
113 },
114
115 /// This is a reaction to another event.
116 Reaction {
117 /// The reaction "key" displayed by the client, often an emoji.
118 key: String,
119 /// Sender of the reaction.
120 sender: OwnedUserId,
121 /// Timestamp at which the reaction has been sent.
122 timestamp: MilliSecondsSinceUnixEpoch,
123 /// The send status of the reaction this is, with handles to abort it if
124 /// we can, etc.
125 reaction_status: ReactionStatus,
126 },
127
128 /// An event has been redacted.
129 Redaction {
130 /// Whether this aggregation results from the local echo of a redaction.
131 /// Local echoes of redactions are applied reversibly whereas remote
132 /// echoes of redactions are applied irreversibly.
133 is_local: bool,
134 },
135
136 /// An event has been edited.
137 ///
138 /// Note that edits can't be applied in isolation; we need to identify what
139 /// the *latest* edit is, based on the event ordering. As such, they're
140 /// handled exceptionally in `Aggregation::apply` and
141 /// `Aggregation::unapply`, and the callers have the responsibility of
142 /// considering all the edits and applying only the right one.
143 Edit(PendingEdit),
144
145 /// A location update for a live location sharing session (MSC3489).
146 BeaconUpdate { location: BeaconInfo },
147
148 /// A stop event for a live location sharing session (MSC3489).
149 ///
150 /// Carries the new (non-live) [`BeaconInfoEventContent`] that should
151 /// replace the stored content on the target item, flipping
152 /// [`LiveLocationState::is_live`] to `false`.
153 ///
154 /// Unlike [`BeaconUpdate`], a beacon stop is not reversible.
155 BeaconStop { content: BeaconInfoEventContent },
156
157 /// An m.rtc.decline event for an m.rtc.notification event
158 CallDeclined {
159 /// Sender of the decline.
160 sender: OwnedUserId,
161 },
162}
163
164/// An aggregation is an event related to another event (for instance a
165/// reaction, a poll's response, etc.).
166///
167/// It can be either a local or a remote echo.
168#[derive(Clone, Debug)]
169pub(crate) struct Aggregation {
170 /// The kind of aggregation this represents.
171 pub kind: AggregationKind,
172
173 /// The own timeline identifier for an aggregation.
174 ///
175 /// It will be a transaction id when the aggregation is still a local echo,
176 /// and it will transition into an event id when the aggregation is a
177 /// remote echo (i.e. has been received in a sync response):
178 pub own_id: TimelineEventItemId,
179}
180
181/// Get the poll state from a given [`TimelineItemContent`].
182fn poll_state_from_item<'a>(
183 event: &'a mut Cow<'_, EventTimelineItem>,
184) -> Result<&'a mut PollState, AggregationError> {
185 let content = event.to_mut().content_mut();
186
187 if let TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(state), .. }) =
188 content
189 {
190 Ok(state)
191 } else {
192 Err(AggregationError::InvalidType {
193 expected: "a poll".to_owned(),
194 actual: content.debug_string().to_owned(),
195 })
196 }
197}
198
199/// Get the [`LiveLocationState`] from a given [`TimelineItemContent`], mutably.
200fn live_location_state_from_item<'a>(
201 event: &'a mut Cow<'_, EventTimelineItem>,
202) -> Result<&'a mut LiveLocationState, AggregationError> {
203 let content = event.to_mut().content_mut();
204
205 if let TimelineItemContent::MsgLike(MsgLikeContent {
206 kind: MsgLikeKind::LiveLocation(state),
207 ..
208 }) = content
209 {
210 Ok(state)
211 } else {
212 Err(AggregationError::InvalidType {
213 expected: "a live location".to_owned(),
214 actual: content.debug_string().to_owned(),
215 })
216 }
217}
218
219/// Gets the mutable list of users that did decline this notification event.
220fn rtc_notification_declinations_from_item<'a>(
221 event: &'a mut Cow<'_, EventTimelineItem>,
222) -> Result<&'a mut Vec<OwnedUserId>, AggregationError> {
223 let content = event.to_mut().content_mut();
224
225 if let TimelineItemContent::RtcNotification { declined_by, .. } = content {
226 Ok(declined_by)
227 } else {
228 Err(AggregationError::InvalidType {
229 expected: "an rtc notification".to_owned(),
230 actual: content.debug_string().to_owned(),
231 })
232 }
233}
234
235impl Aggregation {
236 /// Create a new [`Aggregation`].
237 pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
238 Self { kind, own_id }
239 }
240
241 /// Apply an aggregation in-place to a given [`TimelineItemContent`].
242 ///
243 /// In case of success, returns an enum indicating whether the applied
244 /// aggregation had an effect on the content; if it updated it, then the
245 /// caller has the responsibility to reflect that change.
246 ///
247 /// In case of error, returns an error detailing why the aggregation
248 /// couldn't be applied.
249 fn apply(
250 &self,
251 event: &mut Cow<'_, EventTimelineItem>,
252 rules: &RoomVersionRules,
253 ) -> ApplyAggregationResult {
254 match &self.kind {
255 AggregationKind::PollResponse { sender, timestamp, answers } => {
256 match poll_state_from_item(event) {
257 Ok(state) => {
258 state.add_response(sender.clone(), *timestamp, answers.clone());
259 ApplyAggregationResult::UpdatedItem
260 }
261 Err(err) => ApplyAggregationResult::Error(err),
262 }
263 }
264
265 AggregationKind::Redaction { is_local } => {
266 let is_local_redacted =
267 event.content().is_redacted() && event.unredacted_item.is_some();
268 let is_remote_redacted =
269 event.content().is_redacted() && event.unredacted_item.is_none();
270 if *is_local && is_local_redacted || !*is_local && is_remote_redacted {
271 ApplyAggregationResult::LeftItemIntact
272 } else {
273 let new_item = event.redact(&rules.redaction, *is_local);
274 *event = Cow::Owned(new_item);
275 ApplyAggregationResult::UpdatedItem
276 }
277 }
278
279 AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
280 Ok(state) => {
281 if !state.end(*end_date) {
282 return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
283 }
284 ApplyAggregationResult::UpdatedItem
285 }
286 Err(err) => ApplyAggregationResult::Error(err),
287 },
288
289 AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
290 let Some(reactions) = event.content().reactions() else {
291 // An item that can't hold any reactions.
292 return ApplyAggregationResult::LeftItemIntact;
293 };
294
295 let previous_reaction = reactions.get(key).and_then(|by_user| by_user.get(sender));
296
297 // If the reaction was already added to the item, we don't need to add it back.
298 //
299 // Search for a previous reaction that would be equivalent.
300
301 let is_same = previous_reaction.is_some_and(|prev| {
302 prev.timestamp == *timestamp
303 && matches!(
304 (&prev.status, reaction_status),
305 (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
306 | (
307 ReactionStatus::LocalToRemote(_),
308 ReactionStatus::LocalToRemote(_),
309 )
310 | (
311 ReactionStatus::RemoteToRemote(_),
312 ReactionStatus::RemoteToRemote(_),
313 )
314 )
315 });
316
317 if is_same {
318 ApplyAggregationResult::LeftItemIntact
319 } else {
320 let reactions = event
321 .to_mut()
322 .content_mut()
323 .reactions_mut()
324 .expect("reactions was Some above");
325
326 reactions.entry(key.clone()).or_default().insert(
327 sender.clone(),
328 ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
329 );
330
331 ApplyAggregationResult::UpdatedItem
332 }
333 }
334
335 AggregationKind::Edit(_) => {
336 // Let the caller handle the edit.
337 ApplyAggregationResult::Edit
338 }
339
340 AggregationKind::BeaconUpdate { location } => {
341 match live_location_state_from_item(event) {
342 Ok(state) => {
343 state.add_location(location.clone());
344 ApplyAggregationResult::UpdatedItem
345 }
346 Err(err) => ApplyAggregationResult::Error(err),
347 }
348 }
349
350 AggregationKind::BeaconStop { content } => match live_location_state_from_item(event) {
351 Ok(state) => {
352 state.stop(content.clone());
353 ApplyAggregationResult::UpdatedItem
354 }
355 Err(err) => ApplyAggregationResult::Error(err),
356 },
357
358 AggregationKind::CallDeclined { sender } => {
359 match rtc_notification_declinations_from_item(event) {
360 Ok(declinations) => {
361 if declinations.contains(sender) {
362 ApplyAggregationResult::LeftItemIntact
363 } else {
364 declinations.push(sender.clone());
365 ApplyAggregationResult::UpdatedItem
366 }
367 }
368 Err(err) => ApplyAggregationResult::Error(err),
369 }
370 }
371 }
372 }
373
374 /// Undo an aggregation in-place to a given [`TimelineItemContent`].
375 ///
376 /// In case of success, returns an enum indicating whether unapplying the
377 /// aggregation had an effect on the content; if it updated it, then the
378 /// caller has the responsibility to reflect that change.
379 ///
380 /// In case of error, returns an error detailing why the aggregation
381 /// couldn't be unapplied.
382 fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
383 match &self.kind {
384 AggregationKind::PollResponse { sender, timestamp, .. } => {
385 let state = match poll_state_from_item(event) {
386 Ok(state) => state,
387 Err(err) => return ApplyAggregationResult::Error(err),
388 };
389 state.remove_response(sender, *timestamp);
390 ApplyAggregationResult::UpdatedItem
391 }
392
393 AggregationKind::PollEnd { .. } => {
394 // Assume we can't undo a poll end event at the moment.
395 ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
396 }
397
398 AggregationKind::Redaction { is_local } => {
399 if *is_local {
400 if event.unredacted_item.is_some() {
401 // Unapply local redaction.
402 *event = Cow::Owned(event.unredact());
403 ApplyAggregationResult::UpdatedItem
404 } else {
405 // Event isn't locally redacted. Nothing to do.
406 ApplyAggregationResult::LeftItemIntact
407 }
408 } else {
409 // Remote redactions are not reversible.
410 ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
411 }
412 }
413
414 AggregationKind::Reaction { key, sender, .. } => {
415 let Some(reactions) = event.content().reactions() else {
416 // An item that can't hold any reactions.
417 return ApplyAggregationResult::LeftItemIntact;
418 };
419
420 // We only need to remove the previous reaction if it was there.
421 //
422 // Search for it.
423
424 let had_entry =
425 reactions.get(key).and_then(|by_user| by_user.get(sender)).is_some();
426
427 if had_entry {
428 let reactions = event
429 .to_mut()
430 .content_mut()
431 .reactions_mut()
432 .expect("reactions was some above");
433 let by_user = reactions.get_mut(key);
434 if let Some(by_user) = by_user {
435 by_user.swap_remove(sender);
436 // If this was the last reaction, remove the entire map for this key.
437 if by_user.is_empty() {
438 reactions.swap_remove(key);
439 }
440 }
441 ApplyAggregationResult::UpdatedItem
442 } else {
443 ApplyAggregationResult::LeftItemIntact
444 }
445 }
446
447 AggregationKind::Edit(_) => {
448 // Let the caller handle the edit.
449 ApplyAggregationResult::Edit
450 }
451
452 AggregationKind::BeaconUpdate { location } => {
453 match live_location_state_from_item(event) {
454 Ok(state) => {
455 state.remove_location(location.ts);
456 ApplyAggregationResult::UpdatedItem
457 }
458 Err(err) => ApplyAggregationResult::Error(err),
459 }
460 }
461
462 AggregationKind::BeaconStop { .. } => {
463 // Stopping a live location share is not reversible.
464 ApplyAggregationResult::Error(AggregationError::CantUndoBeaconStop)
465 }
466
467 AggregationKind::CallDeclined { .. } => {
468 // One cannot un-decline a call
469 ApplyAggregationResult::Error(AggregationError::CantUndoRtcDecline)
470 }
471 }
472 }
473}
474
475/// Manager for all known existing aggregations to all events in the timeline.
476#[derive(Clone, Debug, Default)]
477pub(crate) struct Aggregations {
478 /// Mapping of a target event to its list of aggregations.
479 related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,
480
481 /// Mapping of a related event identifier to its target.
482 inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
483
484 /// A pending beacon-stop aggregation received before the corresponding live
485 /// `beacon_info` start item has arrived.
486 ///
487 /// Keyed by the sender's user ID. When a live start item is eventually
488 /// inserted via `add_item`, we check if the pending stop matches and
489 /// promote it into [`Self::related_events`] so that [`Self::apply_all`]
490 /// can apply it immediately.
491 pending_beacon_stops: HashMap<OwnedUserId, Aggregation>,
492}
493
494impl Aggregations {
495 /// Clear all the known aggregations from all the mappings.
496 pub fn clear(&mut self) {
497 self.related_events.clear();
498 self.inverted_map.clear();
499 self.pending_beacon_stops.clear();
500 }
501
502 /// Stash a [`AggregationKind::BeaconStop`] that arrived before its target
503 /// live `beacon_info` item. It will be promoted into
504 /// [`Self::related_events`] (and thus picked up by [`Self::apply_all`])
505 /// when the live item is inserted via
506 /// [`Self::promote_pending_beacon_stop`].
507 pub fn add_pending_beacon_stop(&mut self, sender: OwnedUserId, aggregation: Aggregation) {
508 self.pending_beacon_stops.insert(sender, aggregation);
509 }
510
511 /// Promote a matching stashed beacon-stop aggregation for `sender` into the
512 /// regular aggregation map, now that the live start item's
513 /// `target_event_id` is known.
514 ///
515 /// The pending stop's content must match the start event's content (except
516 /// for the `live` field) for promotion to occur. If they don't match, the
517 /// pending stop is discarded because it belongs to a different session.
518 ///
519 /// Should be called from `add_item` just before `apply_all`, when inserting
520 /// a live `beacon_info` item.
521 fn promote_pending_beacon_stop(
522 &mut self,
523 sender: &OwnedUserId,
524 target_event_id: OwnedEventId,
525 start_content: &BeaconInfoEventContent,
526 ) {
527 if !start_content.live {
528 return;
529 }
530
531 let Some(stop) = self.pending_beacon_stops.remove(sender) else { return };
532
533 let AggregationKind::BeaconStop { content: stop_content } = &stop.kind else {
534 warn!("pending beacon stop has unexpected aggregation kind");
535 return;
536 };
537
538 if !beacon_info_matches(start_content, stop_content) {
539 trace!("discarding stale pending beacon stop (content mismatch)");
540 return;
541 }
542
543 let target = TimelineEventItemId::EventId(target_event_id);
544 self.add(target, stop);
545 }
546
547 /// Add a given aggregation that relates to the [`TimelineItemContent`]
548 /// identified by the given [`TimelineEventItemId`].
549 pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
550 // If the aggregation is a redaction, it invalidates all the other aggregations;
551 // remove them.
552 if matches!(aggregation.kind, AggregationKind::Redaction { .. }) {
553 for agg in self.related_events.remove(&related_to).unwrap_or_default() {
554 self.inverted_map.remove(&agg.own_id);
555 }
556 }
557
558 // If there was any redaction among the current aggregation, adding a new one
559 // should be a noop.
560 if let Some(previous_aggregations) = self.related_events.get(&related_to)
561 && previous_aggregations
562 .iter()
563 .any(|agg| matches!(agg.kind, AggregationKind::Redaction { .. }))
564 {
565 return;
566 }
567
568 self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
569
570 // We can have 3 different states for the same aggregation in related_events, in
571 // chronological order:
572 //
573 // 1. The local echo with a transaction ID.
574 // 2. The local echo with the event ID returned by the server after sending the
575 // event.
576 // 3. The remote echo received via sync.
577 //
578 // The transition from states 1 to 2 is handled in `mark_aggregation_as_sent()`.
579 // So here we need to handle the transition from states 2 to 3. We need to
580 // replace the local echo by the remote echo, which might have more data, like
581 // the raw JSON.
582 let related_events = self.related_events.entry(related_to).or_default();
583 if let Some(pos) = related_events.iter().position(|agg| agg.own_id == aggregation.own_id) {
584 related_events.remove(pos);
585 }
586 related_events.push(aggregation);
587 }
588
589 /// Is the given id one for a known aggregation to another event?
590 ///
591 /// If so, unapplies it by replacing the corresponding related item, if
592 /// needs be.
593 ///
594 /// Returns true if an aggregation was found. This doesn't mean
595 /// the underlying item has been updated, if it was missing from the
596 /// timeline for instance.
597 ///
598 /// May return an error if it found an aggregation, but it couldn't be
599 /// properly applied.
600 pub fn try_remove_aggregation(
601 &mut self,
602 aggregation_id: &TimelineEventItemId,
603 items: &mut ObservableItemsTransaction<'_>,
604 ) -> Result<bool, AggregationError> {
605 let Some(found) = self.inverted_map.get(aggregation_id) else { return Ok(false) };
606
607 // Find and remove the aggregation in the other mapping.
608 let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
609 let removed = aggregations
610 .iter()
611 .position(|agg| agg.own_id == *aggregation_id)
612 .map(|idx| aggregations.remove(idx));
613
614 // If this was the last aggregation, remove the entry in the `related_events`
615 // mapping.
616 if aggregations.is_empty() {
617 self.related_events.remove(found);
618 }
619
620 removed
621 } else {
622 None
623 };
624
625 let Some(aggregation) = aggregation else {
626 warn!(
627 "incorrect internal state: {aggregation_id:?} was present in the inverted map, \
628 not in related-to map."
629 );
630 return Ok(false);
631 };
632
633 if let Some((item_pos, item)) = rfind_event_by_item_id(items, found) {
634 let mut cowed = Cow::Borrowed(&*item);
635 match aggregation.unapply(&mut cowed) {
636 ApplyAggregationResult::UpdatedItem => {
637 trace!("removed aggregation");
638 items.replace(
639 item_pos,
640 TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
641 );
642 }
643 ApplyAggregationResult::LeftItemIntact => {}
644 ApplyAggregationResult::Error(err) => {
645 warn!("error when unapplying aggregation: {err}");
646 }
647 ApplyAggregationResult::Edit => {
648 // This edit has been removed; try to find another that still applies.
649 if let Some(aggregations) = self.related_events.get(found) {
650 if resolve_edits(aggregations, items, &mut cowed) {
651 items.replace(
652 item_pos,
653 TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
654 );
655 } else {
656 // No other edit was found, leave the item as is.
657 // TODO likely need to change the item to indicate
658 // it's been un-edited etc.
659 }
660 } else {
661 // No other edits apply.
662 }
663 }
664 }
665 } else {
666 info!("missing related-to item ({found:?}) for aggregation {aggregation_id:?}");
667 }
668
669 Ok(true)
670 }
671
672 /// Apply all the aggregations to a [`TimelineItemContent`].
673 ///
674 /// If `sender` is provided alongside a remote `item_id`, any
675 /// [`AggregationKind::BeaconStop`] events that arrived out-of-order (i.e.
676 /// before the live `beacon_info` start item) are first promoted from the
677 /// pending-stops stash into the regular aggregation map so they are picked
678 /// up here together with every other pending aggregation for this item.
679 ///
680 /// Will return an error at the first aggregation that couldn't be applied;
681 /// see [`Aggregation::apply`] which explains under which conditions it can
682 /// happen.
683 pub fn apply_all(
684 &mut self,
685 item_id: &TimelineEventItemId,
686 sender: &OwnedUserId,
687 event: &mut Cow<'_, EventTimelineItem>,
688 items: &mut ObservableItemsTransaction<'_>,
689 rules: &RoomVersionRules,
690 ) -> Result<(), AggregationError> {
691 // If a beacon-stop arrived before this live start item, it was stashed
692 // in `pending_beacon_stops` keyed by sender. Promote it into
693 // `related_events` under the now-known start event ID so the loop below
694 // applies it together with any other pending aggregations.
695 //
696 // The promotion verifies that the pending stop's content matches the
697 // start event's content to ensure we don't apply an old stop to a new
698 // session.
699 if let TimelineEventItemId::EventId(event_id) = item_id
700 && let Some(live_location) = event.content().as_live_location_state()
701 {
702 self.promote_pending_beacon_stop(sender, event_id.clone(), &live_location.beacon_info);
703 }
704
705 let Some(aggregations) = self.related_events.get(item_id) else {
706 return Ok(());
707 };
708
709 let mut has_edits = false;
710
711 for a in aggregations {
712 match a.apply(event, rules) {
713 ApplyAggregationResult::Edit => {
714 has_edits = true;
715 }
716 ApplyAggregationResult::UpdatedItem | ApplyAggregationResult::LeftItemIntact => {}
717 ApplyAggregationResult::Error(err) => return Err(err),
718 }
719 }
720
721 if has_edits {
722 resolve_edits(aggregations, items, event);
723 }
724
725 Ok(())
726 }
727
728 /// Mark a target event as being sent (i.e. it transitions from an local
729 /// transaction id to its remote event id counterpart), by updating the
730 /// internal mappings.
731 pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
732 let from = TimelineEventItemId::TransactionId(txn_id);
733 let to = TimelineEventItemId::EventId(event_id);
734
735 // Update the aggregations in the `related_events` field.
736 if let Some(aggregations) = self.related_events.remove(&from) {
737 // Update the inverted mappings (from aggregation's id, to the new target id).
738 for a in &aggregations {
739 if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
740 debug_assert_eq!(prev_target, from);
741 self.inverted_map.insert(a.own_id.clone(), to.clone());
742 }
743 }
744 // Update the direct mapping of target -> aggregations.
745 self.related_events.entry(to).or_default().extend(aggregations);
746 }
747 }
748
749 /// Mark an aggregation event as being sent (i.e. it transitions from an
750 /// local transaction id to its remote event id counterpart), by
751 /// updating the internal mappings.
752 ///
753 /// When an aggregation has been marked as sent, it may need to be reapplied
754 /// to the corresponding [`TimelineItemContent`]; this is why we're also
755 /// passing the context to apply an aggregation here.
756 pub fn mark_aggregation_as_sent(
757 &mut self,
758 txn_id: OwnedTransactionId,
759 event_id: OwnedEventId,
760 items: &mut ObservableItemsTransaction<'_>,
761 rules: &RoomVersionRules,
762 ) -> bool {
763 let from = TimelineEventItemId::TransactionId(txn_id);
764 let to = TimelineEventItemId::EventId(event_id.clone());
765
766 let Some(target) = self.inverted_map.remove(&from) else {
767 return false;
768 };
769
770 if let Some(aggregations) = self.related_events.get_mut(&target)
771 && let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from)
772 {
773 found.own_id = to.clone();
774
775 match &mut found.kind {
776 AggregationKind::PollResponse { .. }
777 | AggregationKind::PollEnd { .. }
778 | AggregationKind::Edit(..)
779 | AggregationKind::BeaconUpdate { .. }
780 | AggregationKind::BeaconStop { .. }
781 | AggregationKind::CallDeclined { .. } => {
782 // Nothing particular to do.
783 }
784
785 AggregationKind::Redaction { is_local } => {
786 // Mark the redaction as being remote and apply it (irreversibly).
787 *is_local = false;
788
789 let found = found.clone();
790 find_item_and_apply_aggregation(self, items, &target, found, rules);
791 }
792
793 AggregationKind::Reaction { reaction_status, .. } => {
794 // Mark the reaction as becoming remote, and signal that update to the
795 // caller.
796 *reaction_status = ReactionStatus::RemoteToRemote(event_id);
797
798 let found = found.clone();
799 find_item_and_apply_aggregation(self, items, &target, found, rules);
800 }
801 }
802 }
803
804 self.inverted_map.insert(to, target);
805 true
806 }
807
808 /// Returns the id of the event this aggregation relates to, if it's a known
809 /// aggregation.
810 pub fn is_aggregation_of(&self, item: &TimelineEventItemId) -> Option<&TimelineEventItemId> {
811 self.inverted_map.get(item)
812 }
813}
814
815/// Look at all the edits of a given event, and apply the most recent one, if
816/// found.
817///
818/// Returns true if an edit was found and applied, false otherwise.
819fn resolve_edits(
820 aggregations: &[Aggregation],
821 items: &ObservableItemsTransaction<'_>,
822 event: &mut Cow<'_, EventTimelineItem>,
823) -> bool {
824 // A tuple of the best edit, if we have found one and a boolean indicating if
825 // the edit is coming from a local echo. If it's from a local echo, we can't
826 // validate it as we don't have a raw JSON, but this isn't that important as
827 // we're sure we won't send ourselves invalid edits.
828 let mut best_edit: Option<(PendingEdit, bool)> = None;
829 let mut best_edit_pos = None;
830
831 for a in aggregations {
832 if let AggregationKind::Edit(pending_edit) = &a.kind {
833 match &a.own_id {
834 TimelineEventItemId::TransactionId(_) => {
835 // A local echo is always the most recent edit: use this one.
836 best_edit = Some((pending_edit.clone(), true));
837 break;
838 }
839
840 TimelineEventItemId::EventId(event_id) => {
841 if let Some(best_edit_pos) = &mut best_edit_pos {
842 // Find the position of the timeline owning the edit: either the bundled
843 // item owner if this was a bundled edit, or the edit event itself.
844 let pos = items.position_by_event_id(
845 pending_edit.bundled_item_owner.as_ref().unwrap_or(event_id),
846 );
847
848 if let Some(pos) = pos {
849 // If the edit is more recent (higher index) than the previous best
850 // edit we knew about, use this one.
851 if pos > *best_edit_pos {
852 best_edit = Some((pending_edit.clone(), false));
853 *best_edit_pos = pos;
854 trace!(?best_edit_pos, edit_id = ?a.own_id, "found better edit");
855 }
856 } else {
857 trace!(edit_id = ?a.own_id, "couldn't find timeline meta for edit event");
858
859 // The edit event isn't in the timeline, so it might be a bundled
860 // edit. In this case, record it as the best edit if and only if
861 // there wasn't any other.
862 if best_edit.is_none() {
863 best_edit = Some((pending_edit.clone(), false));
864 trace!(?best_edit_pos, edit_id = ?a.own_id, "found bundled edit");
865 }
866 }
867 } else {
868 // There wasn't any best edit yet, so record this one as being it, with
869 // its position.
870 best_edit = Some((pending_edit.clone(), false));
871 best_edit_pos = items.position_by_event_id(event_id);
872 trace!(?best_edit_pos, edit_id = ?a.own_id, "first best edit");
873 }
874 }
875 }
876 }
877 }
878
879 if let Some((edit, is_local_echo)) = best_edit {
880 edit_item(event, edit, is_local_echo)
881 } else {
882 false
883 }
884}
885
886/// Apply the selected edit to the given EventTimelineItem.
887///
888/// Returns true if the edit was applied, false otherwise (because the edit and
889/// original timeline item types didn't match, for instance).
890fn edit_item(
891 item: &mut Cow<'_, EventTimelineItem>,
892 edit: PendingEdit,
893 is_local_echo: bool,
894) -> bool {
895 // We can receive edits from a local echo, i.e. the edit wasn't yet received
896 // from the homeserver.
897 //
898 // Before we send an edit we check that the event is allowed to be edited and
899 // that the replacement content is allowed.
900 //
901 // We don't have yet a full JSON of the event, so we can't do the validation
902 // here.
903 if !is_local_echo {
904 let Some(original_json) = item.original_json() else {
905 error!("The original event does not have the JSON field set.");
906 return false;
907 };
908
909 let Some(edit_json) = &edit.edit_json else {
910 error!(
911 "The replacement event of a remotely received edit does not have the JSON field set."
912 );
913 return false;
914 };
915
916 match check_validity_of_replacement_events(
917 original_json,
918 item.encryption_info(),
919 edit_json,
920 edit.encryption_info.as_deref(),
921 ) {
922 Ok(content) => content,
923 Err(e) => {
924 warn!("Event wasn't replaced due to the replacement event being invalid: {e}");
925 return false;
926 }
927 }
928 }
929
930 let TimelineItemContent::MsgLike(content) = item.content() else {
931 info!("Edit of message event applies to {:?}, discarding", item.content().debug_string());
932 return false;
933 };
934
935 let PendingEdit { kind: edit_kind, edit_json, encryption_info, bundled_item_owner: _ } = edit;
936
937 match (edit_kind, content) {
938 (
939 PendingEditKind::RoomMessage(replacement),
940 MsgLikeContent { kind: MsgLikeKind::Message(msg), .. },
941 ) => {
942 // First combination: it's a message edit for a message. Good.
943 let mut new_msg = msg.clone();
944 new_msg.apply_edit(replacement.new_content);
945
946 let new_item = item.with_content_and_latest_edit(
947 TimelineItemContent::MsgLike(content.with_kind(MsgLikeKind::Message(new_msg))),
948 edit_json,
949 );
950 *item = Cow::Owned(new_item);
951 }
952
953 (
954 PendingEditKind::Poll(replacement),
955 MsgLikeContent { kind: MsgLikeKind::Poll(poll_state), .. },
956 ) => {
957 // Second combination: it's a poll edit for a poll. Good.
958 if let Some(new_poll_state) = poll_state.edit(replacement.new_content) {
959 let new_item = item.with_content_and_latest_edit(
960 TimelineItemContent::MsgLike(
961 content.with_kind(MsgLikeKind::Poll(new_poll_state)),
962 ),
963 edit_json,
964 );
965 *item = Cow::Owned(new_item);
966 } else {
967 // The poll has ended, so we can't edit it anymore.
968 return false;
969 }
970 }
971
972 (edit_kind, _) => {
973 // Invalid combination.
974 info!(
975 content = item.content().debug_string(),
976 edit = format!("{:?}", edit_kind),
977 "Mismatch between edit type and content type",
978 );
979 return false;
980 }
981 }
982
983 if let Some(encryption_info) = encryption_info {
984 *item = Cow::Owned(item.with_encryption_info(Some(encryption_info)));
985 }
986
987 true
988}
989
990/// Find an item identified by the target identifier, and apply the aggregation
991/// onto it.
992///
993/// Returns the updated [`EventTimelineItem`] if the aggregation was applied, or
994/// `None` otherwise.
995pub(crate) fn find_item_and_apply_aggregation(
996 aggregations: &Aggregations,
997 items: &mut ObservableItemsTransaction<'_>,
998 target: &TimelineEventItemId,
999 aggregation: Aggregation,
1000 rules: &RoomVersionRules,
1001) -> Option<EventTimelineItem> {
1002 let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
1003 trace!("couldn't find aggregation's target {target:?}");
1004 return None;
1005 };
1006
1007 let mut cowed = Cow::Borrowed(&*event_item);
1008 match aggregation.apply(&mut cowed, rules) {
1009 ApplyAggregationResult::UpdatedItem => {
1010 trace!("applied aggregation");
1011 let new_event_item = cowed.into_owned();
1012 let new_item =
1013 TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
1014 items.replace(idx, new_item);
1015 Some(new_event_item)
1016 }
1017 ApplyAggregationResult::Edit => {
1018 if let Some(aggregations) = aggregations.related_events.get(target)
1019 && resolve_edits(aggregations, items, &mut cowed)
1020 {
1021 let new_event_item = cowed.into_owned();
1022 let new_item =
1023 TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
1024 items.replace(idx, new_item);
1025 return Some(new_event_item);
1026 }
1027 None
1028 }
1029 ApplyAggregationResult::LeftItemIntact => {
1030 trace!("applying the aggregation had no effect");
1031 None
1032 }
1033 ApplyAggregationResult::Error(err) => {
1034 warn!("error when applying aggregation: {err}");
1035 None
1036 }
1037 }
1038}
1039
1040/// The result of applying (or unapplying) an aggregation onto a timeline item.
1041enum ApplyAggregationResult {
1042 /// The passed `Cow<EventTimelineItem>` has been cloned and updated.
1043 UpdatedItem,
1044
1045 /// An edit must be included in the edit set and resolved later, using the
1046 /// relative position of the edits.
1047 Edit,
1048
1049 /// The item hasn't been modified after applying the aggregation, because it
1050 /// was likely already applied prior to this.
1051 LeftItemIntact,
1052
1053 /// An error happened while applying the aggregation.
1054 Error(AggregationError),
1055}
1056
1057#[derive(Debug, thiserror::Error)]
1058pub(crate) enum AggregationError {
1059 #[error("trying to end a poll twice")]
1060 PollAlreadyEnded,
1061
1062 #[error("a poll end can't be unapplied")]
1063 CantUndoPollEnd,
1064
1065 #[error("a redaction can't be unapplied")]
1066 CantUndoRedaction,
1067
1068 #[error("a beacon stop can't be unapplied")]
1069 CantUndoBeaconStop,
1070
1071 #[error("a call decline can't be unapplied")]
1072 CantUndoRtcDecline,
1073
1074 #[error(
1075 "trying to apply an aggregation of one type to an invalid target: \
1076 expected {expected}, actual {actual}"
1077 )]
1078 InvalidType { expected: String, actual: String },
1079}