use std::{borrow::Cow, collections::HashMap, sync::Arc};
use matrix_sdk::{check_validity_of_replacement_events, deserialized_responses::EncryptionInfo};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
events::{
AnySyncTimelineEvent, beacon_info::BeaconInfoEventContent,
poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
},
room_version_rules::RoomVersionRules,
serde::Raw,
};
use tracing::{error, info, trace, warn};
use super::{ObservableItemsTransaction, rfind_event_by_item_id};
use crate::timeline::{
BeaconInfo, EventTimelineItem, LiveLocationState, MsgLikeContent, MsgLikeKind, PollState,
ReactionInfo, ReactionStatus, TimelineEventItemId, TimelineItem, TimelineItemContent,
event_item::beacon_info_matches,
};
#[derive(Clone)]
pub(in crate::timeline) enum PendingEditKind {
RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
}
impl std::fmt::Debug for PendingEditKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::RoomMessage(_) => f.debug_struct("RoomMessage").finish_non_exhaustive(),
Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
}
}
}
#[derive(Clone, Debug)]
pub(in crate::timeline) struct PendingEdit {
pub kind: PendingEditKind,
pub edit_json: Option<Raw<AnySyncTimelineEvent>>,
pub encryption_info: Option<Arc<EncryptionInfo>>,
pub bundled_item_owner: Option<OwnedEventId>,
}
#[derive(Clone, Debug)]
pub(crate) enum AggregationKind {
PollResponse {
sender: OwnedUserId,
timestamp: MilliSecondsSinceUnixEpoch,
answers: Vec<String>,
},
PollEnd {
end_date: MilliSecondsSinceUnixEpoch,
},
Reaction {
key: String,
sender: OwnedUserId,
timestamp: MilliSecondsSinceUnixEpoch,
reaction_status: ReactionStatus,
},
Redaction {
is_local: bool,
},
Edit(PendingEdit),
BeaconUpdate { location: BeaconInfo },
BeaconStop { content: BeaconInfoEventContent },
CallDeclined {
sender: OwnedUserId,
},
}
#[derive(Clone, Debug)]
pub(crate) struct Aggregation {
pub kind: AggregationKind,
pub own_id: TimelineEventItemId,
}
fn poll_state_from_item<'a>(
event: &'a mut Cow<'_, EventTimelineItem>,
) -> Result<&'a mut PollState, AggregationError> {
let content = event.to_mut().content_mut();
if let TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(state), .. }) =
content
{
Ok(state)
} else {
Err(AggregationError::InvalidType {
expected: "a poll".to_owned(),
actual: content.debug_string().to_owned(),
})
}
}
fn live_location_state_from_item<'a>(
event: &'a mut Cow<'_, EventTimelineItem>,
) -> Result<&'a mut LiveLocationState, AggregationError> {
let content = event.to_mut().content_mut();
if let TimelineItemContent::MsgLike(MsgLikeContent {
kind: MsgLikeKind::LiveLocation(state),
..
}) = content
{
Ok(state)
} else {
Err(AggregationError::InvalidType {
expected: "a live location".to_owned(),
actual: content.debug_string().to_owned(),
})
}
}
fn rtc_notification_declinations_from_item<'a>(
event: &'a mut Cow<'_, EventTimelineItem>,
) -> Result<&'a mut Vec<OwnedUserId>, AggregationError> {
let content = event.to_mut().content_mut();
if let TimelineItemContent::RtcNotification { declined_by, .. } = content {
Ok(declined_by)
} else {
Err(AggregationError::InvalidType {
expected: "an rtc notification".to_owned(),
actual: content.debug_string().to_owned(),
})
}
}
impl Aggregation {
pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
Self { kind, own_id }
}
fn apply(
&self,
event: &mut Cow<'_, EventTimelineItem>,
rules: &RoomVersionRules,
) -> ApplyAggregationResult {
match &self.kind {
AggregationKind::PollResponse { sender, timestamp, answers } => {
match poll_state_from_item(event) {
Ok(state) => {
state.add_response(sender.clone(), *timestamp, answers.clone());
ApplyAggregationResult::UpdatedItem
}
Err(err) => ApplyAggregationResult::Error(err),
}
}
AggregationKind::Redaction { is_local } => {
let is_local_redacted =
event.content().is_redacted() && event.unredacted_item.is_some();
let is_remote_redacted =
event.content().is_redacted() && event.unredacted_item.is_none();
if *is_local && is_local_redacted || !*is_local && is_remote_redacted {
ApplyAggregationResult::LeftItemIntact
} else {
let new_item = event.redact(&rules.redaction, *is_local);
*event = Cow::Owned(new_item);
ApplyAggregationResult::UpdatedItem
}
}
AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
Ok(state) => {
if !state.end(*end_date) {
return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
}
ApplyAggregationResult::UpdatedItem
}
Err(err) => ApplyAggregationResult::Error(err),
},
AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
let Some(reactions) = event.content().reactions() else {
return ApplyAggregationResult::LeftItemIntact;
};
let previous_reaction = reactions.get(key).and_then(|by_user| by_user.get(sender));
let is_same = previous_reaction.is_some_and(|prev| {
prev.timestamp == *timestamp
&& matches!(
(&prev.status, reaction_status),
(ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
| (
ReactionStatus::LocalToRemote(_),
ReactionStatus::LocalToRemote(_),
)
| (
ReactionStatus::RemoteToRemote(_),
ReactionStatus::RemoteToRemote(_),
)
)
});
if is_same {
ApplyAggregationResult::LeftItemIntact
} else {
let reactions = event
.to_mut()
.content_mut()
.reactions_mut()
.expect("reactions was Some above");
reactions.entry(key.clone()).or_default().insert(
sender.clone(),
ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
);
ApplyAggregationResult::UpdatedItem
}
}
AggregationKind::Edit(_) => {
ApplyAggregationResult::Edit
}
AggregationKind::BeaconUpdate { location } => {
match live_location_state_from_item(event) {
Ok(state) => {
state.add_location(location.clone());
ApplyAggregationResult::UpdatedItem
}
Err(err) => ApplyAggregationResult::Error(err),
}
}
AggregationKind::BeaconStop { content } => match live_location_state_from_item(event) {
Ok(state) => {
state.stop(content.clone());
ApplyAggregationResult::UpdatedItem
}
Err(err) => ApplyAggregationResult::Error(err),
},
AggregationKind::CallDeclined { sender } => {
match rtc_notification_declinations_from_item(event) {
Ok(declinations) => {
if declinations.contains(sender) {
ApplyAggregationResult::LeftItemIntact
} else {
declinations.push(sender.clone());
ApplyAggregationResult::UpdatedItem
}
}
Err(err) => ApplyAggregationResult::Error(err),
}
}
}
}
fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
match &self.kind {
AggregationKind::PollResponse { sender, timestamp, .. } => {
let state = match poll_state_from_item(event) {
Ok(state) => state,
Err(err) => return ApplyAggregationResult::Error(err),
};
state.remove_response(sender, *timestamp);
ApplyAggregationResult::UpdatedItem
}
AggregationKind::PollEnd { .. } => {
ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
}
AggregationKind::Redaction { is_local } => {
if *is_local {
if event.unredacted_item.is_some() {
*event = Cow::Owned(event.unredact());
ApplyAggregationResult::UpdatedItem
} else {
ApplyAggregationResult::LeftItemIntact
}
} else {
ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
}
}
AggregationKind::Reaction { key, sender, .. } => {
let Some(reactions) = event.content().reactions() else {
return ApplyAggregationResult::LeftItemIntact;
};
let had_entry =
reactions.get(key).and_then(|by_user| by_user.get(sender)).is_some();
if had_entry {
let reactions = event
.to_mut()
.content_mut()
.reactions_mut()
.expect("reactions was some above");
let by_user = reactions.get_mut(key);
if let Some(by_user) = by_user {
by_user.swap_remove(sender);
if by_user.is_empty() {
reactions.swap_remove(key);
}
}
ApplyAggregationResult::UpdatedItem
} else {
ApplyAggregationResult::LeftItemIntact
}
}
AggregationKind::Edit(_) => {
ApplyAggregationResult::Edit
}
AggregationKind::BeaconUpdate { location } => {
match live_location_state_from_item(event) {
Ok(state) => {
state.remove_location(location.ts);
ApplyAggregationResult::UpdatedItem
}
Err(err) => ApplyAggregationResult::Error(err),
}
}
AggregationKind::BeaconStop { .. } => {
ApplyAggregationResult::Error(AggregationError::CantUndoBeaconStop)
}
AggregationKind::CallDeclined { .. } => {
ApplyAggregationResult::Error(AggregationError::CantUndoRtcDecline)
}
}
}
}
#[derive(Clone, Debug, Default)]
pub(crate) struct Aggregations {
related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,
inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
pending_beacon_stops: HashMap<OwnedUserId, Aggregation>,
}
impl Aggregations {
pub fn clear(&mut self) {
self.related_events.clear();
self.inverted_map.clear();
self.pending_beacon_stops.clear();
}
pub fn add_pending_beacon_stop(&mut self, sender: OwnedUserId, aggregation: Aggregation) {
self.pending_beacon_stops.insert(sender, aggregation);
}
fn promote_pending_beacon_stop(
&mut self,
sender: &OwnedUserId,
target_event_id: OwnedEventId,
start_content: &BeaconInfoEventContent,
) {
if !start_content.live {
return;
}
let Some(stop) = self.pending_beacon_stops.remove(sender) else { return };
let AggregationKind::BeaconStop { content: stop_content } = &stop.kind else {
warn!("pending beacon stop has unexpected aggregation kind");
return;
};
if !beacon_info_matches(start_content, stop_content) {
trace!("discarding stale pending beacon stop (content mismatch)");
return;
}
let target = TimelineEventItemId::EventId(target_event_id);
self.add(target, stop);
}
pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
if matches!(aggregation.kind, AggregationKind::Redaction { .. }) {
for agg in self.related_events.remove(&related_to).unwrap_or_default() {
self.inverted_map.remove(&agg.own_id);
}
}
if let Some(previous_aggregations) = self.related_events.get(&related_to)
&& previous_aggregations
.iter()
.any(|agg| matches!(agg.kind, AggregationKind::Redaction { .. }))
{
return;
}
self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
let related_events = self.related_events.entry(related_to).or_default();
if let Some(pos) = related_events.iter().position(|agg| agg.own_id == aggregation.own_id) {
related_events.remove(pos);
}
related_events.push(aggregation);
}
pub fn try_remove_aggregation(
&mut self,
aggregation_id: &TimelineEventItemId,
items: &mut ObservableItemsTransaction<'_>,
) -> Result<bool, AggregationError> {
let Some(found) = self.inverted_map.get(aggregation_id) else { return Ok(false) };
let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
let removed = aggregations
.iter()
.position(|agg| agg.own_id == *aggregation_id)
.map(|idx| aggregations.remove(idx));
if aggregations.is_empty() {
self.related_events.remove(found);
}
removed
} else {
None
};
let Some(aggregation) = aggregation else {
warn!(
"incorrect internal state: {aggregation_id:?} was present in the inverted map, \
not in related-to map."
);
return Ok(false);
};
if let Some((item_pos, item)) = rfind_event_by_item_id(items, found) {
let mut cowed = Cow::Borrowed(&*item);
match aggregation.unapply(&mut cowed) {
ApplyAggregationResult::UpdatedItem => {
trace!("removed aggregation");
items.replace(
item_pos,
TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
);
}
ApplyAggregationResult::LeftItemIntact => {}
ApplyAggregationResult::Error(err) => {
warn!("error when unapplying aggregation: {err}");
}
ApplyAggregationResult::Edit => {
if let Some(aggregations) = self.related_events.get(found) {
if resolve_edits(aggregations, items, &mut cowed) {
items.replace(
item_pos,
TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
);
} else {
}
} else {
}
}
}
} else {
info!("missing related-to item ({found:?}) for aggregation {aggregation_id:?}");
}
Ok(true)
}
pub fn apply_all(
&mut self,
item_id: &TimelineEventItemId,
sender: &OwnedUserId,
event: &mut Cow<'_, EventTimelineItem>,
items: &mut ObservableItemsTransaction<'_>,
rules: &RoomVersionRules,
) -> Result<(), AggregationError> {
if let TimelineEventItemId::EventId(event_id) = item_id
&& let Some(live_location) = event.content().as_live_location_state()
{
self.promote_pending_beacon_stop(sender, event_id.clone(), &live_location.beacon_info);
}
let Some(aggregations) = self.related_events.get(item_id) else {
return Ok(());
};
let mut has_edits = false;
for a in aggregations {
match a.apply(event, rules) {
ApplyAggregationResult::Edit => {
has_edits = true;
}
ApplyAggregationResult::UpdatedItem | ApplyAggregationResult::LeftItemIntact => {}
ApplyAggregationResult::Error(err) => return Err(err),
}
}
if has_edits {
resolve_edits(aggregations, items, event);
}
Ok(())
}
pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
let from = TimelineEventItemId::TransactionId(txn_id);
let to = TimelineEventItemId::EventId(event_id);
if let Some(aggregations) = self.related_events.remove(&from) {
for a in &aggregations {
if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
debug_assert_eq!(prev_target, from);
self.inverted_map.insert(a.own_id.clone(), to.clone());
}
}
self.related_events.entry(to).or_default().extend(aggregations);
}
}
pub fn mark_aggregation_as_sent(
&mut self,
txn_id: OwnedTransactionId,
event_id: OwnedEventId,
items: &mut ObservableItemsTransaction<'_>,
rules: &RoomVersionRules,
) -> bool {
let from = TimelineEventItemId::TransactionId(txn_id);
let to = TimelineEventItemId::EventId(event_id.clone());
let Some(target) = self.inverted_map.remove(&from) else {
return false;
};
if let Some(aggregations) = self.related_events.get_mut(&target)
&& let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from)
{
found.own_id = to.clone();
match &mut found.kind {
AggregationKind::PollResponse { .. }
| AggregationKind::PollEnd { .. }
| AggregationKind::Edit(..)
| AggregationKind::BeaconUpdate { .. }
| AggregationKind::BeaconStop { .. }
| AggregationKind::CallDeclined { .. } => {
}
AggregationKind::Redaction { is_local } => {
*is_local = false;
let found = found.clone();
find_item_and_apply_aggregation(self, items, &target, found, rules);
}
AggregationKind::Reaction { reaction_status, .. } => {
*reaction_status = ReactionStatus::RemoteToRemote(event_id);
let found = found.clone();
find_item_and_apply_aggregation(self, items, &target, found, rules);
}
}
}
self.inverted_map.insert(to, target);
true
}
pub fn is_aggregation_of(&self, item: &TimelineEventItemId) -> Option<&TimelineEventItemId> {
self.inverted_map.get(item)
}
}
fn resolve_edits(
aggregations: &[Aggregation],
items: &ObservableItemsTransaction<'_>,
event: &mut Cow<'_, EventTimelineItem>,
) -> bool {
let mut best_edit: Option<(PendingEdit, bool)> = None;
let mut best_edit_pos = None;
for a in aggregations {
if let AggregationKind::Edit(pending_edit) = &a.kind {
match &a.own_id {
TimelineEventItemId::TransactionId(_) => {
best_edit = Some((pending_edit.clone(), true));
break;
}
TimelineEventItemId::EventId(event_id) => {
if let Some(best_edit_pos) = &mut best_edit_pos {
let pos = items.position_by_event_id(
pending_edit.bundled_item_owner.as_ref().unwrap_or(event_id),
);
if let Some(pos) = pos {
if pos > *best_edit_pos {
best_edit = Some((pending_edit.clone(), false));
*best_edit_pos = pos;
trace!(?best_edit_pos, edit_id = ?a.own_id, "found better edit");
}
} else {
trace!(edit_id = ?a.own_id, "couldn't find timeline meta for edit event");
if best_edit.is_none() {
best_edit = Some((pending_edit.clone(), false));
trace!(?best_edit_pos, edit_id = ?a.own_id, "found bundled edit");
}
}
} else {
best_edit = Some((pending_edit.clone(), false));
best_edit_pos = items.position_by_event_id(event_id);
trace!(?best_edit_pos, edit_id = ?a.own_id, "first best edit");
}
}
}
}
}
if let Some((edit, is_local_echo)) = best_edit {
edit_item(event, edit, is_local_echo)
} else {
false
}
}
fn edit_item(
item: &mut Cow<'_, EventTimelineItem>,
edit: PendingEdit,
is_local_echo: bool,
) -> bool {
if !is_local_echo {
let Some(original_json) = item.original_json() else {
error!("The original event does not have the JSON field set.");
return false;
};
let Some(edit_json) = &edit.edit_json else {
error!(
"The replacement event of a remotely received edit does not have the JSON field set."
);
return false;
};
match check_validity_of_replacement_events(
original_json,
item.encryption_info(),
edit_json,
edit.encryption_info.as_deref(),
) {
Ok(content) => content,
Err(e) => {
warn!("Event wasn't replaced due to the replacement event being invalid: {e}");
return false;
}
}
}
let TimelineItemContent::MsgLike(content) = item.content() else {
info!("Edit of message event applies to {:?}, discarding", item.content().debug_string());
return false;
};
let PendingEdit { kind: edit_kind, edit_json, encryption_info, bundled_item_owner: _ } = edit;
match (edit_kind, content) {
(
PendingEditKind::RoomMessage(replacement),
MsgLikeContent { kind: MsgLikeKind::Message(msg), .. },
) => {
let mut new_msg = msg.clone();
new_msg.apply_edit(replacement.new_content);
let new_item = item.with_content_and_latest_edit(
TimelineItemContent::MsgLike(content.with_kind(MsgLikeKind::Message(new_msg))),
edit_json,
);
*item = Cow::Owned(new_item);
}
(
PendingEditKind::Poll(replacement),
MsgLikeContent { kind: MsgLikeKind::Poll(poll_state), .. },
) => {
if let Some(new_poll_state) = poll_state.edit(replacement.new_content) {
let new_item = item.with_content_and_latest_edit(
TimelineItemContent::MsgLike(
content.with_kind(MsgLikeKind::Poll(new_poll_state)),
),
edit_json,
);
*item = Cow::Owned(new_item);
} else {
return false;
}
}
(edit_kind, _) => {
info!(
content = item.content().debug_string(),
edit = format!("{:?}", edit_kind),
"Mismatch between edit type and content type",
);
return false;
}
}
if let Some(encryption_info) = encryption_info {
*item = Cow::Owned(item.with_encryption_info(Some(encryption_info)));
}
true
}
pub(crate) fn find_item_and_apply_aggregation(
aggregations: &Aggregations,
items: &mut ObservableItemsTransaction<'_>,
target: &TimelineEventItemId,
aggregation: Aggregation,
rules: &RoomVersionRules,
) -> Option<EventTimelineItem> {
let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
trace!("couldn't find aggregation's target {target:?}");
return None;
};
let mut cowed = Cow::Borrowed(&*event_item);
match aggregation.apply(&mut cowed, rules) {
ApplyAggregationResult::UpdatedItem => {
trace!("applied aggregation");
let new_event_item = cowed.into_owned();
let new_item =
TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
items.replace(idx, new_item);
Some(new_event_item)
}
ApplyAggregationResult::Edit => {
if let Some(aggregations) = aggregations.related_events.get(target)
&& resolve_edits(aggregations, items, &mut cowed)
{
let new_event_item = cowed.into_owned();
let new_item =
TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
items.replace(idx, new_item);
return Some(new_event_item);
}
None
}
ApplyAggregationResult::LeftItemIntact => {
trace!("applying the aggregation had no effect");
None
}
ApplyAggregationResult::Error(err) => {
warn!("error when applying aggregation: {err}");
None
}
}
}
enum ApplyAggregationResult {
UpdatedItem,
Edit,
LeftItemIntact,
Error(AggregationError),
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum AggregationError {
#[error("trying to end a poll twice")]
PollAlreadyEnded,
#[error("a poll end can't be unapplied")]
CantUndoPollEnd,
#[error("a redaction can't be unapplied")]
CantUndoRedaction,
#[error("a beacon stop can't be unapplied")]
CantUndoBeaconStop,
#[error("a call decline can't be unapplied")]
CantUndoRtcDecline,
#[error(
"trying to apply an aggregation of one type to an invalid target: \
expected {expected}, actual {actual}"
)]
InvalidType { expected: String, actual: String },
}