mod error;
mod latest_event;
mod room_latest_events;
use std::{
collections::HashMap,
ops::{ControlFlow, DerefMut, Not},
sync::Arc,
};
pub use error::LatestEventsError;
use eyeball::{AsyncLock, Subscriber};
use latest_event::{LatestEvent, With};
pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
use matrix_sdk_base::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, timer};
use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn};
use room_latest_events::{RoomLatestEvents, RoomLatestEventsWriteGuard};
use ruma::{EventId, OwnedRoomId, RoomId};
use tokio::{
select,
sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, broadcast, mpsc},
};
use tracing::{info, warn};
use crate::{
client::WeakClient,
event_cache::{EventCache, RoomEventCacheGenericUpdate},
room::WeakRoom,
send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
};
/// Entry point of the latest-events API.
///
/// This is a clonable handle: every clone shares the same
/// [`LatestEventsState`] through an [`Arc`].
#[derive(Clone, Debug)]
pub struct LatestEvents {
/// The shared state: the registered rooms and the background task handles.
state: Arc<LatestEventsState>,
}
/// State shared by all the clones of a [`LatestEvents`].
#[derive(Debug)]
struct LatestEventsState {
/// The rooms (and their threads) for which latest events are tracked.
registered_rooms: Arc<RegisteredRooms>,
/// Handle of the task running `listen_to_updates_task`.
///
/// NOTE(review): going by the `AbortOnDrop` name, the task is presumably
/// aborted when this handle is dropped; confirm against
/// `matrix_sdk_common::executor`.
_listen_task_handle: AbortOnDrop<()>,
/// Handle of the task running `compute_latest_events_task`.
///
/// NOTE(review): same `AbortOnDrop` assumption as for the listen task.
_computation_task_handle: AbortOnDrop<()>,
}
impl LatestEvents {
    /// Build a new [`LatestEvents`] and spawn its two background tasks.
    ///
    /// The first task (`listen_to_updates_task`) forwards updates coming from
    /// the event cache, the send queue and the room info updates into an
    /// internal unbounded queue. The second task
    /// (`compute_latest_events_task`) drains that queue.
    pub(crate) fn new(
        weak_client: WeakClient,
        event_cache: EventCache,
        send_queue: SendQueue,
        room_info_updates: broadcast::Receiver<RoomInfoNotableUpdate>,
    ) -> Self {
        // The queue linking the "listen" task to the "compute" task.
        let (queue_sender, queue_receiver) = mpsc::unbounded_channel();

        let registered_rooms =
            Arc::new(RegisteredRooms::new(weak_client, &event_cache, &queue_sender));

        let listen_task = spawn(listen_to_updates_task(
            Arc::clone(&registered_rooms),
            event_cache,
            send_queue,
            room_info_updates,
            queue_sender,
        ))
        .abort_on_drop();

        let computation_task =
            spawn(compute_latest_events_task(Arc::clone(&registered_rooms), queue_receiver))
                .abort_on_drop();

        let state = LatestEventsState {
            registered_rooms,
            _listen_task_handle: listen_task,
            _computation_task_handle: computation_task,
        };

        Self { state: Arc::new(state) }
    }

    /// Register `room_id` (if it is not registered already).
    ///
    /// Returns `Ok(true)` when an entry for the room is available after the
    /// call, `Ok(false)` otherwise.
    pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
        let room_latest_events = self.state.registered_rooms.for_room(room_id).await?;

        Ok(room_latest_events.is_some())
    }

    /// Tell whether `room_id` is currently registered, without registering it.
    #[cfg(test)]
    pub async fn is_listening_to_room(&self, room_id: &RoomId) -> bool {
        let rooms = self.state.registered_rooms.rooms.read().await;

        rooms.contains_key(room_id)
    }

    /// Register `room_id` (if needed) and subscribe to the latest event value
    /// of the room itself.
    ///
    /// Returns `Ok(None)` when no entry is available for the room.
    pub async fn listen_and_subscribe_to_room(
        &self,
        room_id: &RoomId,
    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
        match self.state.registered_rooms.for_room(room_id).await? {
            Some(room_latest_events) => {
                let room_latest_events = room_latest_events.read().await;
                let subscriber = room_latest_events.for_room().subscribe().await;

                Ok(Some(subscriber))
            }
            None => Ok(None),
        }
    }

    /// Register the thread `thread_id` of `room_id` (if it is not registered
    /// already), registering the room itself along the way.
    ///
    /// Returns `Ok(true)` when an entry for the room is available after the
    /// call, `Ok(false)` otherwise.
    pub async fn listen_to_thread(
        &self,
        room_id: &RoomId,
        thread_id: &EventId,
    ) -> Result<bool, LatestEventsError> {
        let room_latest_events =
            self.state.registered_rooms.for_thread(room_id, thread_id).await?;

        Ok(room_latest_events.is_some())
    }

    /// Register the thread `thread_id` of `room_id` (if needed) and subscribe
    /// to the latest event value of that thread.
    ///
    /// Returns `Ok(None)` when no entry is available for the room.
    ///
    /// # Panics
    ///
    /// Panics if the room entry exists but holds no latest event for the
    /// thread, which should not happen since `for_thread` creates it.
    pub async fn listen_and_subscribe_to_thread(
        &self,
        room_id: &RoomId,
        thread_id: &EventId,
    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
        match self.state.registered_rooms.for_thread(room_id, thread_id).await? {
            Some(room_latest_events) => {
                let room_latest_events = room_latest_events.read().await;
                let thread_latest_event = room_latest_events
                    .for_thread(thread_id)
                    .expect("The `LatestEvent` for the thread must have been created");

                Ok(Some(thread_latest_event.subscribe().await))
            }
            None => Ok(None),
        }
    }

    /// Unregister `room_id`: its whole entry is removed from the registry.
    pub async fn forget_room(&self, room_id: &RoomId) {
        let registered_rooms = &self.state.registered_rooms;

        registered_rooms.forget_room(room_id).await;
    }

    /// Unregister the thread `thread_id` of `room_id`; the room entry itself
    /// is kept.
    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
        let registered_rooms = &self.state.registered_rooms;

        registered_rooms.forget_thread(room_id, thread_id).await;
    }
}
/// Registry of the rooms for which latest events are tracked, along with
/// what is needed to create new entries.
#[derive(Debug)]
struct RegisteredRooms {
/// One [`RoomLatestEvents`] per registered room, behind an async
/// read-write lock.
rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
/// Weak reference to the client, used to build a [`WeakRoom`] when a room
/// gets registered.
weak_client: WeakClient,
/// The event cache handed to every new [`RoomLatestEvents`].
event_cache: EventCache,
/// Sender half of the queue drained by `compute_latest_events_task`.
latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
}
impl RegisteredRooms {
    /// Create an empty registry.
    ///
    /// The event cache and the queue sender are cloned, so the caller keeps
    /// ownership of its own handles.
    fn new(
        weak_client: WeakClient,
        event_cache: &EventCache,
        latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
    ) -> Self {
        let event_cache = event_cache.clone();
        let latest_event_queue_sender = latest_event_queue_sender.clone();

        Self {
            rooms: RwLock::new(HashMap::default()),
            weak_client,
            event_cache,
            latest_event_queue_sender,
        }
    }

    /// Get a read guard over the [`RoomLatestEvents`] of `room_id`, creating
    /// the entry when it is missing.
    ///
    /// When `thread_id` is `Some`, the latest event for that thread is also
    /// created inside the room entry when it is missing.
    ///
    /// The current implementation never returns `Err`.
    async fn room_latest_event(
        &self,
        room_id: &RoomId,
        thread_id: Option<&EventId>,
    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
        /// Build a new `RoomLatestEvents` for `room_id` and insert it in
        /// `rooms`.
        ///
        /// If the freshly built value reports that its latest event value is
        /// "none", an `EventCache` update is queued for the room so that a
        /// computation is attempted without waiting for an external update.
        fn create_and_insert_room_latest_events(
            room_id: &RoomId,
            rooms: &mut HashMap<OwnedRoomId, RoomLatestEvents>,
            weak_client: &WeakClient,
            event_cache: &EventCache,
            latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
        ) {
            let weak_room = WeakRoom::new(weak_client.clone(), room_id.to_owned());
            let (room_latest_events, is_latest_event_value_none) =
                With::unzip(RoomLatestEvents::new(weak_room, event_cache));

            rooms.insert(room_id.to_owned(), room_latest_events);

            if is_latest_event_value_none {
                // A send error is ignored on purpose (best effort).
                let _ = latest_event_queue_sender
                    .send(LatestEventQueueUpdate::EventCache { room_id: room_id.to_owned() });
            }
        }

        let guard = match thread_id {
            // Room only: try the cheap path first, i.e. a read lock.
            None => {
                if let Some(existing) =
                    RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
                        .ok()
                {
                    return Ok(Some(existing));
                }

                // The read guard has been dropped by the failed mapping; it
                // is now safe to ask for the write lock.
                let _timer = timer!(
                    tracing::Level::INFO,
                    format!("Creating `RoomLatestEvents` for {room_id:?}"),
                );

                let mut rooms = self.rooms.write().await;

                // Check again: another task may have created the entry while
                // the write lock was being awaited.
                if rooms.contains_key(room_id).not() {
                    create_and_insert_room_latest_events(
                        room_id,
                        rooms.deref_mut(),
                        &self.weak_client,
                        &self.event_cache,
                        &self.latest_event_queue_sender,
                    );
                }

                RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
            }

            // Room and thread: the write lock is taken right away.
            Some(thread_id) => {
                let mut rooms = self.rooms.write().await;

                if rooms.contains_key(room_id).not() {
                    create_and_insert_room_latest_events(
                        room_id,
                        rooms.deref_mut(),
                        &self.weak_client,
                        &self.event_cache,
                        &self.latest_event_queue_sender,
                    );
                }

                if let Some(room_latest_events) = rooms.get(room_id) {
                    let mut room_latest_events = room_latest_events.write().await;

                    if room_latest_events.has_thread(thread_id).not() {
                        room_latest_events.create_and_insert_latest_event_for_thread(thread_id);
                    }
                }

                RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
            }
        };

        Ok(guard)
    }

    /// Get the [`RoomLatestEvents`] of `room_id`, creating it if needed.
    pub async fn for_room(
        &self,
        room_id: &RoomId,
    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
        let no_thread = None;

        self.room_latest_event(room_id, no_thread).await
    }

    /// Get the [`RoomLatestEvents`] of `room_id`, creating it and the latest
    /// event of `thread_id` if needed.
    pub async fn for_thread(
        &self,
        room_id: &RoomId,
        thread_id: &EventId,
    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
        let thread = Some(thread_id);

        self.room_latest_event(room_id, thread).await
    }

    /// Remove the whole entry of `room_id`. Nothing happens if the room is
    /// not registered.
    pub async fn forget_room(&self, room_id: &RoomId) {
        // The write guard is a temporary: it is released at the end of the
        // statement.
        self.rooms.write().await.remove(room_id);
    }

    /// Remove the latest event of `thread_id` from the entry of `room_id`.
    /// Nothing happens if the room is not registered.
    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
        let rooms = self.rooms.read().await;

        let Some(room_latest_events) = rooms.get(room_id) else {
            return;
        };

        let mut room_latest_events = room_latest_events.write().await;

        // The registry lock is no longer needed once the room is locked.
        drop(rooms);

        room_latest_events.forget_thread(thread_id);
    }
}
/// An update pushed on the latest-event queue, i.e. the message sent by the
/// "listen" side and consumed by `compute_latest_events`.
#[derive(Debug)]
enum LatestEventQueueUpdate {
/// The event cache has an update for a registered room. It is also queued
/// when a room is registered and its initial latest event value is "none".
EventCache {
/// The room concerned by the update.
room_id: OwnedRoomId,
},
/// The send queue has an update for a registered room.
SendQueue {
/// The room concerned by the update.
room_id: OwnedRoomId,
/// The send queue update itself.
update: RoomSendQueueUpdate,
},
/// The room info of a registered room has a notable update.
RoomInfo {
/// The room concerned by the update.
room_id: OwnedRoomId,
/// The reasons attached to the notable update.
reasons: RoomInfoNotableUpdateReasons,
},
}
/// Task body: subscribe to the three update sources, then call
/// [`listen_to_updates`] in a loop until it asks to stop.
///
/// The sources are the generic updates of the event cache, the updates of
/// the send queue, and the room info notable updates (the given receiver is
/// resubscribed).
async fn listen_to_updates_task(
    registered_rooms: Arc<RegisteredRooms>,
    event_cache: EventCache,
    send_queue: SendQueue,
    room_info_updates: broadcast::Receiver<RoomInfoNotableUpdate>,
    latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
) {
    let mut event_cache_generic_updates_subscriber =
        event_cache.subscribe_to_room_generic_updates();
    let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
    let mut room_info_updates_subscriber = room_info_updates.resubscribe();

    loop {
        let flow = listen_to_updates(
            // Only the map of rooms is needed to filter the updates.
            &registered_rooms.rooms,
            &mut event_cache_generic_updates_subscriber,
            &mut send_queue_generic_updates_subscriber,
            &mut room_info_updates_subscriber,
            &latest_event_queue_sender,
        )
        .await;

        if flow.is_break() {
            warn!("`listen_to_updates_task` has stopped");

            break;
        }
    }
}
async fn listen_to_updates(
registered_rooms: &RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
room_info_updates_subscriber: &mut broadcast::Receiver<RoomInfoNotableUpdate>,
latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
) -> ControlFlow<()> {
select! {
room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv() => {
if let Ok(RoomEventCacheGenericUpdate { room_id }) = room_event_cache_generic_update {
if registered_rooms.read().await.contains_key(&room_id) {
let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
room_id
});
}
} else {
warn!("`event_cache_generic_updates` channel has been closed");
return ControlFlow::Break(());
}
}
send_queue_generic_update = send_queue_generic_updates_subscriber.recv() => {
if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
if registered_rooms.read().await.contains_key(&room_id) {
let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
room_id,
update
});
}
} else {
warn!("`send_queue_generic_updates` channel has been closed");
return ControlFlow::Break(());
}
}
room_info_update = room_info_updates_subscriber.recv() => {
if let Ok(RoomInfoNotableUpdate { room_id, reasons }) = room_info_update {
if
reasons == RoomInfoNotableUpdateReasons::LATEST_EVENT ||
!reasons.contains(RoomInfoNotableUpdateReasons::MEMBERSHIP)
{
return ControlFlow::Continue(())
}
if registered_rooms.read().await.contains_key(&room_id) {
let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::RoomInfo {
room_id,
reasons,
});
}
} else {
warn!("`room_info_updates` channel has been closed");
return ControlFlow::Break(());
}
}
}
ControlFlow::Continue(())
}
/// Task body: drain the latest-event queue by batches and hand each batch to
/// [`compute_latest_events`].
///
/// The loop ends when `recv_many` returns `0`.
async fn compute_latest_events_task(
    registered_rooms: Arc<RegisteredRooms>,
    mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
) {
    /// Maximum number of updates received in one batch.
    const BUFFER_SIZE: usize = 16;

    // The buffer is allocated once and reused for every batch.
    let mut buffer = Vec::with_capacity(BUFFER_SIZE);

    while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
        compute_latest_events(&registered_rooms, &buffer).await;
        buffer.clear();
    }

    warn!("`compute_latest_events_task` has stopped");
}
async fn compute_latest_events(
registered_rooms: &RegisteredRooms,
latest_event_queue_updates: &[LatestEventQueueUpdate],
) {
async fn room_latest_events_write_guard(
registered_rooms: &RegisteredRooms,
room_id: &OwnedRoomId,
) -> ControlFlow<RoomLatestEventsWriteGuard, ()> {
let rooms = registered_rooms.rooms.read().await;
if let Some(room_latest_events) = rooms.get(room_id) {
let room_latest_events = room_latest_events.write().await;
drop(rooms);
ControlFlow::Break(room_latest_events)
} else {
info!(?room_id, "Failed to find the room");
ControlFlow::Continue(())
}
}
for latest_event_queue_update in latest_event_queue_updates {
match latest_event_queue_update {
LatestEventQueueUpdate::EventCache { room_id } => {
let ControlFlow::Break(mut room_latest_events) =
room_latest_events_write_guard(registered_rooms, room_id).await
else {
continue;
};
room_latest_events.update_with_event_cache().await;
}
LatestEventQueueUpdate::SendQueue { room_id, update } => {
let ControlFlow::Break(mut room_latest_events) =
room_latest_events_write_guard(registered_rooms, room_id).await
else {
continue;
};
room_latest_events.update_with_send_queue(update).await;
}
LatestEventQueueUpdate::RoomInfo { room_id, reasons } => {
let ControlFlow::Break(mut room_latest_events) =
room_latest_events_write_guard(registered_rooms, room_id).await
else {
continue;
};
room_latest_events.update_with_room_info(*reasons).await;
}
}
}
}
/// Test helper: build a [`LocalLatestEventValue`] holding a plain text room
/// message with the given `body`, timestamped with the current time.
///
/// # Panics
///
/// Panics if the event content cannot be serialized.
#[cfg(test)]
fn local_room_message(body: &str) -> LocalLatestEventValue {
    use matrix_sdk_base::store::SerializableEventContent;
    use ruma::{
        MilliSecondsSinceUnixEpoch,
        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
    };

    let timestamp = MilliSecondsSinceUnixEpoch::now();

    let message = AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body));
    let content = SerializableEventContent::new(&message).unwrap();

    LocalLatestEventValue { timestamp, content }
}
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
use std::{collections::HashMap, ops::Not, time::Duration};
use assert_matches::assert_matches;
use matrix_sdk_base::{
RoomState,
deserialized_responses::TimelineEventKind,
linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
};
use matrix_sdk_test::{
InvitedRoomBuilder, JoinedRoomBuilder, async_test, event_factory::EventFactory,
};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
events::{
AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent, SyncMessageLikeEvent,
room::member::{MembershipState, SyncRoomMemberEvent},
},
owned_event_id, owned_room_id, room_id, user_id,
};
use stream_assert::assert_pending;
use tokio::{task::yield_now, time::timeout};
use super::{
LatestEventValue, RegisteredRooms, RemoteLatestEventValue, RoomEventCacheGenericUpdate,
RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomLatestEvents, RoomSendQueueUpdate,
RwLock, SendQueueUpdate, WeakClient, WeakRoom, With, broadcast, listen_to_updates, mpsc,
};
use crate::{
latest_events::{LatestEventQueueUpdate, local_room_message},
test_utils::mocks::MatrixMockServer,
};
/// Rooms and threads are registered only when they are asked for.
#[async_test]
async fn test_latest_events_are_lazy() {
    let room_id_0 = room_id!("!r0");
    let room_id_1 = room_id!("!r1");
    let room_id_2 = room_id!("!r2");
    let thread_id_1_0 = event_id!("$ev1.0");
    let thread_id_2_0 = event_id!("$ev2.0");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;

    // Make the three rooms known to the client.
    for room_id in [room_id_0, room_id_1, room_id_2] {
        client.base_client().get_or_create_room(room_id, RoomState::Joined);
    }

    client.event_cache().subscribe().unwrap();

    let latest_events = client.latest_events().await;

    // Nothing is registered at first.
    assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());

    // Listen to two rooms.
    assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
    assert!(latest_events.listen_to_room(room_id_1).await.unwrap());

    {
        let registered = latest_events.state.registered_rooms.rooms.read().await;

        // Exactly these two rooms are registered…
        assert_eq!(registered.len(), 2);
        assert!(registered.contains_key(room_id_0));
        assert!(registered.contains_key(room_id_1));

        // … and none of them has a thread.
        assert!(registered.get(room_id_0).unwrap().read().await.per_thread().is_empty());
        assert!(registered.get(room_id_1).unwrap().read().await.per_thread().is_empty());
    }

    // Listen to one thread in a registered room, and to one thread in a room
    // that is not registered yet.
    assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
    assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());

    {
        let registered = latest_events.state.registered_rooms.rooms.read().await;

        // The third room has been registered along with its thread.
        assert_eq!(registered.len(), 3);
        assert!(registered.contains_key(room_id_0));
        assert!(registered.contains_key(room_id_1));
        assert!(registered.contains_key(room_id_2));

        // The first room still has no thread.
        assert!(registered.get(room_id_0).unwrap().read().await.per_thread().is_empty());

        // The second room has its thread.
        let room_1 = registered.get(room_id_1).unwrap().read().await;
        assert_eq!(room_1.per_thread().len(), 1);
        assert!(room_1.per_thread().contains_key(thread_id_1_0));

        // The third room has its thread.
        let room_2 = registered.get(room_id_2).unwrap().read().await;
        assert_eq!(room_2.per_thread().len(), 1);
        assert!(room_2.per_thread().contains_key(thread_id_2_0));
    }
}
/// Forgetting a room removes its whole entry from the registry.
#[async_test]
async fn test_forget_room() {
    let room_id_0 = room_id!("!r0");
    let room_id_1 = room_id!("!r1");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;

    // Make the rooms known to the client.
    for room_id in [room_id_0, room_id_1] {
        client.base_client().get_or_create_room(room_id, RoomState::Joined);
    }

    client.event_cache().subscribe().unwrap();

    let latest_events = client.latest_events().await;

    // Register the first room only.
    assert!(latest_events.listen_to_room(room_id_0).await.unwrap());

    {
        let registered = latest_events.state.registered_rooms.rooms.read().await;

        assert_eq!(registered.len(), 1);
        assert!(registered.contains_key(room_id_0));
        assert!(registered.get(room_id_0).unwrap().read().await.per_thread().is_empty());
    }

    // Forget it.
    latest_events.forget_room(room_id_0).await;

    {
        let registered = latest_events.state.registered_rooms.rooms.read().await;

        // The registry is empty again.
        assert!(registered.is_empty());
    }
}
/// Forgetting a thread removes the thread but keeps the room registered.
#[async_test]
async fn test_forget_thread() {
    let room_id_0 = room_id!("!r0");
    let room_id_1 = room_id!("!r1");
    let thread_id_0_0 = event_id!("$ev0.0");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;

    // Make the rooms known to the client.
    for room_id in [room_id_0, room_id_1] {
        client.base_client().get_or_create_room(room_id, RoomState::Joined);
    }

    client.event_cache().subscribe().unwrap();

    let latest_events = client.latest_events().await;

    // Register a thread of the first room.
    assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());

    {
        let registered = latest_events.state.registered_rooms.rooms.read().await;

        // The room is registered…
        assert_eq!(registered.len(), 1);
        assert!(registered.contains_key(room_id_0));

        // … with its thread.
        let room_0 = registered.get(room_id_0).unwrap().read().await;
        assert_eq!(room_0.per_thread().len(), 1);
        assert!(room_0.per_thread().contains_key(thread_id_0_0));
    }

    // Forget the thread.
    latest_events.forget_thread(room_id_0, thread_id_0_0).await;

    {
        let registered = latest_events.state.registered_rooms.rooms.read().await;

        // The room is still registered…
        assert_eq!(registered.len(), 1);
        assert!(registered.contains_key(room_id_0));

        // … but the thread is gone.
        assert!(registered.get(room_id_0).unwrap().read().await.per_thread().is_empty());
    }
}
/// An event cache update is forwarded to the queue only when its room is
/// registered.
#[async_test]
async fn test_inputs_task_can_listen_to_room_event_cache() {
    let room_id = owned_room_id!("!r0");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;
    let weak_client = WeakClient::from_client(&client);
    let weak_room = WeakRoom::new(weak_client, room_id.clone());

    let event_cache = client.event_cache();

    let registered_rooms = RwLock::new(HashMap::new());
    let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
        broadcast::channel(1);
    let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
        broadcast::channel(1);
    let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
    let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

    // The room is not registered: the update is dropped.
    {
        room_event_cache_generic_update_sender
            .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
            .unwrap();

        assert!(
            listen_to_updates(
                &registered_rooms,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut room_info_update_receiver,
                &latest_event_queue_sender,
            )
            .await
            .is_continue()
        );

        assert!(latest_event_queue_receiver.is_empty());
    }

    // The room is registered: the update is forwarded.
    {
        registered_rooms.write().await.insert(
            room_id.clone(),
            With::inner(RoomLatestEvents::new(weak_room, event_cache)),
        );

        room_event_cache_generic_update_sender
            .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
            .unwrap();

        assert!(
            listen_to_updates(
                &registered_rooms,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut room_info_update_receiver,
                &latest_event_queue_sender,
            )
            .await
            .is_continue()
        );

        assert!(latest_event_queue_receiver.is_empty().not());
    }
}
/// A send queue update is forwarded to the queue only when its room is
/// registered.
#[async_test]
async fn test_inputs_task_can_listen_to_send_queue() {
    let room_id = owned_room_id!("!r0");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;
    let weak_client = WeakClient::from_client(&client);
    let weak_room = WeakRoom::new(weak_client, room_id.clone());

    let event_cache = client.event_cache();

    let registered_rooms = RwLock::new(HashMap::new());
    let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
        broadcast::channel(1);
    let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
        broadcast::channel(1);
    let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
    let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

    // The room is not registered: the update is dropped.
    {
        send_queue_generic_update_sender
            .send(SendQueueUpdate {
                room_id: room_id.clone(),
                update: RoomSendQueueUpdate::SentEvent {
                    transaction_id: OwnedTransactionId::from("txnid0"),
                    event_id: owned_event_id!("$ev0"),
                },
            })
            .unwrap();

        assert!(
            listen_to_updates(
                &registered_rooms,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut room_info_update_receiver,
                &latest_event_queue_sender,
            )
            .await
            .is_continue()
        );

        assert!(latest_event_queue_receiver.is_empty());
    }

    // The room is registered: the update is forwarded.
    {
        registered_rooms.write().await.insert(
            room_id.clone(),
            With::inner(RoomLatestEvents::new(weak_room, event_cache)),
        );

        send_queue_generic_update_sender
            .send(SendQueueUpdate {
                room_id: room_id.clone(),
                update: RoomSendQueueUpdate::SentEvent {
                    transaction_id: OwnedTransactionId::from("txnid1"),
                    event_id: owned_event_id!("$ev1"),
                },
            })
            .unwrap();

        assert!(
            listen_to_updates(
                &registered_rooms,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut room_info_update_receiver,
                &latest_event_queue_sender,
            )
            .await
            .is_continue()
        );

        assert!(latest_event_queue_receiver.is_empty().not());
    }
}
/// A room info update (with the `MEMBERSHIP` reason) is forwarded to the
/// queue only when its room is registered.
#[async_test]
async fn test_inputs_task_can_listen_to_room_info() {
    let room_id = owned_room_id!("!r0");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;
    let weak_client = WeakClient::from_client(&client);
    let weak_room = WeakRoom::new(weak_client, room_id.clone());

    let event_cache = client.event_cache();

    let registered_rooms = RwLock::new(HashMap::new());
    let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
        broadcast::channel(1);
    let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
        broadcast::channel(1);
    let (room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
    let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

    // The room is not registered: the update is dropped.
    {
        room_info_update_sender
            .send(RoomInfoNotableUpdate {
                room_id: room_id.clone(),
                reasons: RoomInfoNotableUpdateReasons::MEMBERSHIP,
            })
            .unwrap();

        assert!(
            listen_to_updates(
                &registered_rooms,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut room_info_update_receiver,
                &latest_event_queue_sender,
            )
            .await
            .is_continue()
        );

        assert!(latest_event_queue_receiver.is_empty());
    }

    // The room is registered: the update is forwarded.
    {
        registered_rooms.write().await.insert(
            room_id.clone(),
            With::inner(RoomLatestEvents::new(weak_room, event_cache)),
        );

        room_info_update_sender
            .send(RoomInfoNotableUpdate {
                room_id: room_id.clone(),
                reasons: RoomInfoNotableUpdateReasons::MEMBERSHIP,
            })
            .unwrap();

        assert!(
            listen_to_updates(
                &registered_rooms,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut room_info_update_receiver,
                &latest_event_queue_sender,
            )
            .await
            .is_continue()
        );

        assert!(latest_event_queue_receiver.is_empty().not());
    }
}
/// Among all the room info update reasons, only `MEMBERSHIP` makes the
/// update go to the queue.
#[async_test]
async fn test_inputs_task_can_listen_to_specific_room_info_update_reasons() {
    let room_id = owned_room_id!("!r0");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;
    let weak_client = WeakClient::from_client(&client);
    let weak_room = WeakRoom::new(weak_client, room_id.clone());

    let event_cache = client.event_cache();

    let registered_rooms = RwLock::new(HashMap::new());
    let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
        broadcast::channel(1);
    let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
        broadcast::channel(1);
    let (room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
    let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

    // The room is registered for the whole test.
    registered_rooms
        .write()
        .await
        .insert(room_id.clone(), With::inner(RoomLatestEvents::new(weak_room, event_cache)));

    // Every reason except `MEMBERSHIP` is ignored.
    for reason in {
        let mut all = RoomInfoNotableUpdateReasons::all();
        all.remove(RoomInfoNotableUpdateReasons::MEMBERSHIP);
        all.iter()
    } {
        room_info_update_sender
            .send(RoomInfoNotableUpdate { room_id: room_id.clone(), reasons: reason })
            .unwrap();

        assert!(
            listen_to_updates(
                &registered_rooms,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut room_info_update_receiver,
                &latest_event_queue_sender,
            )
            .await
            .is_continue()
        );

        assert!(latest_event_queue_receiver.is_empty());
    }

    // `MEMBERSHIP` is forwarded.
    {
        room_info_update_sender
            .send(RoomInfoNotableUpdate {
                room_id: room_id.clone(),
                reasons: RoomInfoNotableUpdateReasons::MEMBERSHIP,
            })
            .unwrap();

        assert!(
            listen_to_updates(
                &registered_rooms,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut room_info_update_receiver,
                &latest_event_queue_sender,
            )
            .await
            .is_continue()
        );

        assert!(latest_event_queue_receiver.is_empty().not());
    }
}
/// Closing the event cache channel makes `listen_to_updates` break.
#[async_test]
async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
    let registered_rooms = RwLock::new(HashMap::new());
    let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
        broadcast::channel(1);
    let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
        broadcast::channel(1);
    let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
    let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

    // Dropping the only sender closes the channel.
    drop(room_event_cache_generic_update_sender);

    assert!(
        listen_to_updates(
            &registered_rooms,
            &mut room_event_cache_generic_update_receiver,
            &mut send_queue_generic_update_receiver,
            &mut room_info_update_receiver,
            &latest_event_queue_sender,
        )
        .await
        .is_break()
    );

    assert!(latest_event_queue_receiver.is_empty());
}
/// Closing the send queue channel makes `listen_to_updates` break.
#[async_test]
async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
    let registered_rooms = RwLock::new(HashMap::new());
    let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
        broadcast::channel(1);
    let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
        broadcast::channel(1);
    let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
    let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

    // Dropping the only sender closes the channel.
    drop(send_queue_generic_update_sender);

    assert!(
        listen_to_updates(
            &registered_rooms,
            &mut room_event_cache_generic_update_receiver,
            &mut send_queue_generic_update_receiver,
            &mut room_info_update_receiver,
            &latest_event_queue_sender,
        )
        .await
        .is_break()
    );

    assert!(latest_event_queue_receiver.is_empty());
}
/// Closing the room info updates channel makes `listen_to_updates` break.
#[async_test]
async fn test_inputs_task_stops_when_room_info_updates_are_closed() {
    let registered_rooms = RwLock::new(HashMap::new());
    let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
        broadcast::channel(1);
    let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
        broadcast::channel(1);
    let (room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
    let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

    // Dropping the only sender closes the channel.
    drop(room_info_update_sender);

    assert!(
        listen_to_updates(
            &registered_rooms,
            &mut room_event_cache_generic_update_receiver,
            &mut send_queue_generic_update_receiver,
            &mut room_info_update_receiver,
            &latest_event_queue_sender,
        )
        .await
        .is_break()
    );

    assert!(latest_event_queue_receiver.is_empty());
}
/// A message received through a sync ends up in the latest event stream of
/// its room.
#[async_test]
async fn test_latest_event_value_is_updated_via_event_cache() {
    let room_id = owned_room_id!("!r0");
    let user_id = user_id!("@mnt_io:matrix.org");
    let event_factory = EventFactory::new().sender(user_id).room(&room_id);
    let event_id_0 = event_id!("$ev0");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;

    // Make the room known to the client.
    client.base_client().get_or_create_room(&room_id, RoomState::Joined);

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    let latest_events = client.latest_events().await;

    let mut latest_event_stream =
        latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();

    // Nothing has happened yet.
    assert_pending!(latest_event_stream);

    // A new message is synced.
    let synced_room = JoinedRoomBuilder::new(&room_id)
        .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_0));
    server.sync_room(&client, synced_room).await;

    // The stream yields that message.
    assert_matches!(
        latest_event_stream.next().await,
        Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
            assert_matches!(
                event.deserialize().unwrap(),
                AnySyncTimelineEvent::MessageLike(
                    AnySyncMessageLikeEvent::RoomMessage(
                        SyncMessageLikeEvent::Original(message_content)
                    )
                ) => {
                    assert_eq!(message_content.content.body(), "raclette !");
                }
            );
        }
    );

    // And nothing more.
    assert_pending!(latest_event_stream);
}
/// When a room is registered without a latest event value, the value is
/// computed from what the event cache store already contains.
#[async_test]
async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily() {
    let room_id = owned_room_id!("!r0");
    let user_id = user_id!("@mnt_io:matrix.org");
    let event_factory = EventFactory::new().sender(user_id).room(&room_id);
    let event_id_0 = event_id!("$ev0");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;

    // Prelude: create the room and put one event in the event cache store.
    {
        client.base_client().get_or_create_room(&room_id, RoomState::Joined);

        let linked_chunk_updates = vec![
            Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(0), next: None },
            Update::PushItems {
                at: Position::new(ChunkIdentifier::new(0), 0),
                items: vec![event_factory.text_msg("hello").event_id(event_id_0).into()],
            },
        ];

        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(LinkedChunkId::Room(&room_id), linked_chunk_updates)
            .await
            .unwrap();
    }

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    let latest_events = client.latest_events().await;

    let mut latest_event_stream =
        latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();

    // Give the background tasks a chance to run.
    yield_now().await;

    // The value has been computed from the stored event.
    assert_matches!(latest_event_stream.next_now().await, LatestEventValue::Remote(_));
    assert_pending!(latest_event_stream);
}
/// Registering a room queues an `EventCache` update only when the room has
/// no latest event value yet.
#[async_test]
async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily_inner() {
    let room_id_0 = owned_room_id!("!r0");
    let room_id_1 = owned_room_id!("!r1");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;

    // `room_0` is created but otherwise left untouched in this test.
    // NOTE(review): it is kept in a binding only to mirror `room_1`.
    let _room_0 = client.base_client().get_or_create_room(&room_id_0, RoomState::Joined);
    let room_1 = client.base_client().get_or_create_room(&room_id_1, RoomState::Joined);

    // `room_1` gets a latest event value. Its info is cloned from `room_1`
    // itself, so that the info of another room is not written onto it.
    let mut room_info_1 = room_1.clone_info();
    room_info_1.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
    room_1.update_room_info(|_| (room_info_1, Default::default())).await;

    let weak_client = WeakClient::from_client(&client);

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    let (latest_event_queue_sender, mut latest_event_queue_receiver) =
        mpsc::unbounded_channel();

    let registered_rooms =
        RegisteredRooms::new(weak_client, event_cache, &latest_event_queue_sender);

    // Room 0 has no latest event value: an update is queued for it.
    {
        let room_latest_events = registered_rooms.for_room(&room_id_0).await.unwrap().unwrap();

        assert_matches!(
            room_latest_events.read().await.for_room().get().await,
            LatestEventValue::None
        );
        assert_matches!(
            latest_event_queue_receiver.recv().await,
            Some(LatestEventQueueUpdate::EventCache { room_id }) => {
                assert_eq!(room_id, room_id_0);
            }
        );
        assert!(latest_event_queue_receiver.is_empty());
    }

    // Room 1 already has a latest event value: nothing is queued.
    {
        let room_latest_events = registered_rooms.for_room(&room_id_1).await.unwrap().unwrap();

        assert_matches!(
            room_latest_events.read().await.for_room().get().await,
            LatestEventValue::LocalIsSending(_)
        );
        assert!(latest_event_queue_receiver.is_empty());
    }
}
/// An invite produces a `RemoteInvite` value, an identical invite produces
/// nothing new, and joining the room produces a `Remote` value.
#[async_test]
async fn test_latest_event_value_is_updated_via_room_infos_for_invites() {
    let room_id = owned_room_id!("!r0");
    let event_factory = EventFactory::new().room(&room_id);
    let event_id_0 = event_id!("$ev0");
    let event_id_1 = event_id!("$ev1");

    let server = MatrixMockServer::new().await;
    let client = server.client_builder().build().await;
    let own_user_id = client.user_id().unwrap();
    let other_user_id = user_id!("@other:servername");

    let event_cache = client.event_cache();
    event_cache.subscribe().unwrap();

    let latest_events = client.latest_events().await;

    let mut latest_event_stream =
        latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();

    // Nothing has happened yet.
    assert_pending!(latest_event_stream);

    let now = MilliSecondsSinceUnixEpoch::now().get();

    // First, the user is invited to the room.
    {
        let invited_room = InvitedRoomBuilder::new(&room_id).add_state_event(
            event_factory.member(other_user_id).invited(own_user_id).event_id(event_id_0),
        );
        server.sync_room(&client, invited_room).await;

        assert_matches!(
            latest_event_stream.next().await,
            Some(LatestEventValue::RemoteInvite { event_id, timestamp, inviter }) => {
                assert!(event_id.is_none());
                assert!(timestamp.get() >= now);
                assert_eq!(inviter.as_deref(), Some(other_user_id));
            }
        );
        assert_pending!(latest_event_stream);
    };

    // Second, the very same invite is synced again: no new value.
    {
        let invited_room = InvitedRoomBuilder::new(&room_id).add_state_event(
            event_factory.member(other_user_id).invited(own_user_id).event_id(event_id_0),
        );
        server.sync_room(&client, invited_room).await;

        assert!(timeout(Duration::from_secs(1), latest_event_stream.next()).await.is_err());
        assert_pending!(latest_event_stream);
    }

    // Third, the user joins the room.
    {
        let join_ts = u64::from(now) + 10;

        let joined_room = JoinedRoomBuilder::new(&room_id).add_timeline_event(
            event_factory
                .member(own_user_id)
                .membership(MembershipState::Join)
                .event_id(event_id_1)
                .server_ts(join_ts),
        );
        server.sync_room(&client, joined_room).await;

        assert_matches!(
            latest_event_stream.next().await,
            Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
                assert_matches!(
                    event.deserialize().unwrap(),
                    AnySyncTimelineEvent::State(
                        AnySyncStateEvent::RoomMember(
                            SyncRoomMemberEvent::Original(event)
                        )
                    ) => {
                        assert_eq!(event.event_id, event_id_1);
                        assert_eq!(event.content.membership, MembershipState::Join);
                        assert_eq!(event.sender, own_user_id);
                        assert_eq!(event.state_key, own_user_id);
                        assert_eq!(u64::from(event.origin_server_ts.get()), join_ts);
                    }
                );
            }
        );
        assert_pending!(latest_event_stream);
    }
}
}