mod builder;
use std::ops::{Deref, DerefMut, Not};
use builder::{BufferOfValuesForLocalEvents, Builder};
use eyeball::{AsyncLock, ObservableWriteGuard, SharedObservable, Subscriber};
pub use matrix_sdk_base::latest_event::{
LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
};
use matrix_sdk_base::{RoomInfoNotableUpdateReasons, RoomState};
use ruma::{EventId, OwnedEventId, UserId, events::room::power_levels::RoomPowerLevels};
use tracing::{error, info, instrument, trace, warn};
use crate::{Room, event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate};
#[derive(Debug)]
pub(super) struct LatestEvent {
weak_room: WeakRoom,
_thread_id: Option<OwnedEventId>,
buffer_of_values_for_local_events: BufferOfValuesForLocalEvents,
current_value: SharedObservable<LatestEventValue, AsyncLock>,
}
impl LatestEvent {
pub fn new(
weak_room: &WeakRoom,
thread_id: Option<&EventId>,
) -> With<Self, IsLatestEventValueNone> {
let latest_event_value = match thread_id {
Some(_thread_id) => LatestEventValue::default(),
None => weak_room.get().map(|room| room.latest_event()).unwrap_or_default(),
};
let is_none = latest_event_value.is_none();
With {
result: Self {
weak_room: weak_room.clone(),
_thread_id: thread_id.map(ToOwned::to_owned),
buffer_of_values_for_local_events: BufferOfValuesForLocalEvents::new(),
current_value: SharedObservable::new_async(latest_event_value),
},
with: is_none,
}
}
pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
self.current_value.subscribe().await
}
#[cfg(test)]
pub async fn get(&self) -> LatestEventValue {
self.current_value.get().await
}
pub async fn update_with_event_cache(
&mut self,
room_event_cache: &RoomEventCache,
own_user_id: &UserId,
power_levels: Option<&RoomPowerLevels>,
) {
if self.buffer_of_values_for_local_events.is_empty().not() {
return;
}
let current_event = self.current_value.get().await;
let new_value =
Builder::new_remote(room_event_cache, current_event, own_user_id, power_levels).await;
trace!(value = ?new_value, "Computed a remote `LatestEventValue`");
if let Some(new_value) = new_value {
self.update(new_value).await;
}
}
pub async fn update_with_send_queue(
&mut self,
send_queue_update: &RoomSendQueueUpdate,
room_event_cache: &RoomEventCache,
own_user_id: &UserId,
power_levels: Option<&RoomPowerLevels>,
) {
let current_event = self.current_value.get().await;
let new_value = Builder::new_local(
send_queue_update,
&mut self.buffer_of_values_for_local_events,
room_event_cache,
current_event,
own_user_id,
power_levels,
)
.await;
trace!(value = ?new_value, "Computed a local `LatestEventValue`");
if let Some(new_value) = new_value {
self.update(new_value).await;
}
}
pub async fn update_with_room_info(
&mut self,
room: Room,
reasons: RoomInfoNotableUpdateReasons,
) {
if reasons.contains(RoomInfoNotableUpdateReasons::MEMBERSHIP) {
let new_value = match room.state() {
RoomState::Invited => {
if matches!(
self.current_value.read().await.deref(),
LatestEventValue::RemoteInvite { .. }
) {
return;
}
let new_value = Builder::new_remote_for_invite(&room).await;
trace!(value = ?new_value, "Computed a remote `LatestEventValue` for invite");
new_value
}
_ => {
info!(
"Skipping the computation of a remote `LatestEventValue` from a `RoomInfo`"
);
return;
}
};
self.update(new_value).await;
}
}
async fn update(&mut self, new_value: LatestEventValue) {
{
let mut guard = self.current_value.write().await;
let previous_value = guard.deref();
let do_update = match (previous_value, &new_value) {
(LatestEventValue::None, LatestEventValue::None) => false,
(_, LatestEventValue::None) | (LatestEventValue::None, _) => true,
(previous, new) if previous.event_id() == new.event_id() => false,
(_, _) => true,
};
if do_update {
ObservableWriteGuard::set(&mut guard, new_value.clone());
drop(guard);
self.store(new_value).await;
}
}
}
#[instrument(skip_all)]
async fn store(&mut self, new_value: LatestEventValue) {
let Some(room) = self.weak_room.get() else {
warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
return;
};
let result = room
.update_and_save_room_info(|mut info| {
info.set_latest_event(new_value);
(info, RoomInfoNotableUpdateReasons::LATEST_EVENT)
})
.await;
if let Err(error) = result {
error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
}
}
}
pub(super) struct With<T, W> {
result: T,
with: W,
}
impl<T, W> With<T, W> {
pub fn map<F, O>(this: With<T, W>, f: F) -> With<O, W>
where
F: FnOnce(T) -> O,
{
With { result: f(this.result), with: this.with }
}
pub fn inner(this: With<T, W>) -> T {
this.result
}
pub fn unzip(this: With<T, W>) -> (T, W) {
(this.result, this.with)
}
}
impl<T, W> Deref for With<T, W> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.result
}
}
impl<T, W> DerefMut for With<T, W> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.result
}
}
pub(super) type IsLatestEventValueNone = bool;
#[cfg(all(not(target_family = "wasm"), test))]
mod tests_latest_event {
use std::ops::Not;
use assert_matches::assert_matches;
use matrix_sdk_base::{
RoomInfoNotableUpdateReasons, RoomState,
latest_event::RemoteLatestEventValue,
linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
store::{SerializableEventContent, StoreConfig},
};
use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
owned_event_id, owned_room_id, room_id, user_id,
};
use stream_assert::{assert_next_matches, assert_pending};
use tokio::task::yield_now;
use super::{super::local_room_message, LatestEvent, LatestEventValue, With};
use crate::{
client::WeakClient,
room::WeakRoom,
send_queue::{LocalEcho, LocalEchoContent, RoomSendQueue, RoomSendQueueUpdate, SendHandle},
test_utils::mocks::MatrixMockServer,
};
fn new_local_echo_content(
room_send_queue: &RoomSendQueue,
transaction_id: &OwnedTransactionId,
body: &str,
) -> LocalEchoContent {
LocalEchoContent::Event {
serialized_event: SerializableEventContent::new(
&AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
)
.unwrap(),
send_handle: SendHandle::new(
room_send_queue.clone(),
transaction_id.clone(),
MilliSecondsSinceUnixEpoch::now(),
),
send_error: None,
}
}
#[async_test]
async fn test_new_loads_from_room_info() {
let room_id = room_id!("!r0");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let weak_client = WeakClient::from_client(&client);
let room = client.base_client().get_or_create_room(room_id, RoomState::Joined);
let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
{
let (latest_event, is_none) = With::unzip(LatestEvent::new(&weak_room, None));
assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
assert!(is_none);
}
{
room.update_room_info(|mut info| {
info.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
(info, Default::default())
})
.await;
}
{
let (latest_event, is_none) = With::unzip(LatestEvent::new(&weak_room, None));
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::LocalIsSending(_)
);
assert!(is_none.not());
}
}
#[async_test]
async fn test_update_do_not_ignore_none_value() {
let room_id = room_id!("!r0");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let weak_client = WeakClient::from_client(&client);
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let mut latest_event = LatestEvent::new(&weak_room, None);
assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::LocalIsSending(_)
);
latest_event.update(LatestEventValue::None).await;
assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
}
#[async_test]
async fn test_update_ignore_none_if_previous_value_is_none() {
let room_id = room_id!("!r0");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let weak_client = WeakClient::from_client(&client);
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
let mut latest_event = LatestEvent::new(&weak_room, None);
let mut stream = latest_event.subscribe().await;
assert_pending!(stream);
latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
assert_next_matches!(stream, LatestEventValue::LocalIsSending(_));
latest_event.update(LatestEventValue::None).await;
assert_next_matches!(stream, LatestEventValue::None);
latest_event.update(LatestEventValue::None).await;
assert_pending!(stream);
latest_event.update(LatestEventValue::None).await;
assert_pending!(stream);
latest_event.update(LatestEventValue::LocalIsSending(local_room_message("bar"))).await;
assert_next_matches!(stream, LatestEventValue::LocalIsSending(_));
assert_pending!(stream);
}
#[async_test]
async fn test_update_ignore_when_previous_value_has_the_same_event_id() {
let room_id = room_id!("!r0");
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id).room(room_id);
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let weak_client = WeakClient::from_client(&client);
client.base_client().get_or_create_room(room_id, RoomState::Joined);
let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
let mut latest_event = LatestEvent::new(&weak_room, None);
let mut stream = latest_event.subscribe().await;
assert_pending!(stream);
latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
assert_next_matches!(stream, LatestEventValue::LocalIsSending(_));
let first_event: RemoteLatestEventValue =
event_factory.text_msg("A").event_id(event_id!("$ev0")).into();
latest_event.update(LatestEventValue::Remote(first_event.clone())).await;
assert_next_matches!(stream, LatestEventValue::Remote(_));
latest_event.update(LatestEventValue::Remote(first_event)).await;
assert_pending!(stream);
let second_event = event_factory.text_msg("A").event_id(event_id!("$ev1")).into();
latest_event.update(LatestEventValue::Remote(second_event)).await;
assert_next_matches!(stream, LatestEventValue::Remote(_));
assert_pending!(stream);
}
#[async_test]
async fn test_local_has_priority_over_remote() {
let room_id = owned_room_id!("!r0");
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id).room(&room_id);
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.base_client().get_or_create_room(&room_id, RoomState::Joined);
let room = client.get_room(&room_id).unwrap();
let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(&room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![event_factory.text_msg("A").event_id(event_id!("$ev0")).into()],
},
],
)
.await
.unwrap();
let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
let send_queue = client.send_queue();
let room_send_queue = send_queue.for_room(room);
let mut latest_event = LatestEvent::new(&weak_room, None);
{
latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
}
let transaction_id = OwnedTransactionId::from("txnid0");
{
let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
transaction_id: transaction_id.clone(),
content,
});
latest_event.update_with_send_queue(&update, &room_event_cache, user_id, None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::LocalIsSending(_)
);
}
{
latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::LocalIsSending(_)
);
}
{
let update = RoomSendQueueUpdate::SentEvent {
transaction_id,
event_id: owned_event_id!("$ev1"),
};
latest_event.update_with_send_queue(&update, &room_event_cache, user_id, None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::LocalHasBeenSent { .. }
);
}
{
latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
}
}
#[async_test]
async fn test_redacted_latest_event_is_removed() {
let room_id = owned_room_id!("!r0");
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id).room(&room_id);
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.base_client().get_or_create_room(&room_id, RoomState::Joined);
let _room = client.get_room(&room_id).unwrap();
let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let event_id_0 = event_id!("$ev0");
let event_id_1 = event_id!("$ev1");
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(&room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![
event_factory.text_msg("A").event_id(event_id_0).into(),
event_factory.text_msg("B").event_id(event_id_1).into(),
],
},
],
)
.await
.unwrap();
let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
let mut latest_event = LatestEvent::new(&weak_room, None);
{
latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::Remote(remote) => {
assert_eq!(remote.event_id().as_deref(), Some(event_id_1));
}
);
}
{
server
.mock_sync()
.ok_and_run(&client, |builder| {
builder.add_joined_room(
JoinedRoomBuilder::new(&room_id)
.add_timeline_event(event_factory.redaction(event_id_1)),
);
})
.await;
yield_now().await;
latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::Remote(remote) => {
assert_eq!(remote.event_id().as_deref(), Some(event_id_0));
}
);
}
}
#[async_test]
async fn test_store_latest_event_value() {
let room_id = owned_room_id!("!r0");
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id).room(&room_id);
let server = MatrixMockServer::new().await;
let store_config =
StoreConfig::new(CrossProcessLockConfig::multi_process("cross-process-lock-holder"));
{
let client = server
.client_builder()
.on_builder(|builder| builder.store_config(store_config.clone()))
.build()
.await;
let mut room_info_notable_update_receiver = client.room_info_notable_update_receiver();
let room = client.base_client().get_or_create_room(&room_id, RoomState::Joined);
let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(&room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![
event_factory.text_msg("A").event_id(event_id!("$ev0")).into(),
],
},
],
)
.await
.unwrap();
let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
{
let latest_event = room.latest_event();
assert_matches!(latest_event, LatestEventValue::None);
}
{
let mut latest_event = LatestEvent::new(&weak_room, None);
latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
assert_matches!(
latest_event.current_value.get().await,
LatestEventValue::Remote(_)
);
}
{
let update = room_info_notable_update_receiver.recv().await.unwrap();
assert_eq!(update.room_id, room_id);
assert!(update.reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
}
{
let latest_event = room.latest_event();
assert_matches!(latest_event, LatestEventValue::Remote(_));
}
}
{
let client = server
.client_builder()
.on_builder(|builder| builder.store_config(store_config))
.build()
.await;
let room = client.get_room(&room_id).unwrap();
let latest_event = room.latest_event();
assert_matches!(latest_event, LatestEventValue::Remote(_));
}
}
}