use std::{
iter::once,
ops::{Deref, Not},
};
use eyeball::{AsyncLock, SharedObservable, Subscriber};
pub use matrix_sdk_base::latest_event::{
LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
};
use matrix_sdk_base::{
RoomInfoNotableUpdateReasons, StateChanges, deserialized_responses::TimelineEvent,
store::SerializableEventContent,
};
use ruma::{
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
events::{
AnyMessageLikeEventContent, AnySyncStateEvent, AnySyncTimelineEvent, SyncStateEvent,
relation::Replacement,
room::{
member::MembershipState,
message::{MessageType, Relation},
power_levels::RoomPowerLevels,
},
},
};
use tracing::{error, instrument, warn};
use crate::{event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate};
/// The latest event of a room, together with the state needed to keep it up
/// to date from both the event cache and the send queue.
#[derive(Debug)]
pub(super) struct LatestEvent {
/// Weak reference to the room this latest event belongs to. It is upgraded
/// when the value has to be stored in the room info.
weak_room: WeakRoom,
/// Optional thread identifier passed at construction time. It is stored
/// but not read by the code visible in this file.
_thread_id: Option<OwnedEventId>,
/// Buffer of latest event values computed for local events, i.e. events
/// announced by send queue updates. While it is non-empty, updates coming
/// from the event cache are ignored.
buffer_of_values_for_local_events: LatestEventValuesForLocalEvents,
/// The current latest event value, observable through [`Self::subscribe`].
current_value: SharedObservable<LatestEventValue, AsyncLock>,
}
impl LatestEvent {
    /// Build a new [`LatestEvent`] for the room behind `weak_room`.
    ///
    /// The initial value is computed with
    /// [`LatestEventValueBuilder::new_remote`]; the buffer of values for local
    /// events starts empty.
    pub(super) async fn new(
        weak_room: &WeakRoom,
        thread_id: Option<&EventId>,
        room_event_cache: &RoomEventCache,
    ) -> Self {
        let initial_value = LatestEventValueBuilder::new_remote(room_event_cache, weak_room).await;

        Self {
            weak_room: weak_room.clone(),
            _thread_id: thread_id.map(ToOwned::to_owned),
            buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(),
            current_value: SharedObservable::new_async(initial_value),
        }
    }

    /// Return a subscriber to the current [`LatestEventValue`].
    pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
        self.current_value.subscribe().await
    }

    /// Recompute the latest event value from the event cache.
    ///
    /// Nothing happens while the buffer of values for local events is
    /// non-empty: a value computed from a local event is never overwritten by
    /// one computed from the event cache.
    pub async fn update_with_event_cache(
        &mut self,
        room_event_cache: &RoomEventCache,
        own_user_id: Option<&UserId>,
        power_levels: Option<&RoomPowerLevels>,
    ) {
        let has_values_for_local_events = self.buffer_of_values_for_local_events.is_empty().not();

        if has_values_for_local_events {
            return;
        }

        let remote_value = LatestEventValueBuilder::new_remote_with_power_levels(
            room_event_cache,
            own_user_id,
            power_levels,
        )
        .await;

        self.update(remote_value).await;
    }

    /// Recompute the latest event value after a send queue update.
    ///
    /// The buffer of values for local events is updated as part of the
    /// computation.
    pub async fn update_with_send_queue(
        &mut self,
        send_queue_update: &RoomSendQueueUpdate,
        room_event_cache: &RoomEventCache,
        own_user_id: Option<&UserId>,
        power_levels: Option<&RoomPowerLevels>,
    ) {
        let buffer = &mut self.buffer_of_values_for_local_events;
        let local_value = LatestEventValueBuilder::new_local(
            send_queue_update,
            buffer,
            room_event_cache,
            own_user_id,
            power_levels,
        )
        .await;

        self.update(local_value).await;
    }

    /// Set the current value to `new_value` and store it, unless `new_value`
    /// is [`LatestEventValue::None`], in which case nothing is changed.
    async fn update(&mut self, new_value: LatestEventValue) {
        // Never replace the current value by a `None` value.
        if matches!(new_value, LatestEventValue::None) {
            return;
        }

        self.current_value.set(new_value.clone()).await;
        self.store(new_value).await;
    }

    /// Store `new_value` in the room info, both in the state store and on the
    /// room itself.
    ///
    /// If the room cannot be accessed, a warning is logged and nothing is
    /// stored. If saving to the state store fails, the error is logged and the
    /// in-memory room info is still updated.
    #[instrument(skip_all)]
    async fn store(&mut self, new_value: LatestEventValue) {
        let room = match self.weak_room.get() {
            Some(room) => room,
            None => {
                warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
                return;
            }
        };

        // Compute the new room info carrying the new latest event value.
        let updated_room_info = {
            let mut room_info = room.clone_info();
            room_info.set_new_latest_event(new_value);
            room_info
        };

        let changes = {
            let mut changes = StateChanges::default();
            changes.add_room(updated_room_info.clone());
            changes
        };

        let client = room.client();

        // The guard is held until the end of the function, i.e. while saving
        // the changes and while updating the room.
        let _state_store_lock = client.base_client().state_store_lock().lock().await;

        if let Err(error) = client.state_store().save_changes(&changes).await {
            error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
        }

        room.set_room_info(updated_room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT);
    }
}
// Tests for `LatestEvent` itself: update rules and persistence of the value.
#[cfg(all(not(target_family = "wasm"), test))]
mod tests_latest_event {
use assert_matches::assert_matches;
use matrix_sdk_base::{
RoomInfoNotableUpdateReasons, RoomState,
linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
store::StoreConfig,
};
use matrix_sdk_test::{async_test, event_factory::EventFactory};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
room_id, user_id,
};
use super::{LatestEvent, LatestEventValue, LocalLatestEventValue, SerializableEventContent};
use crate::{
client::WeakClient,
room::WeakRoom,
send_queue::{LocalEcho, LocalEchoContent, RoomSendQueue, RoomSendQueueUpdate, SendHandle},
test_utils::mocks::MatrixMockServer,
};
// Build a `LocalLatestEventValue` holding a plain-text room message with the
// given body, timestamped with the current time.
fn local_room_message(body: &str) -> LocalLatestEventValue {
LocalLatestEventValue {
timestamp: MilliSecondsSinceUnixEpoch::now(),
content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
RoomMessageEventContent::text_plain(body),
))
.unwrap(),
}
}
// Build a `LocalEchoContent::Event` holding a plain-text room message with
// the given body, with no send error.
fn new_local_echo_content(
room_send_queue: &RoomSendQueue,
transaction_id: &OwnedTransactionId,
body: &str,
) -> LocalEchoContent {
LocalEchoContent::Event {
serialized_event: SerializableEventContent::new(
&AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
)
.unwrap(),
send_handle: SendHandle::new(
room_send_queue.clone(),
transaction_id.clone(),
MilliSecondsSinceUnixEpoch::now(),
),
send_error: None,
}
}
// Updating with `LatestEventValue::None` must leave the current value
// untouched.
#[async_test]
async fn test_update_ignores_none_value() {
let room_id = room_id!("!r0");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let weak_client = WeakClient::from_client(&client);
// Create the room.
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
// The value is `None` initially.
assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
// Set a value that is not `None`.
latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::LocalIsSending(_)
);
// Updating with `None` is ignored: the previous value is kept.
latest_event.update(LatestEventValue::None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::LocalIsSending(_)
);
}
// While a value for a local event is buffered, updates from the event cache
// must not replace it.
#[async_test]
async fn test_local_has_priority_over_remote() {
let room_id = room_id!("!r0").to_owned();
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id).room(&room_id);
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.base_client().get_or_create_room(&room_id, RoomState::Joined);
let room = client.get_room(&room_id).unwrap();
let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
// Fill the event cache store with one chunk holding a single event, `$ev0`.
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(&room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![event_factory.text_msg("A").event_id(event_id!("$ev0")).into()],
},
],
)
.await
.unwrap();
let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
let send_queue = client.send_queue();
let room_send_queue = send_queue.for_room(room);
let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
// An update from the event cache gives a `Remote` value.
{
latest_event.update_with_event_cache(&room_event_cache, None, None).await;
assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
}
let transaction_id = OwnedTransactionId::from("txnid0");
// A new local event from the send queue gives a `LocalIsSending` value.
{
let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::LocalIsSending(_)
);
}
// An update from the event cache is ignored: the value is still local.
{
latest_event.update_with_event_cache(&room_event_cache, None, None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::LocalIsSending(_)
);
}
// The local event is reported as sent: the value is still local right
// after this update.
{
let update = RoomSendQueueUpdate::SentEvent {
transaction_id,
event_id: event_id!("$ev1").to_owned(),
};
latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::LocalIsSending(_)
);
}
// The next update from the event cache is taken into account again and
// gives a `Remote` value.
{
latest_event.update_with_event_cache(&room_event_cache, None, None).await;
assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
}
}
// The latest event value must be saved in the room info and be visible to
// a second client built with the same store configuration.
#[async_test]
async fn test_store_latest_event_value() {
let room_id = room_id!("!r0").to_owned();
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id).room(&room_id);
let server = MatrixMockServer::new().await;
let store_config = StoreConfig::new("cross-process-lock-holder".to_owned());
// First client: compute and store the latest event value.
{
let client = server
.client_builder()
.on_builder(|builder| builder.store_config(store_config.clone()))
.build()
.await;
let mut room_info_notable_update_receiver = client.room_info_notable_update_receiver();
let room = client.base_client().get_or_create_room(&room_id, RoomState::Joined);
let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
// Fill the event cache store with one chunk holding a single event, `$ev0`.
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(&room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![
event_factory.text_msg("A").event_id(event_id!("$ev0")).into(),
],
},
],
)
.await
.unwrap();
let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
// The room has no latest event value yet.
{
let latest_event = room.new_latest_event();
assert_matches!(latest_event, LatestEventValue::None);
}
// Compute the latest event value from the event cache.
{
let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
latest_event.update_with_event_cache(&room_event_cache, None, None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::Remote(_)
);
}
// A notable update with the `LATEST_EVENT` reason has been broadcast.
{
let update = room_info_notable_update_receiver.recv().await.unwrap();
assert_eq!(update.room_id, room_id);
assert!(update.reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
}
// The room now exposes the new latest event value.
{
let latest_event = room.new_latest_event();
assert_matches!(latest_event, LatestEventValue::Remote(_));
}
}
// Second client, same store configuration: the value is found on the room.
{
let client = server
.client_builder()
.on_builder(|builder| builder.store_config(store_config))
.build()
.await;
let room = client.get_room(&room_id).unwrap();
let latest_event = room.new_latest_event();
assert_matches!(latest_event, LatestEventValue::Remote(_));
}
}
}
/// Namespace type for the functions computing a [`LatestEventValue`], either
/// from the event cache (remote) or from a send queue update (local).
struct LatestEventValueBuilder;
impl LatestEventValueBuilder {
async fn new_remote(
room_event_cache: &RoomEventCache,
weak_room: &WeakRoom,
) -> LatestEventValue {
let room = weak_room.get();
let (own_user_id, power_levels) = match &room {
Some(room) => {
let power_levels = room.power_levels().await.ok();
(Some(room.own_user_id()), power_levels)
}
None => (None, None),
};
Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels.as_ref())
.await
}
async fn new_remote_with_power_levels(
room_event_cache: &RoomEventCache,
own_user_id: Option<&UserId>,
power_levels: Option<&RoomPowerLevels>,
) -> LatestEventValue {
if let Ok(Some(event)) = room_event_cache
.rfind_map_event_in_memory_by(|event, previous_event_id| {
filter_timeline_event(event, previous_event_id, own_user_id, power_levels)
.then(|| event.clone())
})
.await
{
LatestEventValue::Remote(event)
} else {
LatestEventValue::default()
}
}
async fn new_local(
send_queue_update: &RoomSendQueueUpdate,
buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
room_event_cache: &RoomEventCache,
own_user_id: Option<&UserId>,
power_levels: Option<&RoomPowerLevels>,
) -> LatestEventValue {
use crate::send_queue::{LocalEcho, LocalEchoContent};
match send_queue_update {
RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id,
content: local_echo_content,
}) => match local_echo_content {
LocalEchoContent::Event { serialized_event: serialized_event_content, .. } => {
match serialized_event_content.deserialize() {
Ok(content) => {
if filter_any_message_like_event_content(content, None) {
let local_value = LocalLatestEventValue {
timestamp: MilliSecondsSinceUnixEpoch::now(),
content: serialized_event_content.clone(),
};
let value = if let Some(LatestEventValue::LocalCannotBeSent(_)) =
buffer_of_values_for_local_events.last()
{
LatestEventValue::LocalCannotBeSent(local_value)
} else {
LatestEventValue::LocalIsSending(local_value)
};
buffer_of_values_for_local_events
.push(transaction_id.to_owned(), value.clone());
value
} else {
LatestEventValue::None
}
}
Err(error) => {
error!(
?error,
"Failed to deserialize an event from `RoomSendQueueUpdate::NewLocalEvent`"
);
LatestEventValue::None
}
}
}
LocalEchoContent::React { .. } => LatestEventValue::None,
},
RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
buffer_of_values_for_local_events.remove(position);
}
Self::new_local_or_remote(
buffer_of_values_for_local_events,
room_event_cache,
own_user_id,
power_levels,
)
.await
}
RoomSendQueueUpdate::SentEvent { transaction_id, .. } => {
let position =
buffer_of_values_for_local_events.mark_is_sending_after(transaction_id);
let value = Self::new_local_or_remote(
buffer_of_values_for_local_events,
room_event_cache,
own_user_id,
power_levels,
)
.await;
if let Some(position) = position {
buffer_of_values_for_local_events.remove(position);
}
value
}
RoomSendQueueUpdate::ReplacedLocalEvent {
transaction_id,
new_content: new_serialized_event_content,
} => {
if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
match new_serialized_event_content.deserialize() {
Ok(content) => {
if filter_any_message_like_event_content(content, None) {
buffer_of_values_for_local_events.replace_content(
position,
new_serialized_event_content.clone(),
);
} else {
buffer_of_values_for_local_events.remove(position);
}
}
Err(error) => {
error!(
?error,
"Failed to deserialize an event from `RoomSendQueueUpdate::ReplacedLocalEvent`"
);
return LatestEventValue::None;
}
}
}
Self::new_local_or_remote(
buffer_of_values_for_local_events,
room_event_cache,
own_user_id,
power_levels,
)
.await
}
RoomSendQueueUpdate::SendError { transaction_id, .. } => {
buffer_of_values_for_local_events.mark_cannot_be_sent_from(transaction_id);
Self::new_local_or_remote(
buffer_of_values_for_local_events,
room_event_cache,
own_user_id,
power_levels,
)
.await
}
RoomSendQueueUpdate::RetryEvent { transaction_id } => {
buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
Self::new_local_or_remote(
buffer_of_values_for_local_events,
room_event_cache,
own_user_id,
power_levels,
)
.await
}
RoomSendQueueUpdate::MediaUpload { .. } => LatestEventValue::None,
}
}
async fn new_local_or_remote(
buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
room_event_cache: &RoomEventCache,
own_user_id: Option<&UserId>,
power_levels: Option<&RoomPowerLevels>,
) -> LatestEventValue {
if let Some(value) = buffer_of_values_for_local_events.last() {
value.clone()
} else {
Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels).await
}
}
}
/// A buffer of [`LatestEventValue`]s computed for local events, each paired
/// with the transaction ID of its local event.
///
/// `push` only accepts the `LocalIsSending` and `LocalCannotBeSent` variants.
#[derive(Debug)]
struct LatestEventValuesForLocalEvents {
/// The values in insertion order: new values are pushed at the end, and
/// `last` returns the most recently pushed one.
buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
}
impl LatestEventValuesForLocalEvents {
    /// Create an empty buffer, with room for 2 values pre-allocated.
    fn new() -> Self {
        Self { buffer: Vec::with_capacity(2) }
    }

    /// Whether the buffer holds no value.
    fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// The most recently pushed value, if any.
    fn last(&self) -> Option<&LatestEventValue> {
        let (_, value) = self.buffer.last()?;

        Some(value)
    }

    /// The index of the first value associated to `transaction_id`, if any.
    fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
        self.buffer.iter().position(|(candidate, _)| transaction_id == candidate)
    }

    /// Append a new value associated to `transaction_id`.
    ///
    /// # Panics
    ///
    /// Panics if `value` is neither [`LatestEventValue::LocalIsSending`] nor
    /// [`LatestEventValue::LocalCannotBeSent`].
    fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
        let is_local_value = matches!(
            value,
            LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalCannotBeSent(_)
        );

        assert!(is_local_value, "`value` must be either `LocalIsSending` or `LocalCannotBeSent`");

        self.buffer.push((transaction_id, value));
    }

    /// Replace the content of the value at `position`, keeping its variant
    /// and its timestamp.
    ///
    /// # Panics
    ///
    /// Panics if `position` is out of bounds, or if the value at `position`
    /// is neither `LocalIsSending` nor `LocalCannotBeSent`.
    fn replace_content(&mut self, position: usize, new_content: SerializableEventContent) {
        let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");

        match value {
            LatestEventValue::LocalIsSending(local_value)
            | LatestEventValue::LocalCannotBeSent(local_value) => {
                local_value.content = new_content;
            }
            _ => panic!("`value` must be either `LocalIsSending` or `LocalCannotBeSent`"),
        }
    }

    /// Remove and return the entry at `position`.
    ///
    /// # Panics
    ///
    /// Panics if `position` is out of bounds.
    fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
        self.buffer.remove(position)
    }

    /// Turn every `LocalIsSending` value into a `LocalCannotBeSent` value,
    /// from the value associated to `transaction_id` (included) to the end of
    /// the buffer. Nothing happens if `transaction_id` is not found.
    fn mark_cannot_be_sent_from(&mut self, transaction_id: &TransactionId) {
        let mut entries = self.buffer.iter_mut();

        let Some(first_entry) = entries.by_ref().find(|(candidate, _)| transaction_id == candidate)
        else {
            return;
        };

        // `entries` now only yields what follows the found entry.
        for (_, value) in once(first_entry).chain(entries) {
            if let LatestEventValue::LocalIsSending(content) = value {
                *value = LatestEventValue::LocalCannotBeSent(content.clone());
            }
        }
    }

    /// Turn every `LocalCannotBeSent` value into a `LocalIsSending` value,
    /// from the value associated to `transaction_id` (included) to the end of
    /// the buffer. Nothing happens if `transaction_id` is not found.
    fn mark_is_sending_from(&mut self, transaction_id: &TransactionId) {
        let mut entries = self.buffer.iter_mut();

        let Some(first_entry) = entries.by_ref().find(|(candidate, _)| transaction_id == candidate)
        else {
            return;
        };

        // `entries` now only yields what follows the found entry.
        for (_, value) in once(first_entry).chain(entries) {
            if let LatestEventValue::LocalCannotBeSent(content) = value {
                *value = LatestEventValue::LocalIsSending(content.clone());
            }
        }
    }

    /// Turn every `LocalCannotBeSent` value into a `LocalIsSending` value,
    /// from the value right after the one associated to `transaction_id`
    /// (excluded) to the end of the buffer.
    ///
    /// Returns the position of the value associated to `transaction_id`, or
    /// `None` (and changes nothing) if it is not found.
    fn mark_is_sending_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
        let position = self.position(transaction_id)?;

        for (_, value) in self.buffer.iter_mut().skip(position + 1) {
            if let LatestEventValue::LocalCannotBeSent(content) = value {
                *value = LatestEventValue::LocalIsSending(content.clone());
            }
        }

        Some(position)
    }
}
/// Decide whether a [`TimelineEvent`] is a suitable latest event candidate.
///
/// The event is deserialized first; if that fails, the error is logged and
/// the event is rejected. State events are delegated to
/// [`filter_any_sync_state_event`]. Message-like events are delegated to
/// [`filter_any_message_like_event_content`], except when they have no
/// original content, in which case they are accepted.
fn filter_timeline_event(
    event: &TimelineEvent,
    previous_event_id: Option<OwnedEventId>,
    own_user_id: Option<&UserId>,
    power_levels: Option<&RoomPowerLevels>,
) -> bool {
    let deserialized_event = match event.raw().deserialize() {
        Err(error) => {
            error!(
                ?error,
                "Failed to deserialize the event when looking for a suitable latest event"
            );

            return false;
        }
        Ok(deserialized_event) => deserialized_event,
    };

    let message_like_event = match deserialized_event {
        AnySyncTimelineEvent::State(state) => {
            return filter_any_sync_state_event(state, own_user_id, power_levels);
        }
        AnySyncTimelineEvent::MessageLike(message_like_event) => message_like_event,
    };

    // No original content (e.g. the redacted event built in `test_redacted`):
    // the event is accepted as is.
    let Some(content) = message_like_event.original_content() else {
        return true;
    };

    filter_any_message_like_event_content(content, previous_event_id)
}
/// Decide whether the content of a message-like event is a suitable latest
/// event candidate.
///
/// Room messages are accepted, except verification requests, and except
/// edits (replacements) that do not target `previous_event_id`. Poll starts,
/// call invites, RTC notifications and stickers are accepted. Everything
/// else, encrypted events included, is rejected.
fn filter_any_message_like_event_content(
    event: AnyMessageLikeEventContent,
    previous_event_id: Option<OwnedEventId>,
) -> bool {
    match event {
        AnyMessageLikeEventContent::RoomMessage(message) => {
            // Verification requests are never candidates.
            if matches!(message.msgtype, MessageType::VerificationRequest(_)) {
                return false;
            }

            // Any relation other than a replacement, or no relation at all,
            // is accepted.
            let Some(Relation::Replacement(Replacement { event_id: edited_event_id, .. })) =
                &message.relates_to
            else {
                return true;
            };

            // An edit is accepted only if it edits the previous event.
            previous_event_id.as_ref() == Some(edited_event_id)
        }

        AnyMessageLikeEventContent::UnstablePollStart(_)
        | AnyMessageLikeEventContent::CallInvite(_)
        | AnyMessageLikeEventContent::RtcNotification(_)
        | AnyMessageLikeEventContent::Sticker(_) => true,

        // Encrypted events are rejected explicitly.
        AnyMessageLikeEventContent::RoomEncrypted(_) => false,

        // Everything else is rejected.
        _ => false,
    }
}
/// Decide whether a state event is a suitable latest event candidate.
///
/// Only room member events can be accepted:
///
/// * a knock is accepted if the own user can invite, or can kick the user
///   targeted by the event, and the event is an original (not redacted) one;
///   it is rejected when the own user ID or the power levels are unknown,
/// * an invite is accepted if the event is an original one whose state key is
///   the own user ID.
fn filter_any_sync_state_event(
    event: AnySyncStateEvent,
    own_user_id: Option<&UserId>,
    power_levels: Option<&RoomPowerLevels>,
) -> bool {
    let AnySyncStateEvent::RoomMember(member) = event else {
        return false;
    };

    match member.membership() {
        MembershipState::Knock => {
            // Without both pieces of information, nothing can be decided.
            let (Some(own_user_id), Some(room_power_levels)) = (own_user_id, power_levels) else {
                return false;
            };

            let can_accept_or_decline_knocks = room_power_levels.user_can_invite(own_user_id)
                || room_power_levels.user_can_kick_user(own_user_id, member.state_key());

            can_accept_or_decline_knocks && matches!(member, SyncStateEvent::Original(_))
        }

        MembershipState::Invite => {
            let SyncStateEvent::Original(state) = member else {
                return false;
            };

            // The own user is the one being invited, not someone else.
            Some(state.state_key.deref()) == own_user_id
        }

        _ => false,
    }
}
// Tests for `filter_timeline_event`: which events are latest event candidates.
#[cfg(test)]
mod tests_latest_event_content {
use std::ops::Not;
use matrix_sdk_test::event_factory::EventFactory;
use ruma::{
event_id,
events::{room::message::RoomMessageEventContent, rtc::notification::NotificationType},
owned_user_id, user_id,
};
use super::filter_timeline_event;
// Assert that the event built by `$event_builder` is, or is not, a candidate.
//
// The event is built with an `EventFactory` whose sender is
// `@mnt_io:matrix.org`, bound to the `$event_factory` identifier. The filter
// is called with no previous event ID, `@mnt_io:matrix.org` as the own user
// ID, and no power levels.
macro_rules! assert_latest_event_content {
( event | $event_factory:ident | $event_builder:block
is a candidate ) => {
assert_latest_event_content!(@_ | $event_factory | $event_builder, true);
};
( event | $event_factory:ident | $event_builder:block
is not a candidate ) => {
assert_latest_event_content!(@_ | $event_factory | $event_builder, false);
};
// Internal rule shared by the two public ones.
( @_ | $event_factory:ident | $event_builder:block, $expect:literal ) => {
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id);
let event = {
let $event_factory = event_factory;
$event_builder
};
assert_eq!(filter_timeline_event(&event, None, Some(user_id!("@mnt_io:matrix.org")), None), $expect );
};
}
// A plain text message is a candidate.
#[test]
fn test_room_message() {
assert_latest_event_content!(
event | event_factory | { event_factory.text_msg("hello").into_event() }
is a candidate
);
}
// A redacted room message is a candidate.
#[test]
fn test_redacted() {
assert_latest_event_content!(
event | event_factory | {
event_factory
.redacted(
user_id!("@mnt_io:matrix.org"),
ruma::events::room::message::RedactedRoomMessageEventContent::new(),
)
.into_event()
}
is a candidate
);
}
// An edit is a candidate only if it edits the previous event.
#[test]
fn test_room_message_replacement() {
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id);
// An edit of `$ev0`.
let event = event_factory
.text_msg("bonjour")
.edit(event_id!("$ev0"), RoomMessageEventContent::text_plain("hello").into())
.into_event();
// Without a previous event: not a candidate.
{
let previous_event_id = None;
assert!(
filter_timeline_event(
&event,
previous_event_id,
Some(user_id!("@mnt_io:matrix.org")),
None
)
.not()
);
}
// With a previous event that is not the edited one: not a candidate.
{
let previous_event_id = Some(event_id!("$ev1").to_owned());
assert!(
filter_timeline_event(
&event,
previous_event_id,
Some(user_id!("@mnt_io:matrix.org")),
None
)
.not()
);
}
// With a previous event that is the edited one: a candidate.
{
let previous_event_id = Some(event_id!("$ev0").to_owned());
assert!(filter_timeline_event(
&event,
previous_event_id,
Some(user_id!("@mnt_io:matrix.org")),
None
));
}
}
// A poll start is a candidate.
#[test]
fn test_poll() {
assert_latest_event_content!(
event | event_factory | {
event_factory
.poll_start("the people need to know", "comté > gruyère", vec!["yes", "oui"])
.into_event()
}
is a candidate
);
}
// A call invite is a candidate.
#[test]
fn test_call_invite() {
assert_latest_event_content!(
event | event_factory | {
event_factory
.call_invite(
ruma::OwnedVoipId::from("vvooiipp".to_owned()),
ruma::UInt::from(1234u32),
ruma::events::call::SessionDescription::new(
"type".to_owned(),
"sdp".to_owned(),
),
ruma::VoipVersionId::V1,
)
.into_event()
}
is a candidate
);
}
// An RTC notification is a candidate.
#[test]
fn test_rtc_notification() {
assert_latest_event_content!(
event | event_factory | {
event_factory
.rtc_notification(
NotificationType::Ring,
)
.mentions(vec![owned_user_id!("@alice:server.name")])
.relates_to_membership_state_event(ruma::OwnedEventId::try_from("$abc:server.name").unwrap())
.lifetime(60)
.into_event()
}
is a candidate
);
}
// A sticker is a candidate.
#[test]
fn test_sticker() {
assert_latest_event_content!(
event | event_factory | {
event_factory
.sticker(
"wink wink",
ruma::events::room::ImageInfo::new(),
ruma::OwnedMxcUri::from("mxc://foo/bar"),
)
.into_event()
}
is a candidate
);
}
// An encrypted event is not a candidate.
#[test]
fn test_encrypted_room_message() {
assert_latest_event_content!(
event | event_factory | {
event_factory
.event(ruma::events::room::encrypted::RoomEncryptedEventContent::new(
ruma::events::room::encrypted::EncryptedEventScheme::MegolmV1AesSha2(
ruma::events::room::encrypted::MegolmV1AesSha2ContentInit {
ciphertext: "cipher".to_owned(),
sender_key: "sender_key".to_owned(),
device_id: "device_id".into(),
session_id: "session_id".to_owned(),
}
.into(),
),
None,
))
.into_event()
}
is not a candidate
);
}
// A reaction is not a candidate.
#[test]
fn test_reaction() {
assert_latest_event_content!(
event | event_factory | { event_factory.reaction(event_id!("$ev0"), "+1").into_event() }
is not a candidate
);
}
// A state event that is not a room member event (here a room topic) is not
// a candidate.
#[test]
fn test_state_event() {
assert_latest_event_content!(
event | event_factory | { event_factory.room_topic("new room topic").into_event() }
is not a candidate
);
}
// A knock is not a candidate when the power levels are unknown.
#[test]
fn test_knocked_state_event_without_power_levels() {
assert_latest_event_content!(
event | event_factory | {
event_factory
.member(user_id!("@other_mnt_io:server.name"))
.membership(ruma::events::room::member::MembershipState::Knock)
.into_event()
}
is not a candidate
);
}
// A knock is a candidate only if the own user can accept it (invite) or
// decline it (kick the knocking user).
#[test]
fn test_knocked_state_event_with_power_levels() {
use ruma::{
events::room::{
member::MembershipState,
power_levels::{RoomPowerLevels, RoomPowerLevelsSource},
},
room_version_rules::AuthorizationRules,
};
let user_id = user_id!("@mnt_io:matrix.org");
let other_user_id = user_id!("@other_mnt_io:server.name");
let event_factory = EventFactory::new().sender(user_id);
let event =
event_factory.member(other_user_id).membership(MembershipState::Knock).into_event();
let mut room_power_levels =
RoomPowerLevels::new(RoomPowerLevelsSource::None, &AuthorizationRules::V1, []);
// The own user has level 5, the knocking user has level 4.
room_power_levels.users.insert(user_id.to_owned(), 5.into());
room_power_levels.users.insert(other_user_id.to_owned(), 4.into());
// Cannot accept, cannot decline.
{
room_power_levels.invite = 10.into();
room_power_levels.kick = 10.into();
assert!(
filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)).not(),
"cannot accept, cannot decline",
);
}
// Can accept, cannot decline.
{
room_power_levels.invite = 0.into();
room_power_levels.kick = 10.into();
assert!(
filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
"can accept, cannot decline",
);
}
// Cannot accept, can decline.
{
room_power_levels.invite = 10.into();
room_power_levels.kick = 0.into();
assert!(
filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
"cannot accept, can decline",
);
}
// Can accept, can decline.
{
room_power_levels.invite = 0.into();
room_power_levels.kick = 0.into();
assert!(
filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
"can accept, can decline",
);
}
// Both users now have the same level (5): with an invite level of 10 and a
// kick level of 0, the knock is expected not to be a candidate.
{
room_power_levels.users.insert(user_id.to_owned(), 5.into());
room_power_levels.users.insert(other_user_id.to_owned(), 5.into());
room_power_levels.invite = 10.into();
room_power_levels.kick = 0.into();
assert!(
filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)).not(),
"cannot accept, can decline, at least same user levels",
);
}
}
// An invite for the own user is a candidate.
#[test]
fn test_invite_state_event() {
use ruma::events::room::member::MembershipState;
assert_latest_event_content!(
event | event_factory | {
event_factory
.member(user_id!("@mnt_io:matrix.org"))
.membership(MembershipState::Invite)
.into_event()
}
is a candidate
);
}
// An invite for another user is not a candidate.
#[test]
fn test_invite_state_event_for_someone_else() {
use ruma::events::room::member::MembershipState;
assert_latest_event_content!(
event | event_factory | {
event_factory
.member(user_id!("@other_mnt_io:server.name"))
.membership(MembershipState::Invite)
.into_event()
}
is not a candidate
);
}
// A verification request sent as a room message is not a candidate.
#[test]
fn test_room_message_verification_request() {
use ruma::{OwnedDeviceId, events::room::message};
assert_latest_event_content!(
event | event_factory | {
event_factory
.event(RoomMessageEventContent::new(message::MessageType::VerificationRequest(
message::KeyVerificationRequestEventContent::new(
"body".to_owned(),
vec![],
OwnedDeviceId::from("device_id"),
user_id!("@user:server.name").to_owned(),
),
)))
.into_event()
}
is not a candidate
);
}
}
#[cfg(test)]
mod tests_latest_event_values_for_local_events {
use assert_matches::assert_matches;
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedTransactionId,
events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
serde::Raw,
};
use serde_json::json;
use super::{
LatestEventValue, LatestEventValuesForLocalEvents, LocalLatestEventValue,
RemoteLatestEventValue, SerializableEventContent,
};
fn remote_room_message(body: &str) -> RemoteLatestEventValue {
RemoteLatestEventValue::from_plaintext(
Raw::from_json_string(
json!({
"content": RoomMessageEventContent::text_plain(body),
"type": "m.room.message",
"event_id": "$ev0",
"origin_server_ts": 42,
"sender": "@mnt_io:matrix.org",
})
.to_string(),
)
.unwrap(),
)
}
fn local_room_message(body: &str) -> LocalLatestEventValue {
LocalLatestEventValue {
timestamp: MilliSecondsSinceUnixEpoch::now(),
content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
RoomMessageEventContent::text_plain(body),
))
.unwrap(),
}
}
#[test]
fn test_last() {
let mut buffer = LatestEventValuesForLocalEvents::new();
assert!(buffer.last().is_none());
buffer.push(
OwnedTransactionId::from("txnid"),
LatestEventValue::LocalIsSending(local_room_message("tome")),
);
assert_matches!(buffer.last(), Some(LatestEventValue::LocalIsSending(_)));
}
#[test]
fn test_position() {
let mut buffer = LatestEventValuesForLocalEvents::new();
let transaction_id = OwnedTransactionId::from("txnid");
assert!(buffer.position(&transaction_id).is_none());
buffer.push(
transaction_id.clone(),
LatestEventValue::LocalIsSending(local_room_message("raclette")),
);
buffer.push(
OwnedTransactionId::from("othertxnid"),
LatestEventValue::LocalIsSending(local_room_message("tome")),
);
assert_eq!(buffer.position(&transaction_id), Some(0));
}
#[test]
#[should_panic]
fn test_push_none() {
let mut buffer = LatestEventValuesForLocalEvents::new();
buffer.push(OwnedTransactionId::from("txnid"), LatestEventValue::None);
}
#[test]
#[should_panic]
fn test_push_remote() {
let mut buffer = LatestEventValuesForLocalEvents::new();
buffer.push(
OwnedTransactionId::from("txnid"),
LatestEventValue::Remote(remote_room_message("tome")),
);
}
#[test]
fn test_push_local() {
let mut buffer = LatestEventValuesForLocalEvents::new();
buffer.push(
OwnedTransactionId::from("txnid0"),
LatestEventValue::LocalIsSending(local_room_message("tome")),
);
buffer.push(
OwnedTransactionId::from("txnid1"),
LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
);
}
#[test]
fn test_replace_content() {
let mut buffer = LatestEventValuesForLocalEvents::new();
buffer.push(
OwnedTransactionId::from("txnid0"),
LatestEventValue::LocalIsSending(local_room_message("gruyère")),
);
let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
buffer.replace_content(0, new_content);
assert_matches!(
buffer.last(),
Some(LatestEventValue::LocalIsSending(local_event)) => {
assert_matches!(
local_event.content.deserialize().unwrap(),
AnyMessageLikeEventContent::RoomMessage(content) => {
assert_eq!(content.body(), "comté");
}
);
}
);
}
#[test]
fn test_remove() {
let mut buffer = LatestEventValuesForLocalEvents::new();
buffer.push(
OwnedTransactionId::from("txnid"),
LatestEventValue::LocalIsSending(local_room_message("gryuère")),
);
assert!(buffer.last().is_some());
buffer.remove(0);
assert!(buffer.last().is_none());
}
#[test]
fn test_mark_cannot_be_sent_from() {
let mut buffer = LatestEventValuesForLocalEvents::new();
let transaction_id_0 = OwnedTransactionId::from("txnid0");
let transaction_id_1 = OwnedTransactionId::from("txnid1");
let transaction_id_2 = OwnedTransactionId::from("txnid2");
buffer.push(
transaction_id_0,
LatestEventValue::LocalIsSending(local_room_message("gruyère")),
);
buffer.push(
transaction_id_1.clone(),
LatestEventValue::LocalIsSending(local_room_message("brigand")),
);
buffer.push(
transaction_id_2,
LatestEventValue::LocalIsSending(local_room_message("raclette")),
);
buffer.mark_cannot_be_sent_from(&transaction_id_1);
assert_eq!(buffer.buffer.len(), 3);
assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalCannotBeSent(_));
}
/// `mark_is_sending_from` must turn the targeted value and every value after
/// it into `LocalIsSending`, while earlier values stay `LocalCannotBeSent`.
#[test]
fn test_mark_is_sending_from() {
    let mut local_values = LatestEventValuesForLocalEvents::new();
    let first = OwnedTransactionId::from("txnid0");
    let pivot = OwnedTransactionId::from("txnid1");
    let last = OwnedTransactionId::from("txnid2");

    // Every value starts in the “cannot be sent” state.
    for (transaction_id, body) in [(&first, "gruyère"), (&pivot, "brigand"), (&last, "raclette")] {
        local_values.push(
            transaction_id.clone(),
            LatestEventValue::LocalCannotBeSent(local_room_message(body)),
        );
    }

    local_values.mark_is_sending_from(&pivot);

    let entries = &local_values.buffer;
    assert_eq!(entries.len(), 3);
    assert_matches!(entries[0].1, LatestEventValue::LocalCannotBeSent(_));
    assert_matches!(entries[1].1, LatestEventValue::LocalIsSending(_));
    assert_matches!(entries[2].1, LatestEventValue::LocalIsSending(_));
}
/// `mark_is_sending_after` must only turn the values located strictly after
/// the targeted one into `LocalIsSending`; the targeted value itself and the
/// ones before it stay `LocalCannotBeSent`.
#[test]
fn test_mark_is_sending_after() {
    let mut local_values = LatestEventValuesForLocalEvents::new();
    let first = OwnedTransactionId::from("txnid0");
    let pivot = OwnedTransactionId::from("txnid1");
    let last = OwnedTransactionId::from("txnid2");

    // Every value starts in the “cannot be sent” state.
    for (transaction_id, body) in [(&first, "gruyère"), (&pivot, "brigand"), (&last, "raclette")] {
        local_values.push(
            transaction_id.clone(),
            LatestEventValue::LocalCannotBeSent(local_room_message(body)),
        );
    }

    local_values.mark_is_sending_after(&pivot);

    let entries = &local_values.buffer;
    assert_eq!(entries.len(), 3);
    assert_matches!(entries[0].1, LatestEventValue::LocalCannotBeSent(_));
    assert_matches!(entries[1].1, LatestEventValue::LocalCannotBeSent(_));
    assert_matches!(entries[2].1, LatestEventValue::LocalIsSending(_));
}
}
#[cfg(all(not(target_family = "wasm"), test))]
mod tests_latest_event_value_builder {
use std::sync::Arc;
use assert_matches::assert_matches;
use matrix_sdk_base::{
RoomState,
deserialized_responses::TimelineEventKind,
linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
store::SerializableEventContent,
};
use matrix_sdk_test::{async_test, event_factory::EventFactory};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedTransactionId, event_id,
events::{
AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
SyncMessageLikeEvent, reaction::ReactionEventContent, relation::Annotation,
room::message::RoomMessageEventContent,
},
room_id, user_id,
};
use super::{
LatestEventValue, LatestEventValueBuilder, LatestEventValuesForLocalEvents,
RemoteLatestEventValue, RoomEventCache, RoomSendQueueUpdate,
};
use crate::{
Client, Error,
client::WeakClient,
room::WeakRoom,
send_queue::{AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle},
test_utils::mocks::MatrixMockServer,
};
/// Asserts that a `LatestEventValue` is a `Remote` value carrying a
/// `TimelineEventKind::PlainText` event, that this event deserializes to a
/// `SyncMessageLikeEvent::Original` room message, and that the body of the
/// message equals `$body`.
///
/// Invocation shape: `value_expr => with body = body_expr`.
macro_rules! assert_remote_value_matches_room_message_with_body {
( $latest_event_value:expr => with body = $body:expr ) => {
assert_matches!(
$latest_event_value,
LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => {
// The raw event must deserialize successfully before its body is compared.
assert_matches!(
event.deserialize().unwrap(),
AnySyncTimelineEvent::MessageLike(
AnySyncMessageLikeEvent::RoomMessage(
SyncMessageLikeEvent::Original(message_content)
)
) => {
assert_eq!(message_content.content.body(), $body);
}
);
}
);
};
}
/// Asserts that a `LatestEventValue` matches the tuple variant given as
/// `$pattern` (e.g. `LatestEventValue::LocalIsSending`), that the content of
/// the wrapped local event deserializes to
/// `AnyMessageLikeEventContent::RoomMessage`, and that the body of the message
/// equals `$body`.
///
/// Invocation shape: `value_expr, VariantPath => with body = body_expr`.
macro_rules! assert_local_value_matches_room_message_with_body {
( $latest_event_value:expr, $pattern:path => with body = $body:expr ) => {
assert_matches!(
$latest_event_value,
// `$pattern` is used as a single-field tuple-variant pattern.
$pattern (local_event) => {
assert_matches!(
local_event.content.deserialize().unwrap(),
AnyMessageLikeEventContent::RoomMessage(message_content) => {
assert_eq!(message_content.body(), $body);
}
);
}
);
};
}
#[async_test]
async fn test_remote_is_scanning_event_backwards_from_event_cache() {
let room_id = room_id!("!r0");
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id).room(room_id);
let event_id_0 = event_id!("$ev0");
let event_id_1 = event_id!("$ev1");
let event_id_2 = event_id!("$ev2");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
{
client.base_client().get_or_create_room(room_id, RoomState::Joined);
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![
event_factory.text_msg("hello").event_id(event_id_0).into(),
event_factory.text_msg("world").event_id(event_id_1).into(),
event_factory
.room_topic("new room topic")
.event_id(event_id_2)
.into(),
],
},
],
)
.await
.unwrap();
}
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned());
assert_remote_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_remote(&room_event_cache, &weak_room).await => with body = "world"
);
}
/// Builds the fixtures shared by the `new_local` tests: a client created
/// from a fresh mock server, the joined room `!r0`, the send queue of that
/// room, and the event cache of that room.
async fn local_prelude() -> (Client, OwnedRoomId, RoomSendQueue, RoomEventCache) {
    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;

    // The room has to exist before it can be fetched from the client.
    let room_id = room_id!("!r0").to_owned();
    client.base_client().get_or_create_room(&room_id, RoomState::Joined);
    let room = client.get_room(&room_id).unwrap();

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();
    let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();

    let room_send_queue = client.send_queue().for_room(room);

    (client, room_id, room_send_queue, room_event_cache)
}
/// Builds a `LocalEchoContent::Event` wrapping a plain-text room message
/// with the given `body`.
///
/// The send handle is created from a clone of `room_send_queue`, a clone of
/// `transaction_id` and the current time; no send error is attached.
fn new_local_echo_content(
    room_send_queue: &RoomSendQueue,
    transaction_id: &OwnedTransactionId,
    body: &str,
) -> LocalEchoContent {
    let message =
        AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body));
    let serialized_event = SerializableEventContent::new(&message).unwrap();

    let send_handle = SendHandle::new(
        room_send_queue.clone(),
        transaction_id.clone(),
        MilliSecondsSinceUnixEpoch::now(),
    );

    LocalEchoContent::Event { serialized_event, send_handle, send_error: None }
}
#[async_test]
async fn test_local_new_local_event() {
let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
let mut buffer = LatestEventValuesForLocalEvents::new();
{
let transaction_id = OwnedTransactionId::from("txnid0");
let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "A"
);
}
{
let transaction_id = OwnedTransactionId::from("txnid1");
let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "B"
);
}
assert_eq!(buffer.buffer.len(), 2);
}
#[async_test]
async fn test_local_new_local_event_when_previous_local_event_cannot_be_sent() {
let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
let mut buffer = LatestEventValuesForLocalEvents::new();
let transaction_id_0 = {
let transaction_id = OwnedTransactionId::from("txnid0");
let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "A"
);
transaction_id
};
{
let update = RoomSendQueueUpdate::SendError {
transaction_id: transaction_id_0.clone(),
error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
is_recoverable: true,
};
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalCannotBeSent => with body = "A"
);
assert_eq!(buffer.buffer.len(), 1);
assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
}
{
let transaction_id = OwnedTransactionId::from("txnid1");
let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalCannotBeSent => with body = "B"
);
}
assert_eq!(buffer.buffer.len(), 2);
assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
}
#[async_test]
async fn test_local_cancelled_local_event() {
let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
let mut buffer = LatestEventValuesForLocalEvents::new();
let transaction_id_0 = OwnedTransactionId::from("txnid0");
let transaction_id_1 = OwnedTransactionId::from("txnid1");
let transaction_id_2 = OwnedTransactionId::from("txnid2");
{
for (transaction_id, body) in
[(&transaction_id_0, "A"), (&transaction_id_1, "B"), (&transaction_id_2, "C")]
{
let content = new_local_echo_content(&room_send_queue, transaction_id, body);
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = body
);
}
assert_eq!(buffer.buffer.len(), 3);
}
{
let update = RoomSendQueueUpdate::CancelledLocalEvent {
transaction_id: transaction_id_1.clone(),
};
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "C"
);
assert_eq!(buffer.buffer.len(), 2);
}
{
let update = RoomSendQueueUpdate::CancelledLocalEvent {
transaction_id: transaction_id_2.clone(),
};
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "A"
);
assert_eq!(buffer.buffer.len(), 1);
}
{
let update =
RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: transaction_id_0 };
assert_matches!(
LatestEventValueBuilder::new_local(
&update,
&mut buffer,
&room_event_cache,
None,
None
)
.await,
LatestEventValue::None
);
assert!(buffer.buffer.is_empty());
}
}
#[async_test]
async fn test_local_sent_event() {
let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
let mut buffer = LatestEventValuesForLocalEvents::new();
let transaction_id_0 = OwnedTransactionId::from("txnid0");
let transaction_id_1 = OwnedTransactionId::from("txnid1");
{
for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
let content = new_local_echo_content(&room_send_queue, transaction_id, body);
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = body
);
}
assert_eq!(buffer.buffer.len(), 2);
}
{
let update = RoomSendQueueUpdate::SentEvent {
transaction_id: transaction_id_0.clone(),
event_id: event_id!("$ev0").to_owned(),
};
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "B"
);
assert_eq!(buffer.buffer.len(), 1);
}
{
let update = RoomSendQueueUpdate::SentEvent {
transaction_id: transaction_id_1,
event_id: event_id!("$ev1").to_owned(),
};
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "B"
);
assert!(buffer.buffer.is_empty());
}
}
#[async_test]
async fn test_local_replaced_local_event() {
let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
let mut buffer = LatestEventValuesForLocalEvents::new();
let transaction_id_0 = OwnedTransactionId::from("txnid0");
let transaction_id_1 = OwnedTransactionId::from("txnid1");
{
for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
let content = new_local_echo_content(&room_send_queue, transaction_id, body);
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = body
);
}
assert_eq!(buffer.buffer.len(), 2);
}
{
let transaction_id = &transaction_id_0;
let LocalEchoContent::Event { serialized_event: new_content, .. } =
new_local_echo_content(&room_send_queue, transaction_id, "A.")
else {
panic!("oopsy");
};
let update = RoomSendQueueUpdate::ReplacedLocalEvent {
transaction_id: transaction_id.clone(),
new_content,
};
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "B"
);
assert_eq!(buffer.buffer.len(), 2);
}
{
let transaction_id = &transaction_id_1;
let LocalEchoContent::Event { serialized_event: new_content, .. } =
new_local_echo_content(&room_send_queue, transaction_id, "B.")
else {
panic!("oopsy");
};
let update = RoomSendQueueUpdate::ReplacedLocalEvent {
transaction_id: transaction_id.clone(),
new_content,
};
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "B."
);
assert_eq!(buffer.buffer.len(), 2);
}
}
#[async_test]
async fn test_local_replaced_local_event_by_a_non_suitable_event() {
let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
let mut buffer = LatestEventValuesForLocalEvents::new();
let transaction_id = OwnedTransactionId::from("txnid0");
{
let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "A"
);
assert_eq!(buffer.buffer.len(), 1);
}
{
let new_content = SerializableEventContent::new(&AnyMessageLikeEventContent::Reaction(
ReactionEventContent::new(Annotation::new(
event_id!("$ev0").to_owned(),
"+1".to_owned(),
)),
))
.unwrap();
let update = RoomSendQueueUpdate::ReplacedLocalEvent {
transaction_id: transaction_id.clone(),
new_content,
};
assert_matches!(
LatestEventValueBuilder::new_local(
&update,
&mut buffer,
&room_event_cache,
None,
None
)
.await,
LatestEventValue::None
);
assert_eq!(buffer.buffer.len(), 0);
}
}
#[async_test]
async fn test_local_send_error() {
let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
let mut buffer = LatestEventValuesForLocalEvents::new();
let transaction_id_0 = OwnedTransactionId::from("txnid0");
let transaction_id_1 = OwnedTransactionId::from("txnid1");
{
for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
let content = new_local_echo_content(&room_send_queue, transaction_id, body);
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = body
);
}
assert_eq!(buffer.buffer.len(), 2);
}
{
let update = RoomSendQueueUpdate::SendError {
transaction_id: transaction_id_0.clone(),
error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
is_recoverable: true,
};
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalCannotBeSent => with body = "B"
);
assert_eq!(buffer.buffer.len(), 2);
assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
}
{
let update = RoomSendQueueUpdate::SentEvent {
transaction_id: transaction_id_0.clone(),
event_id: event_id!("$ev0").to_owned(),
};
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "B"
);
assert_eq!(buffer.buffer.len(), 1);
assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
}
}
#[async_test]
async fn test_local_retry_event() {
let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
let mut buffer = LatestEventValuesForLocalEvents::new();
let transaction_id_0 = OwnedTransactionId::from("txnid0");
let transaction_id_1 = OwnedTransactionId::from("txnid1");
{
for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
let content = new_local_echo_content(&room_send_queue, transaction_id, body);
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = body
);
}
assert_eq!(buffer.buffer.len(), 2);
}
{
let update = RoomSendQueueUpdate::SendError {
transaction_id: transaction_id_0.clone(),
error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
is_recoverable: true,
};
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalCannotBeSent => with body = "B"
);
assert_eq!(buffer.buffer.len(), 2);
assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
}
{
let update =
RoomSendQueueUpdate::RetryEvent { transaction_id: transaction_id_0.clone() };
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "B"
);
assert_eq!(buffer.buffer.len(), 2);
assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
}
}
#[async_test]
async fn test_local_media_upload() {
let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
let mut buffer = LatestEventValuesForLocalEvents::new();
let transaction_id = OwnedTransactionId::from("txnid");
{
let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
assert_local_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
LatestEventValue::LocalIsSending => with body = "A"
);
assert_eq!(buffer.buffer.len(), 1);
}
{
let update = RoomSendQueueUpdate::MediaUpload {
related_to: transaction_id,
file: None,
index: 0,
progress: AbstractProgress { current: 0, total: 0 },
};
assert_matches!(
LatestEventValueBuilder::new_local(
&update,
&mut buffer,
&room_event_cache,
None,
None
)
.await,
LatestEventValue::None
);
assert_eq!(buffer.buffer.len(), 1);
}
}
#[async_test]
async fn test_local_fallbacks_to_remote_when_empty() {
let room_id = room_id!("!r0");
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id).room(room_id);
let event_id_0 = event_id!("$ev0");
let event_id_1 = event_id!("$ev1");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
{
client.base_client().get_or_create_room(room_id, RoomState::Joined);
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![
event_factory.text_msg("hello").event_id(event_id_0).into(),
],
},
],
)
.await
.unwrap();
}
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
let mut buffer = LatestEventValuesForLocalEvents::new();
assert_remote_value_matches_room_message_with_body!(
LatestEventValueBuilder::new_local(
&RoomSendQueueUpdate::SentEvent {
transaction_id: OwnedTransactionId::from("txnid"),
event_id: event_id_1.to_owned(),
},
&mut buffer,
&room_event_cache,
None,
None,
)
.await
=> with body = "hello"
);
}
}