matrix_sdk_ui/timeline/controller/
aggregations.rs1use std::{borrow::Cow, collections::HashMap, sync::Arc};
41
42use as_variant::as_variant;
43use matrix_sdk::deserialized_responses::EncryptionInfo;
44use ruma::{
45 events::{
46 poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
47 relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
48 AnySyncTimelineEvent,
49 },
50 serde::Raw,
51 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId,
52};
53use tracing::{info, trace, warn};
54
55use super::{rfind_event_by_item_id, ObservableItemsTransaction};
56use crate::timeline::{
57 EventTimelineItem, MsgLikeContent, MsgLikeKind, PollState, ReactionInfo, ReactionStatus,
58 TimelineEventItemId, TimelineItem, TimelineItemContent,
59};
60
61#[derive(Clone)]
62pub(in crate::timeline) enum PendingEditKind {
63 RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
64 Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
65}
66
67impl std::fmt::Debug for PendingEditKind {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 match self {
70 Self::RoomMessage(_) => f.debug_struct("RoomMessage").finish_non_exhaustive(),
71 Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
72 }
73 }
74}
75
76#[derive(Clone, Debug)]
77pub(in crate::timeline) struct PendingEdit {
78 pub kind: PendingEditKind,
80
81 pub edit_json: Option<Raw<AnySyncTimelineEvent>>,
83
84 pub encryption_info: Option<Arc<EncryptionInfo>>,
86}
87
88#[derive(Clone, Debug)]
90pub(crate) enum AggregationKind {
91 PollResponse {
93 sender: OwnedUserId,
95 timestamp: MilliSecondsSinceUnixEpoch,
97 answers: Vec<String>,
99 },
100
101 PollEnd {
103 end_date: MilliSecondsSinceUnixEpoch,
108 },
109
110 Reaction {
112 key: String,
114 sender: OwnedUserId,
116 timestamp: MilliSecondsSinceUnixEpoch,
118 reaction_status: ReactionStatus,
121 },
122
123 Redaction,
125
126 Edit(PendingEdit),
134}
135
136#[derive(Clone, Debug)]
141pub(crate) struct Aggregation {
142 pub kind: AggregationKind,
144
145 pub own_id: TimelineEventItemId,
151}
152
153fn poll_state_from_item<'a>(
155 event: &'a mut Cow<'_, EventTimelineItem>,
156) -> Result<&'a mut PollState, AggregationError> {
157 if event.content().is_poll() {
158 let state = as_variant!(
160 event.to_mut().content_mut(),
161 TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(s), ..}) => s
162 )
163 .expect("it was a poll just above");
164 Ok(state)
165 } else {
166 Err(AggregationError::InvalidType {
167 expected: "a poll".to_owned(),
168 actual: event.content().debug_string().to_owned(),
169 })
170 }
171}
172
173impl Aggregation {
174 pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
176 Self { kind, own_id }
177 }
178
179 fn apply(
188 &self,
189 event: &mut Cow<'_, EventTimelineItem>,
190 room_version: &RoomVersionId,
191 ) -> ApplyAggregationResult {
192 match &self.kind {
193 AggregationKind::PollResponse { sender, timestamp, answers } => {
194 match poll_state_from_item(event) {
195 Ok(state) => {
196 state.add_response(sender.clone(), *timestamp, answers.clone());
197 ApplyAggregationResult::UpdatedItem
198 }
199 Err(err) => ApplyAggregationResult::Error(err),
200 }
201 }
202
203 AggregationKind::Redaction => {
204 if event.content().is_redacted() {
205 ApplyAggregationResult::LeftItemIntact
206 } else {
207 let new_item = event.redact(room_version);
208 *event = Cow::Owned(new_item);
209 ApplyAggregationResult::UpdatedItem
210 }
211 }
212
213 AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
214 Ok(state) => {
215 if !state.end(*end_date) {
216 return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
217 }
218 ApplyAggregationResult::UpdatedItem
219 }
220 Err(err) => ApplyAggregationResult::Error(err),
221 },
222
223 AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
224 let Some(reactions) = event.content().reactions() else {
225 return ApplyAggregationResult::LeftItemIntact;
227 };
228
229 let previous_reaction = reactions.get(key).and_then(|by_user| by_user.get(sender));
230
231 let is_same = previous_reaction.is_some_and(|prev| {
236 prev.timestamp == *timestamp
237 && matches!(
238 (&prev.status, reaction_status),
239 (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
240 | (
241 ReactionStatus::LocalToRemote(_),
242 ReactionStatus::LocalToRemote(_),
243 )
244 | (
245 ReactionStatus::RemoteToRemote(_),
246 ReactionStatus::RemoteToRemote(_),
247 )
248 )
249 });
250
251 if is_same {
252 ApplyAggregationResult::LeftItemIntact
253 } else {
254 let reactions = event
255 .to_mut()
256 .content_mut()
257 .reactions_mut()
258 .expect("reactions was Some above");
259
260 reactions.entry(key.clone()).or_default().insert(
261 sender.clone(),
262 ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
263 );
264
265 ApplyAggregationResult::UpdatedItem
266 }
267 }
268
269 AggregationKind::Edit(_) => {
270 ApplyAggregationResult::Edit
272 }
273 }
274 }
275
276 fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
285 match &self.kind {
286 AggregationKind::PollResponse { sender, timestamp, .. } => {
287 let state = match poll_state_from_item(event) {
288 Ok(state) => state,
289 Err(err) => return ApplyAggregationResult::Error(err),
290 };
291 state.remove_response(sender, *timestamp);
292 ApplyAggregationResult::UpdatedItem
293 }
294
295 AggregationKind::PollEnd { .. } => {
296 ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
298 }
299
300 AggregationKind::Redaction => {
301 ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
303 }
304
305 AggregationKind::Reaction { key, sender, .. } => {
306 let Some(reactions) = event.content().reactions() else {
307 return ApplyAggregationResult::LeftItemIntact;
309 };
310
311 let had_entry =
316 reactions.get(key).and_then(|by_user| by_user.get(sender)).is_some();
317
318 if had_entry {
319 let reactions = event
320 .to_mut()
321 .content_mut()
322 .reactions_mut()
323 .expect("reactions was some above");
324 let by_user = reactions.get_mut(key);
325 if let Some(by_user) = by_user {
326 by_user.swap_remove(sender);
327 if by_user.is_empty() {
329 reactions.swap_remove(key);
330 }
331 }
332 ApplyAggregationResult::UpdatedItem
333 } else {
334 ApplyAggregationResult::LeftItemIntact
335 }
336 }
337
338 AggregationKind::Edit(_) => {
339 ApplyAggregationResult::Edit
341 }
342 }
343 }
344}
345
346#[derive(Clone, Debug, Default)]
348pub(crate) struct Aggregations {
349 related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,
351
352 inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
354}
355
356impl Aggregations {
357 pub fn clear(&mut self) {
359 self.related_events.clear();
360 self.inverted_map.clear();
361 }
362
363 pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
366 if matches!(aggregation.kind, AggregationKind::Redaction) {
369 for agg in self.related_events.remove(&related_to).unwrap_or_default() {
370 self.inverted_map.remove(&agg.own_id);
371 }
372 }
373
374 if let Some(previous_aggregations) = self.related_events.get(&related_to) {
377 if previous_aggregations
378 .iter()
379 .any(|agg| matches!(agg.kind, AggregationKind::Redaction))
380 {
381 return;
382 }
383 }
384
385 self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
386 self.related_events.entry(related_to).or_default().push(aggregation);
387 }
388
389 pub fn try_remove_aggregation(
401 &mut self,
402 aggregation_id: &TimelineEventItemId,
403 items: &mut ObservableItemsTransaction<'_>,
404 ) -> Result<bool, AggregationError> {
405 let Some(found) = self.inverted_map.get(aggregation_id) else { return Ok(false) };
406
407 let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
409 let removed = aggregations
410 .iter()
411 .position(|agg| agg.own_id == *aggregation_id)
412 .map(|idx| aggregations.remove(idx));
413
414 if aggregations.is_empty() {
417 self.related_events.remove(found);
418 }
419
420 removed
421 } else {
422 None
423 };
424
425 let Some(aggregation) = aggregation else {
426 warn!("incorrect internal state: {aggregation_id:?} was present in the inverted map, not in related-to map.");
427 return Ok(false);
428 };
429
430 if let Some((item_pos, item)) = rfind_event_by_item_id(items, found) {
431 let mut cowed = Cow::Borrowed(&*item);
432 match aggregation.unapply(&mut cowed) {
433 ApplyAggregationResult::UpdatedItem => {
434 trace!("removed aggregation");
435 items.replace(
436 item_pos,
437 TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
438 );
439 }
440 ApplyAggregationResult::LeftItemIntact => {}
441 ApplyAggregationResult::Error(err) => {
442 warn!("error when unapplying aggregation: {err}");
443 }
444 ApplyAggregationResult::Edit => {
445 if let Some(aggregations) = self.related_events.get(found) {
447 if resolve_edits(aggregations, items, &mut cowed) {
448 items.replace(
449 item_pos,
450 TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
451 );
452 } else {
453 }
457 } else {
458 }
460 }
461 }
462 } else {
463 info!("missing related-to item ({found:?}) for aggregation {aggregation_id:?}");
464 }
465
466 Ok(true)
467 }
468
469 pub fn apply_all(
478 &self,
479 item_id: &TimelineEventItemId,
480 event: &mut Cow<'_, EventTimelineItem>,
481 items: &mut ObservableItemsTransaction<'_>,
482 room_version: &RoomVersionId,
483 ) -> Result<(), AggregationError> {
484 let Some(aggregations) = self.related_events.get(item_id) else {
485 return Ok(());
486 };
487
488 let mut has_edits = false;
489
490 for a in aggregations {
491 match a.apply(event, room_version) {
492 ApplyAggregationResult::Edit => {
493 has_edits = true;
494 }
495 ApplyAggregationResult::UpdatedItem | ApplyAggregationResult::LeftItemIntact => {}
496 ApplyAggregationResult::Error(err) => return Err(err),
497 }
498 }
499
500 if has_edits {
501 resolve_edits(aggregations, items, event);
502 }
503
504 Ok(())
505 }
506
507 pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
511 let from = TimelineEventItemId::TransactionId(txn_id);
512 let to = TimelineEventItemId::EventId(event_id);
513
514 if let Some(aggregations) = self.related_events.remove(&from) {
516 for a in &aggregations {
518 if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
519 debug_assert_eq!(prev_target, from);
520 self.inverted_map.insert(a.own_id.clone(), to.clone());
521 }
522 }
523 self.related_events.entry(to).or_default().extend(aggregations);
525 }
526 }
527
528 pub fn mark_aggregation_as_sent(
536 &mut self,
537 txn_id: OwnedTransactionId,
538 event_id: OwnedEventId,
539 items: &mut ObservableItemsTransaction<'_>,
540 room_version: &RoomVersionId,
541 ) -> bool {
542 let from = TimelineEventItemId::TransactionId(txn_id);
543 let to = TimelineEventItemId::EventId(event_id.clone());
544
545 let Some(target) = self.inverted_map.remove(&from) else {
546 return false;
547 };
548
549 if let Some(aggregations) = self.related_events.get_mut(&target) {
550 if let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from) {
551 found.own_id = to.clone();
552
553 match &mut found.kind {
554 AggregationKind::PollResponse { .. }
555 | AggregationKind::PollEnd { .. }
556 | AggregationKind::Edit(..)
557 | AggregationKind::Redaction => {
558 }
560
561 AggregationKind::Reaction { reaction_status, .. } => {
562 *reaction_status = ReactionStatus::RemoteToRemote(event_id);
565
566 let found = found.clone();
567 find_item_and_apply_aggregation(self, items, &target, found, room_version);
568 }
569 }
570 }
571 }
572
573 self.inverted_map.insert(to, target);
574 true
575 }
576}
577
578fn resolve_edits(
583 aggregations: &[Aggregation],
584 items: &ObservableItemsTransaction<'_>,
585 event: &mut Cow<'_, EventTimelineItem>,
586) -> bool {
587 let mut best_edit: Option<PendingEdit> = None;
588 let mut best_edit_pos = None;
589
590 for a in aggregations {
591 if let AggregationKind::Edit(pending_edit) = &a.kind {
592 match &a.own_id {
593 TimelineEventItemId::TransactionId(_) => {
594 best_edit = Some(pending_edit.clone());
596 break;
597 }
598
599 TimelineEventItemId::EventId(event_id) => {
600 if let Some(best_edit_pos) = &mut best_edit_pos {
601 let pos = items.position_by_event_id(event_id);
602 if let Some(pos) = pos {
603 if pos > *best_edit_pos {
606 best_edit = Some(pending_edit.clone());
607 *best_edit_pos = pos;
608 trace!(?best_edit_pos, edit_id = ?a.own_id, "found better edit");
609 }
610 } else {
611 trace!(edit_id = ?a.own_id, "couldn't find timeline meta for edit event");
612
613 if best_edit.is_none() {
617 best_edit = Some(pending_edit.clone());
618 trace!(?best_edit_pos, edit_id = ?a.own_id, "found bundled edit");
619 }
620 }
621 } else {
622 best_edit = Some(pending_edit.clone());
625 best_edit_pos = items.position_by_event_id(event_id);
626 trace!(?best_edit_pos, edit_id = ?a.own_id, "first best edit");
627 }
628 }
629 }
630 }
631 }
632
633 if let Some(edit) = best_edit {
634 edit_item(event, edit)
635 } else {
636 false
637 }
638}
639
640fn edit_item(item: &mut Cow<'_, EventTimelineItem>, edit: PendingEdit) -> bool {
642 let PendingEdit { kind: edit_kind, edit_json, encryption_info } = edit;
643
644 if let Some(event_json) = &edit_json {
645 let Some(edit_sender) = event_json.get_field::<OwnedUserId>("sender").ok().flatten() else {
646 info!("edit event didn't have a sender; likely a malformed event");
647 return false;
648 };
649
650 if edit_sender != item.sender() {
651 info!(
652 original_sender = %item.sender(),
653 %edit_sender,
654 "Edit event applies to another user's timeline item, discarding"
655 );
656 return false;
657 }
658 }
659
660 let TimelineItemContent::MsgLike(content) = item.content() else {
661 info!("Edit of message event applies to {:?}, discarding", item.content().debug_string());
662 return false;
663 };
664
665 match (edit_kind, content) {
666 (
667 PendingEditKind::RoomMessage(replacement),
668 MsgLikeContent { kind: MsgLikeKind::Message(msg), .. },
669 ) => {
670 let mut new_msg = msg.clone();
672 new_msg.apply_edit(replacement.new_content);
673
674 let new_item = item.with_content_and_latest_edit(
675 TimelineItemContent::MsgLike(content.with_kind(MsgLikeKind::Message(new_msg))),
676 edit_json,
677 );
678 *item = Cow::Owned(new_item);
679 }
680
681 (
682 PendingEditKind::Poll(replacement),
683 MsgLikeContent { kind: MsgLikeKind::Poll(poll_state), .. },
684 ) => {
685 if let Some(new_poll_state) = poll_state.edit(replacement.new_content) {
687 let new_item = item.with_content_and_latest_edit(
688 TimelineItemContent::MsgLike(
689 content.with_kind(MsgLikeKind::Poll(new_poll_state)),
690 ),
691 edit_json,
692 );
693 *item = Cow::Owned(new_item);
694 } else {
695 return false;
697 }
698 }
699
700 (edit_kind, _) => {
701 info!(
703 content = item.content().debug_string(),
704 edit = format!("{:?}", edit_kind),
705 "Mismatch between edit type and content type",
706 );
707 return false;
708 }
709 }
710
711 if let Some(encryption_info) = encryption_info {
712 *item = Cow::Owned(item.with_encryption_info(Some(encryption_info)));
713 }
714
715 true
716}
717
718pub(crate) fn find_item_and_apply_aggregation(
724 aggregations: &Aggregations,
725 items: &mut ObservableItemsTransaction<'_>,
726 target: &TimelineEventItemId,
727 aggregation: Aggregation,
728 room_version: &RoomVersionId,
729) -> Option<EventTimelineItem> {
730 let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
731 trace!("couldn't find aggregation's target {target:?}");
732 return None;
733 };
734
735 let mut cowed = Cow::Borrowed(&*event_item);
736 match aggregation.apply(&mut cowed, room_version) {
737 ApplyAggregationResult::UpdatedItem => {
738 trace!("applied aggregation");
739 let new_event_item = cowed.into_owned();
740 let new_item =
741 TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
742 items.replace(idx, new_item);
743 Some(new_event_item)
744 }
745 ApplyAggregationResult::Edit => {
746 if let Some(aggregations) = aggregations.related_events.get(target) {
747 if resolve_edits(aggregations, items, &mut cowed) {
748 let new_event_item = cowed.into_owned();
749 let new_item = TimelineItem::new(
750 new_event_item.clone(),
751 event_item.internal_id.to_owned(),
752 );
753 items.replace(idx, new_item);
754 return Some(new_event_item);
755 }
756 }
757 None
758 }
759 ApplyAggregationResult::LeftItemIntact => {
760 trace!("applying the aggregation had no effect");
761 None
762 }
763 ApplyAggregationResult::Error(err) => {
764 warn!("error when applying aggregation: {err}");
765 None
766 }
767 }
768}
769
770enum ApplyAggregationResult {
772 UpdatedItem,
774
775 Edit,
778
779 LeftItemIntact,
782
783 Error(AggregationError),
785}
786
787#[derive(Debug, thiserror::Error)]
788pub(crate) enum AggregationError {
789 #[error("trying to end a poll twice")]
790 PollAlreadyEnded,
791
792 #[error("a poll end can't be unapplied")]
793 CantUndoPollEnd,
794
795 #[error("a redaction can't be unapplied")]
796 CantUndoRedaction,
797
798 #[error("trying to apply an aggregation of one type to an invalid target: expected {expected}, actual {actual}")]
799 InvalidType { expected: String, actual: String },
800}