matrix_sdk_ui/timeline/controller/
aggregations.rs1use std::{borrow::Cow, collections::HashMap, sync::Arc};
41
42use as_variant::as_variant;
43use matrix_sdk::deserialized_responses::EncryptionInfo;
44use ruma::{
45 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
46 events::{
47 AnySyncTimelineEvent,
48 poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
49 relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
50 },
51 room_version_rules::RoomVersionRules,
52 serde::Raw,
53};
54use tracing::{info, trace, warn};
55
56use super::{ObservableItemsTransaction, rfind_event_by_item_id};
57use crate::timeline::{
58 EventTimelineItem, MsgLikeContent, MsgLikeKind, PollState, ReactionInfo, ReactionStatus,
59 TimelineEventItemId, TimelineItem, TimelineItemContent,
60};
61
62#[derive(Clone)]
63pub(in crate::timeline) enum PendingEditKind {
64 RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
65 Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
66}
67
68impl std::fmt::Debug for PendingEditKind {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 Self::RoomMessage(_) => f.debug_struct("RoomMessage").finish_non_exhaustive(),
72 Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
73 }
74 }
75}
76
77#[derive(Clone, Debug)]
78pub(in crate::timeline) struct PendingEdit {
79 pub kind: PendingEditKind,
81
82 pub edit_json: Option<Raw<AnySyncTimelineEvent>>,
84
85 pub encryption_info: Option<Arc<EncryptionInfo>>,
87
88 pub bundled_item_owner: Option<OwnedEventId>,
91}
92
93#[derive(Clone, Debug)]
95pub(crate) enum AggregationKind {
96 PollResponse {
98 sender: OwnedUserId,
100 timestamp: MilliSecondsSinceUnixEpoch,
102 answers: Vec<String>,
104 },
105
106 PollEnd {
108 end_date: MilliSecondsSinceUnixEpoch,
113 },
114
115 Reaction {
117 key: String,
119 sender: OwnedUserId,
121 timestamp: MilliSecondsSinceUnixEpoch,
123 reaction_status: ReactionStatus,
126 },
127
128 Redaction,
130
131 Edit(PendingEdit),
139}
140
141#[derive(Clone, Debug)]
146pub(crate) struct Aggregation {
147 pub kind: AggregationKind,
149
150 pub own_id: TimelineEventItemId,
156}
157
158fn poll_state_from_item<'a>(
160 event: &'a mut Cow<'_, EventTimelineItem>,
161) -> Result<&'a mut PollState, AggregationError> {
162 if event.content().is_poll() {
163 let state = as_variant!(
165 event.to_mut().content_mut(),
166 TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(s), ..}) => s
167 )
168 .expect("it was a poll just above");
169 Ok(state)
170 } else {
171 Err(AggregationError::InvalidType {
172 expected: "a poll".to_owned(),
173 actual: event.content().debug_string().to_owned(),
174 })
175 }
176}
177
178impl Aggregation {
179 pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
181 Self { kind, own_id }
182 }
183
184 fn apply(
193 &self,
194 event: &mut Cow<'_, EventTimelineItem>,
195 rules: &RoomVersionRules,
196 ) -> ApplyAggregationResult {
197 match &self.kind {
198 AggregationKind::PollResponse { sender, timestamp, answers } => {
199 match poll_state_from_item(event) {
200 Ok(state) => {
201 state.add_response(sender.clone(), *timestamp, answers.clone());
202 ApplyAggregationResult::UpdatedItem
203 }
204 Err(err) => ApplyAggregationResult::Error(err),
205 }
206 }
207
208 AggregationKind::Redaction => {
209 if event.content().is_redacted() {
210 ApplyAggregationResult::LeftItemIntact
211 } else {
212 let new_item = event.redact(&rules.redaction);
213 *event = Cow::Owned(new_item);
214 ApplyAggregationResult::UpdatedItem
215 }
216 }
217
218 AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
219 Ok(state) => {
220 if !state.end(*end_date) {
221 return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
222 }
223 ApplyAggregationResult::UpdatedItem
224 }
225 Err(err) => ApplyAggregationResult::Error(err),
226 },
227
228 AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
229 let Some(reactions) = event.content().reactions() else {
230 return ApplyAggregationResult::LeftItemIntact;
232 };
233
234 let previous_reaction = reactions.get(key).and_then(|by_user| by_user.get(sender));
235
236 let is_same = previous_reaction.is_some_and(|prev| {
241 prev.timestamp == *timestamp
242 && matches!(
243 (&prev.status, reaction_status),
244 (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
245 | (
246 ReactionStatus::LocalToRemote(_),
247 ReactionStatus::LocalToRemote(_),
248 )
249 | (
250 ReactionStatus::RemoteToRemote(_),
251 ReactionStatus::RemoteToRemote(_),
252 )
253 )
254 });
255
256 if is_same {
257 ApplyAggregationResult::LeftItemIntact
258 } else {
259 let reactions = event
260 .to_mut()
261 .content_mut()
262 .reactions_mut()
263 .expect("reactions was Some above");
264
265 reactions.entry(key.clone()).or_default().insert(
266 sender.clone(),
267 ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
268 );
269
270 ApplyAggregationResult::UpdatedItem
271 }
272 }
273
274 AggregationKind::Edit(_) => {
275 ApplyAggregationResult::Edit
277 }
278 }
279 }
280
281 fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
290 match &self.kind {
291 AggregationKind::PollResponse { sender, timestamp, .. } => {
292 let state = match poll_state_from_item(event) {
293 Ok(state) => state,
294 Err(err) => return ApplyAggregationResult::Error(err),
295 };
296 state.remove_response(sender, *timestamp);
297 ApplyAggregationResult::UpdatedItem
298 }
299
300 AggregationKind::PollEnd { .. } => {
301 ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
303 }
304
305 AggregationKind::Redaction => {
306 ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
308 }
309
310 AggregationKind::Reaction { key, sender, .. } => {
311 let Some(reactions) = event.content().reactions() else {
312 return ApplyAggregationResult::LeftItemIntact;
314 };
315
316 let had_entry =
321 reactions.get(key).and_then(|by_user| by_user.get(sender)).is_some();
322
323 if had_entry {
324 let reactions = event
325 .to_mut()
326 .content_mut()
327 .reactions_mut()
328 .expect("reactions was some above");
329 let by_user = reactions.get_mut(key);
330 if let Some(by_user) = by_user {
331 by_user.swap_remove(sender);
332 if by_user.is_empty() {
334 reactions.swap_remove(key);
335 }
336 }
337 ApplyAggregationResult::UpdatedItem
338 } else {
339 ApplyAggregationResult::LeftItemIntact
340 }
341 }
342
343 AggregationKind::Edit(_) => {
344 ApplyAggregationResult::Edit
346 }
347 }
348 }
349}
350
351#[derive(Clone, Debug, Default)]
353pub(crate) struct Aggregations {
354 related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,
356
357 inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
359}
360
361impl Aggregations {
362 pub fn clear(&mut self) {
364 self.related_events.clear();
365 self.inverted_map.clear();
366 }
367
368 pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
371 if matches!(aggregation.kind, AggregationKind::Redaction) {
374 for agg in self.related_events.remove(&related_to).unwrap_or_default() {
375 self.inverted_map.remove(&agg.own_id);
376 }
377 }
378
379 if let Some(previous_aggregations) = self.related_events.get(&related_to)
382 && previous_aggregations
383 .iter()
384 .any(|agg| matches!(agg.kind, AggregationKind::Redaction))
385 {
386 return;
387 }
388
389 self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
390 self.related_events.entry(related_to).or_default().push(aggregation);
391 }
392
393 pub fn try_remove_aggregation(
405 &mut self,
406 aggregation_id: &TimelineEventItemId,
407 items: &mut ObservableItemsTransaction<'_>,
408 ) -> Result<bool, AggregationError> {
409 let Some(found) = self.inverted_map.get(aggregation_id) else { return Ok(false) };
410
411 let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
413 let removed = aggregations
414 .iter()
415 .position(|agg| agg.own_id == *aggregation_id)
416 .map(|idx| aggregations.remove(idx));
417
418 if aggregations.is_empty() {
421 self.related_events.remove(found);
422 }
423
424 removed
425 } else {
426 None
427 };
428
429 let Some(aggregation) = aggregation else {
430 warn!(
431 "incorrect internal state: {aggregation_id:?} was present in the inverted map, \
432 not in related-to map."
433 );
434 return Ok(false);
435 };
436
437 if let Some((item_pos, item)) = rfind_event_by_item_id(items, found) {
438 let mut cowed = Cow::Borrowed(&*item);
439 match aggregation.unapply(&mut cowed) {
440 ApplyAggregationResult::UpdatedItem => {
441 trace!("removed aggregation");
442 items.replace(
443 item_pos,
444 TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
445 );
446 }
447 ApplyAggregationResult::LeftItemIntact => {}
448 ApplyAggregationResult::Error(err) => {
449 warn!("error when unapplying aggregation: {err}");
450 }
451 ApplyAggregationResult::Edit => {
452 if let Some(aggregations) = self.related_events.get(found) {
454 if resolve_edits(aggregations, items, &mut cowed) {
455 items.replace(
456 item_pos,
457 TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
458 );
459 } else {
460 }
464 } else {
465 }
467 }
468 }
469 } else {
470 info!("missing related-to item ({found:?}) for aggregation {aggregation_id:?}");
471 }
472
473 Ok(true)
474 }
475
476 pub fn apply_all(
485 &self,
486 item_id: &TimelineEventItemId,
487 event: &mut Cow<'_, EventTimelineItem>,
488 items: &mut ObservableItemsTransaction<'_>,
489 rules: &RoomVersionRules,
490 ) -> Result<(), AggregationError> {
491 let Some(aggregations) = self.related_events.get(item_id) else {
492 return Ok(());
493 };
494
495 let mut has_edits = false;
496
497 for a in aggregations {
498 match a.apply(event, rules) {
499 ApplyAggregationResult::Edit => {
500 has_edits = true;
501 }
502 ApplyAggregationResult::UpdatedItem | ApplyAggregationResult::LeftItemIntact => {}
503 ApplyAggregationResult::Error(err) => return Err(err),
504 }
505 }
506
507 if has_edits {
508 resolve_edits(aggregations, items, event);
509 }
510
511 Ok(())
512 }
513
514 pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
518 let from = TimelineEventItemId::TransactionId(txn_id);
519 let to = TimelineEventItemId::EventId(event_id);
520
521 if let Some(aggregations) = self.related_events.remove(&from) {
523 for a in &aggregations {
525 if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
526 debug_assert_eq!(prev_target, from);
527 self.inverted_map.insert(a.own_id.clone(), to.clone());
528 }
529 }
530 self.related_events.entry(to).or_default().extend(aggregations);
532 }
533 }
534
535 pub fn mark_aggregation_as_sent(
543 &mut self,
544 txn_id: OwnedTransactionId,
545 event_id: OwnedEventId,
546 items: &mut ObservableItemsTransaction<'_>,
547 rules: &RoomVersionRules,
548 ) -> bool {
549 let from = TimelineEventItemId::TransactionId(txn_id);
550 let to = TimelineEventItemId::EventId(event_id.clone());
551
552 let Some(target) = self.inverted_map.remove(&from) else {
553 return false;
554 };
555
556 if let Some(aggregations) = self.related_events.get_mut(&target)
557 && let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from)
558 {
559 found.own_id = to.clone();
560
561 match &mut found.kind {
562 AggregationKind::PollResponse { .. }
563 | AggregationKind::PollEnd { .. }
564 | AggregationKind::Edit(..)
565 | AggregationKind::Redaction => {
566 }
568
569 AggregationKind::Reaction { reaction_status, .. } => {
570 *reaction_status = ReactionStatus::RemoteToRemote(event_id);
573
574 let found = found.clone();
575 find_item_and_apply_aggregation(self, items, &target, found, rules);
576 }
577 }
578 }
579
580 self.inverted_map.insert(to, target);
581 true
582 }
583}
584
585fn resolve_edits(
590 aggregations: &[Aggregation],
591 items: &ObservableItemsTransaction<'_>,
592 event: &mut Cow<'_, EventTimelineItem>,
593) -> bool {
594 let mut best_edit: Option<PendingEdit> = None;
595 let mut best_edit_pos = None;
596
597 for a in aggregations {
598 if let AggregationKind::Edit(pending_edit) = &a.kind {
599 match &a.own_id {
600 TimelineEventItemId::TransactionId(_) => {
601 best_edit = Some(pending_edit.clone());
603 break;
604 }
605
606 TimelineEventItemId::EventId(event_id) => {
607 if let Some(best_edit_pos) = &mut best_edit_pos {
608 let pos = items.position_by_event_id(
611 pending_edit.bundled_item_owner.as_ref().unwrap_or(event_id),
612 );
613
614 if let Some(pos) = pos {
615 if pos > *best_edit_pos {
618 best_edit = Some(pending_edit.clone());
619 *best_edit_pos = pos;
620 trace!(?best_edit_pos, edit_id = ?a.own_id, "found better edit");
621 }
622 } else {
623 trace!(edit_id = ?a.own_id, "couldn't find timeline meta for edit event");
624
625 if best_edit.is_none() {
629 best_edit = Some(pending_edit.clone());
630 trace!(?best_edit_pos, edit_id = ?a.own_id, "found bundled edit");
631 }
632 }
633 } else {
634 best_edit = Some(pending_edit.clone());
637 best_edit_pos = items.position_by_event_id(event_id);
638 trace!(?best_edit_pos, edit_id = ?a.own_id, "first best edit");
639 }
640 }
641 }
642 }
643 }
644
645 if let Some(edit) = best_edit { edit_item(event, edit) } else { false }
646}
647
648fn edit_item(item: &mut Cow<'_, EventTimelineItem>, edit: PendingEdit) -> bool {
653 let PendingEdit { kind: edit_kind, edit_json, encryption_info, bundled_item_owner: _ } = edit;
654
655 if let Some(event_json) = &edit_json {
656 let Some(edit_sender) = event_json.get_field::<OwnedUserId>("sender").ok().flatten() else {
657 info!("edit event didn't have a sender; likely a malformed event");
658 return false;
659 };
660
661 if edit_sender != item.sender() {
662 info!(
663 original_sender = %item.sender(),
664 %edit_sender,
665 "Edit event applies to another user's timeline item, discarding"
666 );
667 return false;
668 }
669 }
670
671 let TimelineItemContent::MsgLike(content) = item.content() else {
672 info!("Edit of message event applies to {:?}, discarding", item.content().debug_string());
673 return false;
674 };
675
676 match (edit_kind, content) {
677 (
678 PendingEditKind::RoomMessage(replacement),
679 MsgLikeContent { kind: MsgLikeKind::Message(msg), .. },
680 ) => {
681 let mut new_msg = msg.clone();
683 new_msg.apply_edit(replacement.new_content);
684
685 let new_item = item.with_content_and_latest_edit(
686 TimelineItemContent::MsgLike(content.with_kind(MsgLikeKind::Message(new_msg))),
687 edit_json,
688 );
689 *item = Cow::Owned(new_item);
690 }
691
692 (
693 PendingEditKind::Poll(replacement),
694 MsgLikeContent { kind: MsgLikeKind::Poll(poll_state), .. },
695 ) => {
696 if let Some(new_poll_state) = poll_state.edit(replacement.new_content) {
698 let new_item = item.with_content_and_latest_edit(
699 TimelineItemContent::MsgLike(
700 content.with_kind(MsgLikeKind::Poll(new_poll_state)),
701 ),
702 edit_json,
703 );
704 *item = Cow::Owned(new_item);
705 } else {
706 return false;
708 }
709 }
710
711 (edit_kind, _) => {
712 info!(
714 content = item.content().debug_string(),
715 edit = format!("{:?}", edit_kind),
716 "Mismatch between edit type and content type",
717 );
718 return false;
719 }
720 }
721
722 if let Some(encryption_info) = encryption_info {
723 *item = Cow::Owned(item.with_encryption_info(Some(encryption_info)));
724 }
725
726 true
727}
728
729pub(crate) fn find_item_and_apply_aggregation(
735 aggregations: &Aggregations,
736 items: &mut ObservableItemsTransaction<'_>,
737 target: &TimelineEventItemId,
738 aggregation: Aggregation,
739 rules: &RoomVersionRules,
740) -> Option<EventTimelineItem> {
741 let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
742 trace!("couldn't find aggregation's target {target:?}");
743 return None;
744 };
745
746 let mut cowed = Cow::Borrowed(&*event_item);
747 match aggregation.apply(&mut cowed, rules) {
748 ApplyAggregationResult::UpdatedItem => {
749 trace!("applied aggregation");
750 let new_event_item = cowed.into_owned();
751 let new_item =
752 TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
753 items.replace(idx, new_item);
754 Some(new_event_item)
755 }
756 ApplyAggregationResult::Edit => {
757 if let Some(aggregations) = aggregations.related_events.get(target)
758 && resolve_edits(aggregations, items, &mut cowed)
759 {
760 let new_event_item = cowed.into_owned();
761 let new_item =
762 TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
763 items.replace(idx, new_item);
764 return Some(new_event_item);
765 }
766 None
767 }
768 ApplyAggregationResult::LeftItemIntact => {
769 trace!("applying the aggregation had no effect");
770 None
771 }
772 ApplyAggregationResult::Error(err) => {
773 warn!("error when applying aggregation: {err}");
774 None
775 }
776 }
777}
778
779enum ApplyAggregationResult {
781 UpdatedItem,
783
784 Edit,
787
788 LeftItemIntact,
791
792 Error(AggregationError),
794}
795
796#[derive(Debug, thiserror::Error)]
797pub(crate) enum AggregationError {
798 #[error("trying to end a poll twice")]
799 PollAlreadyEnded,
800
801 #[error("a poll end can't be unapplied")]
802 CantUndoPollEnd,
803
804 #[error("a redaction can't be unapplied")]
805 CantUndoRedaction,
806
807 #[error(
808 "trying to apply an aggregation of one type to an invalid target: \
809 expected {expected}, actual {actual}"
810 )]
811 InvalidType { expected: String, actual: String },
812}