use std::{borrow::Cow, collections::HashMap, sync::Arc};
use as_variant::as_variant;
use matrix_sdk::{check_validity_of_replacement_events, deserialized_responses::EncryptionInfo};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
events::{
AnySyncTimelineEvent,
poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
},
room_version_rules::RoomVersionRules,
serde::Raw,
};
use tracing::{error, info, trace, warn};
use super::{ObservableItemsTransaction, rfind_event_by_item_id};
use crate::timeline::{
EventTimelineItem, MsgLikeContent, MsgLikeKind, PollState, ReactionInfo, ReactionStatus,
TimelineEventItemId, TimelineItem, TimelineItemContent,
};
/// The kind of replacement carried by a [`PendingEdit`].
///
/// `Debug` is implemented by hand for this type and only prints the variant
/// name, never the replacement payload.
#[derive(Clone)]
pub(in crate::timeline) enum PendingEditKind {
/// A replacement for the content of a room message.
RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
/// A replacement for the content of an (unstable) poll start event.
Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
}
impl std::fmt::Debug for PendingEditKind {
    /// Prints only the variant name followed by `{ .. }`; the replacement
    /// payload itself is never written out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant_name = match self {
            Self::RoomMessage(_) => "RoomMessage",
            Self::Poll(_) => "Poll",
        };
        f.debug_struct(variant_name).finish_non_exhaustive()
    }
}
/// An edit to apply to a timeline item; this is the payload of
/// [`AggregationKind::Edit`].
#[derive(Clone, Debug)]
pub(in crate::timeline) struct PendingEdit {
/// The replacement content, per kind of edited event.
pub kind: PendingEditKind,
/// The raw JSON of the edit event, when available.
///
/// `edit_item` refuses to apply an edit that isn't a local echo when this
/// is `None`; on success it is handed over to the edited item as its
/// latest edit.
pub edit_json: Option<Raw<AnySyncTimelineEvent>>,
/// The encryption info of the edit event, if any.
///
/// It takes part in the validity check of non-local edits, and when set
/// it replaces the encryption info of the edited item.
pub encryption_info: Option<Arc<EncryptionInfo>>,
/// When set, `resolve_edits` uses this event id, rather than the edit's
/// own, to find the timeline position used for ranking edits.
///
/// NOTE(review): presumably the event that carried this edit as a bundled
/// relation — confirm against the code creating `PendingEdit`s.
pub bundled_item_owner: Option<OwnedEventId>,
}
/// Which kind of aggregation (a related event affecting another timeline
/// item) is being held.
#[derive(Clone, Debug)]
pub(crate) enum AggregationKind {
/// A response to a poll; applying it adds the response to the poll state.
PollResponse {
/// The user who sent the response.
sender: OwnedUserId,
/// The timestamp recorded along with the response.
timestamp: MilliSecondsSinceUnixEpoch,
/// The answers recorded for this response.
answers: Vec<String>,
},
/// The end of a poll; applying it ends the poll state. It can't be
/// unapplied.
PollEnd {
/// The date passed to the poll state when ending it.
end_date: MilliSecondsSinceUnixEpoch,
},
/// A reaction; applying it stores a `ReactionInfo` for `(key, sender)` in
/// the reactions of the target item.
Reaction {
/// The reaction key under which senders are grouped.
key: String,
/// The user who sent the reaction.
sender: OwnedUserId,
/// The timestamp stored in the resulting `ReactionInfo`.
timestamp: MilliSecondsSinceUnixEpoch,
/// The status stored in the resulting `ReactionInfo`.
reaction_status: ReactionStatus,
},
/// A redaction of the target item. It can't be unapplied.
Redaction,
/// An edit of the target item. Applying it doesn't modify the item
/// directly; the caller is told to resolve which edit wins instead.
Edit(PendingEdit),
}
/// An aggregation, i.e. a related event that modifies another timeline item.
#[derive(Clone, Debug)]
pub(crate) struct Aggregation {
/// What this aggregation does to its target.
pub kind: AggregationKind,
/// The identifier of the aggregation itself (not of its target): either a
/// transaction id or an event id.
pub own_id: TimelineEventItemId,
}
/// Returns a mutable reference to the poll state held by `event`.
///
/// The item is only turned into an owned value (via `Cow::to_mut`) once it
/// has been confirmed to hold a poll.
///
/// # Errors
///
/// Returns [`AggregationError::InvalidType`] if the item's content isn't a
/// poll.
fn poll_state_from_item<'a>(
    event: &'a mut Cow<'_, EventTimelineItem>,
) -> Result<&'a mut PollState, AggregationError> {
    // Reject non-poll items first, without touching the `Cow`.
    if !event.content().is_poll() {
        let actual = event.content().debug_string().to_owned();
        return Err(AggregationError::InvalidType { expected: "a poll".to_owned(), actual });
    }

    let content = event.to_mut().content_mut();
    let poll_state = as_variant!(
        content,
        TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(s), ..}) => s
    )
    .expect("it was a poll just above");

    Ok(poll_state)
}
impl Aggregation {
pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
Self { kind, own_id }
}
fn apply(
&self,
event: &mut Cow<'_, EventTimelineItem>,
rules: &RoomVersionRules,
) -> ApplyAggregationResult {
match &self.kind {
AggregationKind::PollResponse { sender, timestamp, answers } => {
match poll_state_from_item(event) {
Ok(state) => {
state.add_response(sender.clone(), *timestamp, answers.clone());
ApplyAggregationResult::UpdatedItem
}
Err(err) => ApplyAggregationResult::Error(err),
}
}
AggregationKind::Redaction => {
if event.content().is_redacted() {
ApplyAggregationResult::LeftItemIntact
} else {
let new_item = event.redact(&rules.redaction);
*event = Cow::Owned(new_item);
ApplyAggregationResult::UpdatedItem
}
}
AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
Ok(state) => {
if !state.end(*end_date) {
return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
}
ApplyAggregationResult::UpdatedItem
}
Err(err) => ApplyAggregationResult::Error(err),
},
AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
let Some(reactions) = event.content().reactions() else {
return ApplyAggregationResult::LeftItemIntact;
};
let previous_reaction = reactions.get(key).and_then(|by_user| by_user.get(sender));
let is_same = previous_reaction.is_some_and(|prev| {
prev.timestamp == *timestamp
&& matches!(
(&prev.status, reaction_status),
(ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
| (
ReactionStatus::LocalToRemote(_),
ReactionStatus::LocalToRemote(_),
)
| (
ReactionStatus::RemoteToRemote(_),
ReactionStatus::RemoteToRemote(_),
)
)
});
if is_same {
ApplyAggregationResult::LeftItemIntact
} else {
let reactions = event
.to_mut()
.content_mut()
.reactions_mut()
.expect("reactions was Some above");
reactions.entry(key.clone()).or_default().insert(
sender.clone(),
ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
);
ApplyAggregationResult::UpdatedItem
}
}
AggregationKind::Edit(_) => {
ApplyAggregationResult::Edit
}
}
}
fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
match &self.kind {
AggregationKind::PollResponse { sender, timestamp, .. } => {
let state = match poll_state_from_item(event) {
Ok(state) => state,
Err(err) => return ApplyAggregationResult::Error(err),
};
state.remove_response(sender, *timestamp);
ApplyAggregationResult::UpdatedItem
}
AggregationKind::PollEnd { .. } => {
ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
}
AggregationKind::Redaction => {
ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
}
AggregationKind::Reaction { key, sender, .. } => {
let Some(reactions) = event.content().reactions() else {
return ApplyAggregationResult::LeftItemIntact;
};
let had_entry =
reactions.get(key).and_then(|by_user| by_user.get(sender)).is_some();
if had_entry {
let reactions = event
.to_mut()
.content_mut()
.reactions_mut()
.expect("reactions was some above");
let by_user = reactions.get_mut(key);
if let Some(by_user) = by_user {
by_user.swap_remove(sender);
if by_user.is_empty() {
reactions.swap_remove(key);
}
}
ApplyAggregationResult::UpdatedItem
} else {
ApplyAggregationResult::LeftItemIntact
}
}
AggregationKind::Edit(_) => {
ApplyAggregationResult::Edit
}
}
}
}
/// Bookkeeping of all the aggregations known for timeline items.
#[derive(Clone, Debug, Default)]
pub(crate) struct Aggregations {
/// Maps the identifier of a target item to the aggregations related to it.
related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,
/// Maps the identifier of an aggregation (its `own_id`) to the identifier
/// of the item it targets.
inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
}
impl Aggregations {
pub fn clear(&mut self) {
self.related_events.clear();
self.inverted_map.clear();
}
pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
if matches!(aggregation.kind, AggregationKind::Redaction) {
for agg in self.related_events.remove(&related_to).unwrap_or_default() {
self.inverted_map.remove(&agg.own_id);
}
}
if let Some(previous_aggregations) = self.related_events.get(&related_to)
&& previous_aggregations
.iter()
.any(|agg| matches!(agg.kind, AggregationKind::Redaction))
{
return;
}
self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
self.related_events.entry(related_to).or_default().push(aggregation);
}
pub fn try_remove_aggregation(
&mut self,
aggregation_id: &TimelineEventItemId,
items: &mut ObservableItemsTransaction<'_>,
) -> Result<bool, AggregationError> {
let Some(found) = self.inverted_map.get(aggregation_id) else { return Ok(false) };
let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
let removed = aggregations
.iter()
.position(|agg| agg.own_id == *aggregation_id)
.map(|idx| aggregations.remove(idx));
if aggregations.is_empty() {
self.related_events.remove(found);
}
removed
} else {
None
};
let Some(aggregation) = aggregation else {
warn!(
"incorrect internal state: {aggregation_id:?} was present in the inverted map, \
not in related-to map."
);
return Ok(false);
};
if let Some((item_pos, item)) = rfind_event_by_item_id(items, found) {
let mut cowed = Cow::Borrowed(&*item);
match aggregation.unapply(&mut cowed) {
ApplyAggregationResult::UpdatedItem => {
trace!("removed aggregation");
items.replace(
item_pos,
TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
);
}
ApplyAggregationResult::LeftItemIntact => {}
ApplyAggregationResult::Error(err) => {
warn!("error when unapplying aggregation: {err}");
}
ApplyAggregationResult::Edit => {
if let Some(aggregations) = self.related_events.get(found) {
if resolve_edits(aggregations, items, &mut cowed) {
items.replace(
item_pos,
TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
);
} else {
}
} else {
}
}
}
} else {
info!("missing related-to item ({found:?}) for aggregation {aggregation_id:?}");
}
Ok(true)
}
pub fn apply_all(
&self,
item_id: &TimelineEventItemId,
event: &mut Cow<'_, EventTimelineItem>,
items: &mut ObservableItemsTransaction<'_>,
rules: &RoomVersionRules,
) -> Result<(), AggregationError> {
let Some(aggregations) = self.related_events.get(item_id) else {
return Ok(());
};
let mut has_edits = false;
for a in aggregations {
match a.apply(event, rules) {
ApplyAggregationResult::Edit => {
has_edits = true;
}
ApplyAggregationResult::UpdatedItem | ApplyAggregationResult::LeftItemIntact => {}
ApplyAggregationResult::Error(err) => return Err(err),
}
}
if has_edits {
resolve_edits(aggregations, items, event);
}
Ok(())
}
pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
let from = TimelineEventItemId::TransactionId(txn_id);
let to = TimelineEventItemId::EventId(event_id);
if let Some(aggregations) = self.related_events.remove(&from) {
for a in &aggregations {
if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
debug_assert_eq!(prev_target, from);
self.inverted_map.insert(a.own_id.clone(), to.clone());
}
}
self.related_events.entry(to).or_default().extend(aggregations);
}
}
pub fn mark_aggregation_as_sent(
&mut self,
txn_id: OwnedTransactionId,
event_id: OwnedEventId,
items: &mut ObservableItemsTransaction<'_>,
rules: &RoomVersionRules,
) -> bool {
let from = TimelineEventItemId::TransactionId(txn_id);
let to = TimelineEventItemId::EventId(event_id.clone());
let Some(target) = self.inverted_map.remove(&from) else {
return false;
};
if let Some(aggregations) = self.related_events.get_mut(&target)
&& let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from)
{
found.own_id = to.clone();
match &mut found.kind {
AggregationKind::PollResponse { .. }
| AggregationKind::PollEnd { .. }
| AggregationKind::Edit(..)
| AggregationKind::Redaction => {
}
AggregationKind::Reaction { reaction_status, .. } => {
*reaction_status = ReactionStatus::RemoteToRemote(event_id);
let found = found.clone();
find_item_and_apply_aggregation(self, items, &target, found, rules);
}
}
}
self.inverted_map.insert(to, target);
true
}
}
/// Picks the edit to use among `aggregations` and applies it to `event`.
///
/// The selection works as follows, looking at edit aggregations in order:
///
/// - an edit identified by a transaction id wins immediately, and is applied
///   as a local echo;
/// - otherwise, among edits identified by an event id, the one found at the
///   highest position in `items` wins; an edit that can't be located is only
///   kept if no other candidate has been recorded.
///
/// Returns `true` if an edit was found and successfully applied to `event`,
/// `false` otherwise.
fn resolve_edits(
    aggregations: &[Aggregation],
    items: &ObservableItemsTransaction<'_>,
    event: &mut Cow<'_, EventTimelineItem>,
) -> bool {
    // Candidates are only borrowed while scanning; the winner is cloned once
    // at the end, instead of cloning every intermediate candidate.
    let mut best_edit: Option<(&PendingEdit, bool)> = None;
    let mut best_edit_pos = None;

    for a in aggregations {
        let AggregationKind::Edit(pending_edit) = &a.kind else {
            continue;
        };

        match &a.own_id {
            TimelineEventItemId::TransactionId(_) => {
                // An edit keyed by a transaction id takes precedence over
                // everything else: stop looking.
                best_edit = Some((pending_edit, true));
                break;
            }

            TimelineEventItemId::EventId(event_id) => {
                if let Some(best_edit_pos) = &mut best_edit_pos {
                    // Locate the edit through the bundled item owner when
                    // there is one, through the edit event itself otherwise.
                    let pos = items.position_by_event_id(
                        pending_edit.bundled_item_owner.as_ref().unwrap_or(event_id),
                    );

                    if let Some(pos) = pos {
                        // A higher position beats the previous best edit.
                        if pos > *best_edit_pos {
                            best_edit = Some((pending_edit, false));
                            *best_edit_pos = pos;
                            trace!(?best_edit_pos, edit_id = ?a.own_id, "found better edit");
                        }
                    } else {
                        trace!(edit_id = ?a.own_id, "couldn't find timeline meta for edit event");

                        // An edit that can't be located is only used when
                        // there is no other candidate.
                        if best_edit.is_none() {
                            best_edit = Some((pending_edit, false));
                            trace!(?best_edit_pos, edit_id = ?a.own_id, "found bundled edit");
                        }
                    }
                } else {
                    // No known best position yet: record this edit and its
                    // position, if it has one.
                    //
                    // NOTE(review): unlike the branch above, this looks up
                    // the edit's own event id and ignores
                    // `bundled_item_owner` — confirm this is intentional.
                    best_edit = Some((pending_edit, false));
                    best_edit_pos = items.position_by_event_id(event_id);
                    trace!(?best_edit_pos, edit_id = ?a.own_id, "first best edit");
                }
            }
        }
    }

    match best_edit {
        Some((edit, is_local_echo)) => edit_item(event, edit.clone(), is_local_echo),
        None => false,
    }
}
/// Applies `edit` to `item`, replacing the `Cow` with an owned, edited item.
///
/// Unless `is_local_echo` is set, the edit is first validated against the
/// original event: both events need their raw JSON, and the pair must pass
/// `check_validity_of_replacement_events`.
///
/// Returns `true` if the item has been edited, `false` if the edit was
/// discarded (failed validation, non message-like target, mismatch between
/// the edit kind and the content kind, or a poll edit that was refused).
fn edit_item(
    item: &mut Cow<'_, EventTimelineItem>,
    edit: PendingEdit,
    is_local_echo: bool,
) -> bool {
    if !is_local_echo {
        let Some(original_json) = item.original_json() else {
            error!("The original event does not have the JSON field set.");
            return false;
        };

        let Some(edit_json) = &edit.edit_json else {
            error!(
                "The replacement event of a remotely received edit does not have the JSON field set."
            );
            return false;
        };

        if let Err(e) = check_validity_of_replacement_events(
            original_json,
            item.encryption_info(),
            edit_json,
            edit.encryption_info.as_deref(),
        ) {
            warn!("Event wasn't replaced due to the replacement event being invalid: {e}");
            return false;
        }
    }

    // Only message-like items can be edited.
    let TimelineItemContent::MsgLike(content) = item.content() else {
        info!("Edit of message event applies to {:?}, discarding", item.content().debug_string());
        return false;
    };

    let PendingEdit { kind: edit_kind, edit_json, encryption_info, bundled_item_owner: _ } = edit;

    // Compute the edited content; the kind of the edit has to match the kind
    // of the content it applies to.
    let edited_content = match (edit_kind, content) {
        (
            PendingEditKind::RoomMessage(replacement),
            MsgLikeContent { kind: MsgLikeKind::Message(msg), .. },
        ) => {
            let mut new_msg = msg.clone();
            new_msg.apply_edit(replacement.new_content);
            content.with_kind(MsgLikeKind::Message(new_msg))
        }

        (
            PendingEditKind::Poll(replacement),
            MsgLikeContent { kind: MsgLikeKind::Poll(poll_state), .. },
        ) => {
            // The poll state may refuse the edit.
            let Some(new_poll_state) = poll_state.edit(replacement.new_content) else {
                return false;
            };
            content.with_kind(MsgLikeKind::Poll(new_poll_state))
        }

        (edit_kind, _) => {
            info!(
                content = item.content().debug_string(),
                edit = format!("{:?}", edit_kind),
                "Mismatch between edit type and content type",
            );
            return false;
        }
    };

    let new_item =
        item.with_content_and_latest_edit(TimelineItemContent::MsgLike(edited_content), edit_json);
    *item = Cow::Owned(new_item);

    // The edit's encryption info, when present, replaces the item's.
    if let Some(encryption_info) = encryption_info {
        let with_info = item.with_encryption_info(Some(encryption_info));
        *item = Cow::Owned(with_info);
    }

    true
}
/// Finds the item identified by `target` in `items` and applies
/// `aggregation` to it.
///
/// For edits, the winning edit is resolved among all the aggregations that
/// `aggregations` holds for `target`, and that one is applied.
///
/// Returns the updated event item if the timeline item has been replaced,
/// `None` if the target is missing, the aggregation had no effect, or
/// applying it failed (the failure is only logged).
pub(crate) fn find_item_and_apply_aggregation(
    aggregations: &Aggregations,
    items: &mut ObservableItemsTransaction<'_>,
    target: &TimelineEventItemId,
    aggregation: Aggregation,
    rules: &RoomVersionRules,
) -> Option<EventTimelineItem> {
    let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
        trace!("couldn't find aggregation's target {target:?}");
        return None;
    };

    let mut cowed = Cow::Borrowed(&*event_item);

    // Reduce every outcome to "does the timeline item need replacing?".
    let item_changed = match aggregation.apply(&mut cowed, rules) {
        ApplyAggregationResult::UpdatedItem => {
            trace!("applied aggregation");
            true
        }
        ApplyAggregationResult::Edit => match aggregations.related_events.get(target) {
            Some(related) => resolve_edits(related, items, &mut cowed),
            None => false,
        },
        ApplyAggregationResult::LeftItemIntact => {
            trace!("applying the aggregation had no effect");
            false
        }
        ApplyAggregationResult::Error(err) => {
            warn!("error when applying aggregation: {err}");
            false
        }
    };

    if !item_changed {
        return None;
    }

    // Keep the internal id of the item being replaced.
    let new_event_item = cowed.into_owned();
    let new_item = TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
    items.replace(idx, new_item);

    Some(new_event_item)
}
/// The outcome of applying or unapplying a single aggregation to an item.
enum ApplyAggregationResult {
/// The item has been modified, and the timeline item needs replacing.
UpdatedItem,
/// The aggregation is an edit: the item hasn't been modified, and the
/// caller must resolve which edit to use.
Edit,
/// The aggregation had no effect on the item.
LeftItemIntact,
/// The aggregation couldn't be applied or unapplied.
Error(AggregationError),
}
/// Errors that can happen when applying or unapplying an aggregation.
#[derive(Debug, thiserror::Error)]
pub(crate) enum AggregationError {
/// A poll end was applied to a poll state that refused to be ended.
#[error("trying to end a poll twice")]
PollAlreadyEnded,
/// A poll end aggregation was asked to be unapplied.
#[error("a poll end can't be unapplied")]
CantUndoPollEnd,
/// A redaction aggregation was asked to be unapplied.
#[error("a redaction can't be unapplied")]
CantUndoRedaction,
/// The aggregation targets an item whose content has the wrong type, e.g.
/// a poll aggregation applied to an item that isn't a poll.
#[error(
"trying to apply an aggregation of one type to an invalid target: \
expected {expected}, actual {actual}"
)]
InvalidType { expected: String, actual: String },
}