mod error;
mod latest_event;
mod room_latest_events;
use std::{
collections::{HashMap, HashSet},
ops::{ControlFlow, Not},
sync::Arc,
};
pub use error::LatestEventsError;
use eyeball::{AsyncLock, Subscriber};
use futures_util::FutureExt;
use latest_event::LatestEvent;
pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn};
use room_latest_events::RoomLatestEvents;
use ruma::{EventId, OwnedRoomId, RoomId};
use tokio::{
select,
sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, broadcast, mpsc},
};
use tracing::{error, warn};
use crate::{
client::WeakClient,
event_cache::{EventCache, RoomEventCacheGenericUpdate},
room::WeakRoom,
send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
};
/// Public handle used to start or stop tracking the latest event of rooms and threads.
///
/// Clones share the same [`LatestEventsState`] through an [`Arc`].
#[derive(Clone, Debug)]
pub struct LatestEvents {
/// Shared state: the registered rooms plus the handles of the two background tasks.
state: Arc<LatestEventsState>,
}
/// State shared by every clone of [`LatestEvents`].
#[derive(Debug)]
struct LatestEventsState {
/// Rooms (and their threads) currently registered; also handed to both background tasks.
registered_rooms: Arc<RegisteredRooms>,
/// Handle of the task running `listen_to_event_cache_and_send_queue_updates_task`.
/// NOTE(review): wrapped in `AbortOnDrop`, so presumably the task is aborted when this state is
/// dropped — confirm against `matrix_sdk_common::executor`.
_listen_task_handle: AbortOnDrop<()>,
/// Handle of the task running `compute_latest_events_task`.
_computation_task_handle: AbortOnDrop<()>,
}
impl LatestEvents {
    /// Build a new `LatestEvents` and spawn its two background tasks.
    ///
    /// Two channels are created: a bounded one (capacity 32) carrying room registrations to the
    /// listener task, and an unbounded one carrying queued updates from the listener task to the
    /// computation task. Both task handles are kept in the shared state.
    pub(crate) fn new(
        weak_client: WeakClient,
        event_cache: EventCache,
        send_queue: SendQueue,
    ) -> Self {
        let (room_registration_sender, room_registration_receiver) = mpsc::channel(32);
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        let registered_rooms =
            Arc::new(RegisteredRooms::new(room_registration_sender, weak_client, &event_cache));

        // First task: forwards the relevant event cache and send queue updates to the queue.
        let listen_task = listen_to_event_cache_and_send_queue_updates_task(
            Arc::clone(&registered_rooms),
            room_registration_receiver,
            event_cache,
            send_queue,
            latest_event_queue_sender,
        );
        let listen_task_handle = spawn(listen_task).abort_on_drop();

        // Second task: consumes the queue and updates the registered rooms.
        let computation_task =
            compute_latest_events_task(Arc::clone(&registered_rooms), latest_event_queue_receiver);
        let computation_task_handle = spawn(computation_task).abort_on_drop();

        let state = LatestEventsState {
            registered_rooms,
            _listen_task_handle: listen_task_handle,
            _computation_task_handle: computation_task_handle,
        };

        Self { state: Arc::new(state) }
    }

    /// Start listening to the latest event of a room.
    ///
    /// Returns `Ok(true)` when the room is registered once the call completes, and `Ok(false)`
    /// when no `RoomLatestEvents` could be obtained for it.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while registering the room.
    pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
        let room_latest_events = self.state.registered_rooms.for_room(room_id).await?;

        Ok(room_latest_events.is_some())
    }

    /// Test helper: tell whether `room_id` is currently present in the registered rooms.
    #[cfg(test)]
    pub async fn is_listening_to_room(&self, room_id: &RoomId) -> bool {
        let rooms = self.state.registered_rooms.rooms.read().await;

        rooms.contains_key(room_id)
    }

    /// Start listening to the latest event of a room and subscribe to its value.
    ///
    /// Returns `Ok(None)` when no `RoomLatestEvents` could be obtained for the room.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while registering the room.
    pub async fn listen_and_subscribe_to_room(
        &self,
        room_id: &RoomId,
    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
        match self.state.registered_rooms.for_room(room_id).await? {
            None => Ok(None),
            Some(room) => {
                let room_guard = room.read().await;
                let room_latest_event = room_guard.for_room();

                Ok(Some(room_latest_event.subscribe().await))
            }
        }
    }

    /// Start listening to the latest event of a thread.
    ///
    /// Returns `Ok(true)` when the room holding the thread is registered once the call completes,
    /// and `Ok(false)` when no `RoomLatestEvents` could be obtained for it.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while registering the room.
    pub async fn listen_to_thread(
        &self,
        room_id: &RoomId,
        thread_id: &EventId,
    ) -> Result<bool, LatestEventsError> {
        let room_latest_events =
            self.state.registered_rooms.for_thread(room_id, thread_id).await?;

        Ok(room_latest_events.is_some())
    }

    /// Start listening to the latest event of a thread and subscribe to its value.
    ///
    /// Returns `Ok(None)` when no `RoomLatestEvents` could be obtained for the room.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while registering the room.
    ///
    /// # Panics
    ///
    /// Panics if the thread has no `LatestEvent` right after having been registered.
    pub async fn listen_and_subscribe_to_thread(
        &self,
        room_id: &RoomId,
        thread_id: &EventId,
    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
        match self.state.registered_rooms.for_thread(room_id, thread_id).await? {
            None => Ok(None),
            Some(room) => {
                let room_guard = room.read().await;
                let thread_latest_event = room_guard
                    .for_thread(thread_id)
                    .expect("The `LatestEvent` for the thread must have been created");

                Ok(Some(thread_latest_event.subscribe().await))
            }
        }
    }

    /// Stop listening to a room (and everything registered under it).
    pub async fn forget_room(&self, room_id: &RoomId) {
        let registered_rooms = &self.state.registered_rooms;

        registered_rooms.forget_room(room_id).await;
    }

    /// Stop listening to a thread of a room; the room itself stays registered.
    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
        let registered_rooms = &self.state.registered_rooms;

        registered_rooms.forget_thread(room_id, thread_id).await;
    }
}
/// Registry of the rooms whose latest events are being tracked.
#[derive(Debug)]
struct RegisteredRooms {
/// One `RoomLatestEvents` per registered room, keyed by room ID.
rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
/// Sender used to announce every room addition or removal as a `RoomRegistration`.
room_registration_sender: mpsc::Sender<RoomRegistration>,
/// Weak client handle, used to build the `WeakRoom` given to each new `RoomLatestEvents`.
weak_client: WeakClient,
/// Event cache handed to each new `RoomLatestEvents`.
event_cache: EventCache,
}
impl RegisteredRooms {
fn new(
room_registration_sender: mpsc::Sender<RoomRegistration>,
weak_client: WeakClient,
event_cache: &EventCache,
) -> Self {
Self {
rooms: RwLock::new(HashMap::default()),
room_registration_sender,
weak_client,
event_cache: event_cache.clone(),
}
}
async fn room_latest_event(
&self,
room_id: &RoomId,
thread_id: Option<&EventId>,
) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
Ok(match thread_id {
Some(thread_id) => {
let mut rooms = self.rooms.write().await;
if rooms.contains_key(room_id).not() {
if let Some(room_latest_event) = RoomLatestEvents::new(
WeakRoom::new(self.weak_client.clone(), room_id.to_owned()),
&self.event_cache,
)
.await?
{
rooms.insert(room_id.to_owned(), room_latest_event);
let _ = self
.room_registration_sender
.send(RoomRegistration::Add(room_id.to_owned()))
.await;
}
}
if let Some(room_latest_event) = rooms.get(room_id) {
let mut room_latest_event = room_latest_event.write().await;
if room_latest_event.has_thread(thread_id).not() {
room_latest_event
.create_and_insert_latest_event_for_thread(thread_id)
.await;
}
}
RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
}
None => {
match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
.ok()
{
value @ Some(_) => value,
None => {
let mut rooms = self.rooms.write().await;
if rooms.contains_key(room_id).not() {
if let Some(room_latest_event) = RoomLatestEvents::new(
WeakRoom::new(self.weak_client.clone(), room_id.to_owned()),
&self.event_cache,
)
.await?
{
rooms.insert(room_id.to_owned(), room_latest_event);
let _ = self
.room_registration_sender
.send(RoomRegistration::Add(room_id.to_owned()))
.await;
}
}
RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
}
}
}
})
}
pub async fn for_room(
&self,
room_id: &RoomId,
) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
self.room_latest_event(room_id, None).await
}
pub async fn for_thread(
&self,
room_id: &RoomId,
thread_id: &EventId,
) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
self.room_latest_event(room_id, Some(thread_id)).await
}
pub async fn forget_room(&self, room_id: &RoomId) {
{
let mut rooms = self.rooms.write().await;
rooms.remove(room_id);
}
let _ =
self.room_registration_sender.send(RoomRegistration::Remove(room_id.to_owned())).await;
}
pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
let rooms = self.rooms.read().await;
if let Some(room_latest_event) = rooms.get(room_id) {
let mut room_latest_event = room_latest_event.write().await;
drop(rooms);
room_latest_event.forget_thread(thread_id);
}
}
}
/// Message sent over the room registration channel to tell the listener task which rooms it must
/// forward updates for.
#[derive(Debug)]
enum RoomRegistration {
/// Start forwarding the updates of this room.
Add(OwnedRoomId),
/// Stop forwarding the updates of this room.
Remove(OwnedRoomId),
}
/// Unit of work queued by the listener task and consumed by `compute_latest_events`.
enum LatestEventQueueUpdate {
/// The event cache reported an update for a listened room.
EventCache {
/// Room concerned by the update.
room_id: OwnedRoomId,
},
/// The send queue reported an update for a listened room.
SendQueue {
/// Room concerned by the update.
room_id: OwnedRoomId,
/// The send queue update itself, forwarded unchanged.
update: RoomSendQueueUpdate,
},
}
async fn listen_to_event_cache_and_send_queue_updates_task(
registered_rooms: Arc<RegisteredRooms>,
mut room_registration_receiver: mpsc::Receiver<RoomRegistration>,
event_cache: EventCache,
send_queue: SendQueue,
latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
) {
let mut event_cache_generic_updates_subscriber =
event_cache.subscribe_to_room_generic_updates();
let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
let mut listened_rooms =
HashSet::from_iter(registered_rooms.rooms.read().await.keys().cloned());
loop {
if listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut event_cache_generic_updates_subscriber,
&mut send_queue_generic_updates_subscriber,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_break()
{
warn!("`listen_to_event_cache_and_send_queue_updates_task` has stopped");
break;
}
}
}
/// Wait for a single input (a room registration, an event cache update or a send queue update)
/// and handle it.
///
/// The inputs are polled in that order (`biased`), so pending room registrations are always
/// applied before the updates are filtered against `listened_rooms`.
///
/// * A room registration adds a room to, or removes a room from, `listened_rooms`.
/// * An event cache or send queue update is pushed onto `latest_event_queue_sender` only when it
///   targets a room in `listened_rooms`; a failure to push is ignored.
///
/// Returns `ControlFlow::Break(())` when one of the three channels is closed, and
/// `ControlFlow::Continue(())` otherwise. A broadcast receiver that merely lagged behind is not
/// a closed channel: the miss is logged and the receiver stays usable.
async fn listen_to_event_cache_and_send_queue_updates(
    room_registration_receiver: &mut mpsc::Receiver<RoomRegistration>,
    event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
    send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
    listened_rooms: &mut HashSet<OwnedRoomId>,
    latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
) -> ControlFlow<()> {
    select! {
        biased;

        update = room_registration_receiver.recv().fuse() => {
            match update {
                Some(RoomRegistration::Add(room_id)) => {
                    listened_rooms.insert(room_id);
                }
                Some(RoomRegistration::Remove(room_id)) => {
                    listened_rooms.remove(&room_id);
                }
                None => {
                    error!("`room_registration` channel has been closed");
                    return ControlFlow::Break(());
                }
            }
        }

        room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv().fuse() => {
            match room_event_cache_generic_update {
                Ok(RoomEventCacheGenericUpdate { room_id }) => {
                    if listened_rooms.contains(&room_id) {
                        let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
                            room_id
                        });
                    }
                }
                // Some updates were overwritten before being read; the channel is still open, so
                // keep listening instead of stopping the whole task.
                Err(broadcast::error::RecvError::Lagged(skipped)) => {
                    warn!(?skipped, "`event_cache_generic_updates` receiver has lagged, some updates have been missed");
                }
                Err(broadcast::error::RecvError::Closed) => {
                    warn!("`event_cache_generic_updates` channel has been closed");
                    return ControlFlow::Break(());
                }
            }
        }

        send_queue_generic_update = send_queue_generic_updates_subscriber.recv().fuse() => {
            match send_queue_generic_update {
                Ok(SendQueueUpdate { room_id, update }) => {
                    if listened_rooms.contains(&room_id) {
                        let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
                            room_id,
                            update
                        });
                    }
                }
                // Same as above: lagging is recoverable, only a closed channel stops the task.
                Err(broadcast::error::RecvError::Lagged(skipped)) => {
                    warn!(?skipped, "`send_queue_generic_updates` receiver has lagged, some updates have been missed");
                }
                Err(broadcast::error::RecvError::Closed) => {
                    warn!("`send_queue_generic_updates` channel has been closed");
                    return ControlFlow::Break(());
                }
            }
        }
    }

    ControlFlow::Continue(())
}
/// Long-running task consuming the latest event queue and updating the registered rooms.
///
/// Updates are received in batches of at most `BUFFER_SIZE` and each batch is handed to
/// `compute_latest_events`. The loop ends when `recv_many` returns `0`, which, with a non-zero
/// limit, is expected to happen only once the channel is closed.
async fn compute_latest_events_task(
    registered_rooms: Arc<RegisteredRooms>,
    mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
) {
    const BUFFER_SIZE: usize = 16;

    // The buffer is reused across iterations: it is cleared, never reallocated.
    let mut buffer = Vec::with_capacity(BUFFER_SIZE);

    while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
        compute_latest_events(&registered_rooms, &buffer).await;
        buffer.clear();
    }

    warn!("`compute_latest_events_task` has stopped");
}
/// Apply a batch of queued updates to the registered rooms, in order.
///
/// For each update, the targeted room is looked up in `registered_rooms`; an update targeting an
/// unknown room is logged as an error and skipped.
async fn compute_latest_events(
    registered_rooms: &RegisteredRooms,
    latest_event_queue_updates: &[LatestEventQueueUpdate],
) {
    for queued_update in latest_event_queue_updates {
        // Both kinds of update carry the room they apply to.
        let room_id = match queued_update {
            LatestEventQueueUpdate::EventCache { room_id }
            | LatestEventQueueUpdate::SendQueue { room_id, .. } => room_id,
        };

        let rooms = registered_rooms.rooms.read().await;

        let Some(room) = rooms.get(room_id) else {
            error!(?room_id, "Failed to find the room");
            continue;
        };

        let mut room = room.write().await;

        // The lock on the map is released as soon as the lock on the room is held.
        drop(rooms);

        match queued_update {
            LatestEventQueueUpdate::EventCache { .. } => {
                room.update_with_event_cache().await;
            }
            LatestEventQueueUpdate::SendQueue { update, .. } => {
                room.update_with_send_queue(update).await;
            }
        }
    }
}
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
use std::ops::Not;
use assert_matches::assert_matches;
use matrix_sdk_base::{
RoomState,
deserialized_responses::TimelineEventKind,
linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
};
use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
use ruma::{
OwnedTransactionId, event_id,
events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent},
owned_room_id, room_id, user_id,
};
use stream_assert::assert_pending;
use super::{
HashSet, LatestEventValue, RemoteLatestEventValue, RoomEventCacheGenericUpdate,
RoomRegistration, RoomSendQueueUpdate, SendQueueUpdate, broadcast,
listen_to_event_cache_and_send_queue_updates, mpsc,
};
use crate::test_utils::mocks::MatrixMockServer;
// Rooms and threads must only be registered once something asks to listen to them.
#[async_test]
async fn test_latest_events_are_lazy() {
let room_id_0 = room_id!("!r0");
let room_id_1 = room_id!("!r1");
let room_id_2 = room_id!("!r2");
let thread_id_1_0 = event_id!("$ev1.0");
let thread_id_2_0 = event_id!("$ev2.0");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
// Create the three rooms on the client side.
client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
client.base_client().get_or_create_room(room_id_2, RoomState::Joined);
client.event_cache().subscribe().unwrap();
let latest_events = client.latest_events().await;
// Nothing is registered yet.
assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());
// Listen to two rooms.
assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
assert!(latest_events.listen_to_room(room_id_1).await.unwrap());
{
let rooms = latest_events.state.registered_rooms.rooms.read().await;
// Exactly these two rooms are registered, without any thread.
assert_eq!(rooms.len(), 2);
assert!(rooms.contains_key(room_id_0));
assert!(rooms.contains_key(room_id_1));
assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
assert!(rooms.get(room_id_1).unwrap().read().await.per_thread().is_empty());
}
// Listen to a thread of an already registered room, and to a thread of an unknown room.
assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());
{
let rooms = latest_events.state.registered_rooms.rooms.read().await;
// The third room has been registered along with its thread.
assert_eq!(rooms.len(), 3);
assert!(rooms.contains_key(room_id_0));
assert!(rooms.contains_key(room_id_1));
assert!(rooms.contains_key(room_id_2));
// The first room still has no thread.
assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
let room_1 = rooms.get(room_id_1).unwrap().read().await;
assert_eq!(room_1.per_thread().len(), 1);
assert!(room_1.per_thread().contains_key(thread_id_1_0));
let room_2 = rooms.get(room_id_2).unwrap().read().await;
assert_eq!(room_2.per_thread().len(), 1);
assert!(room_2.per_thread().contains_key(thread_id_2_0));
}
}
// Forgetting a room must remove it from the registered rooms.
#[async_test]
async fn test_forget_room() {
let room_id_0 = room_id!("!r0");
let room_id_1 = room_id!("!r1");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
// Both rooms exist on the client side, but only the first one is ever listened to.
client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
client.event_cache().subscribe().unwrap();
let latest_events = client.latest_events().await;
assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
{
let rooms = latest_events.state.registered_rooms.rooms.read().await;
// The room is registered, without any thread.
assert_eq!(rooms.len(), 1);
assert!(rooms.contains_key(room_id_0));
assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
}
latest_events.forget_room(room_id_0).await;
{
let rooms = latest_events.state.registered_rooms.rooms.read().await;
// The room is gone.
assert!(rooms.is_empty());
}
}
// Forgetting a thread must remove the thread only: its room stays registered.
#[async_test]
async fn test_forget_thread() {
let room_id_0 = room_id!("!r0");
let room_id_1 = room_id!("!r1");
let thread_id_0_0 = event_id!("$ev0.0");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
// Both rooms exist on the client side, but only the first one is ever listened to.
client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
client.event_cache().subscribe().unwrap();
let latest_events = client.latest_events().await;
// Listening to the thread registers its room too.
assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());
{
let rooms = latest_events.state.registered_rooms.rooms.read().await;
assert_eq!(rooms.len(), 1);
assert!(rooms.contains_key(room_id_0));
let room_0 = rooms.get(room_id_0).unwrap().read().await;
assert_eq!(room_0.per_thread().len(), 1);
assert!(room_0.per_thread().contains_key(thread_id_0_0));
}
latest_events.forget_thread(room_id_0, thread_id_0_0).await;
{
let rooms = latest_events.state.registered_rooms.rooms.read().await;
// The room is still registered, but it has no thread anymore.
assert_eq!(rooms.len(), 1);
assert!(rooms.contains_key(room_id_0));
assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
}
}
// Room registrations must update the set of listened rooms, without queuing any update.
#[async_test]
async fn test_inputs_task_can_listen_to_room_registration() {
let room_id_0 = owned_room_id!("!r0");
let room_id_1 = owned_room_id!("!r1");
let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
// The broadcast senders are kept alive (but unused) so that their channels stay open.
let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
// Register a first room.
{
room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).await.unwrap();
assert!(
listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_continue()
);
assert_eq!(listened_rooms.len(), 1);
assert!(listened_rooms.contains(&room_id_0));
assert!(latest_event_queue_receiver.is_empty());
}
// Register the same room again: the set of listened rooms is unchanged.
{
room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).await.unwrap();
assert!(
listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_continue()
);
assert_eq!(listened_rooms.len(), 1);
assert!(listened_rooms.contains(&room_id_0));
assert!(latest_event_queue_receiver.is_empty());
}
// Register a second room.
{
room_registration_sender
.send(RoomRegistration::Add(room_id_1.to_owned()))
.await
.unwrap();
assert!(
listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_continue()
);
assert_eq!(listened_rooms.len(), 2);
assert!(listened_rooms.contains(&room_id_0));
assert!(listened_rooms.contains(&room_id_1));
assert!(latest_event_queue_receiver.is_empty());
}
}
// A closed room registration channel must make the listener ask to stop.
#[async_test]
async fn test_inputs_task_stops_when_room_registration_channel_is_closed() {
let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
// Close the channel from the receiver side; the sender itself is still alive.
room_registration_receiver.close();
assert!(
listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_break()
);
// Nothing has been registered nor queued.
assert_eq!(listened_rooms.len(), 0);
assert!(latest_event_queue_receiver.is_empty());
}
// Event cache updates must be queued only for the rooms that are listened to.
#[async_test]
async fn test_inputs_task_can_listen_to_room_event_cache() {
let room_id = owned_room_id!("!r0");
let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
// The room is not listened to yet: its update is received but not queued.
{
room_event_cache_generic_update_sender
.send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
.unwrap();
assert!(
listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_continue()
);
assert!(listened_rooms.is_empty());
assert!(latest_event_queue_receiver.is_empty());
}
// Register the room, then send an update for it: this time the update is queued.
{
room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
room_event_cache_generic_update_sender
.send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
.unwrap();
// Each call handles a single input: the registration first, then the update.
for _ in 0..2 {
assert!(
listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_continue()
);
}
assert_eq!(listened_rooms.len(), 1);
assert!(listened_rooms.contains(&room_id));
assert!(latest_event_queue_receiver.is_empty().not());
}
}
// Send queue updates must be queued only for the rooms that are listened to.
#[async_test]
async fn test_inputs_task_can_listen_to_send_queue() {
let room_id = owned_room_id!("!r0");
let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
// The room is not listened to yet: its update is received but not queued.
{
send_queue_generic_update_sender
.send(SendQueueUpdate {
room_id: room_id.clone(),
update: RoomSendQueueUpdate::SentEvent {
transaction_id: OwnedTransactionId::from("txnid0"),
event_id: event_id!("$ev0").to_owned(),
},
})
.unwrap();
assert!(
listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_continue()
);
assert!(listened_rooms.is_empty());
assert!(latest_event_queue_receiver.is_empty());
}
// Register the room, then send an update for it: this time the update is queued.
{
room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
send_queue_generic_update_sender
.send(SendQueueUpdate {
room_id: room_id.clone(),
update: RoomSendQueueUpdate::SentEvent {
transaction_id: OwnedTransactionId::from("txnid1"),
event_id: event_id!("$ev1").to_owned(),
},
})
.unwrap();
// Each call handles a single input: the registration first, then the update.
for _ in 0..2 {
assert!(
listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_continue()
);
}
assert_eq!(listened_rooms.len(), 1);
assert!(listened_rooms.contains(&room_id));
assert!(latest_event_queue_receiver.is_empty().not());
}
}
// A closed event cache channel must make the listener ask to stop.
#[async_test]
async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
// Dropping the only sender closes the broadcast channel.
drop(room_event_cache_generic_update_sender);
assert!(
listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_break()
);
// Nothing has been registered nor queued.
assert_eq!(listened_rooms.len(), 0);
assert!(latest_event_queue_receiver.is_empty());
}
// A closed send queue channel must make the listener ask to stop.
#[async_test]
async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
broadcast::channel(1);
let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
broadcast::channel(1);
let mut listened_rooms = HashSet::new();
let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
// Dropping the only sender closes the broadcast channel.
drop(send_queue_generic_update_sender);
assert!(
listen_to_event_cache_and_send_queue_updates(
&mut room_registration_receiver,
&mut room_event_cache_generic_update_receiver,
&mut send_queue_generic_update_receiver,
&mut listened_rooms,
&latest_event_queue_sender,
)
.await
.is_break()
);
// Nothing has been registered nor queued.
assert_eq!(listened_rooms.len(), 0);
assert!(latest_event_queue_receiver.is_empty());
}
// End-to-end check: the latest event value of a room starts from the events already stored, and
// is updated when a sync brings a new event.
#[async_test]
async fn test_latest_event_value_is_updated_via_event_cache() {
let room_id = owned_room_id!("!r0");
let user_id = user_id!("@mnt_io:matrix.org");
let event_factory = EventFactory::new().sender(user_id).room(&room_id);
let event_id_0 = event_id!("$ev0");
let event_id_1 = event_id!("$ev1");
let event_id_2 = event_id!("$ev2");
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
// Prefill the event cache store with two events, "hello" then "world".
{
client.base_client().get_or_create_room(&room_id, RoomState::Joined);
client
.event_cache_store()
.lock()
.await
.expect("Could not acquire the event cache lock")
.as_clean()
.expect("Could not acquire a clean event cache lock")
.handle_linked_chunk_updates(
LinkedChunkId::Room(&room_id),
vec![
Update::NewItemsChunk {
previous: None,
new: ChunkIdentifier::new(0),
next: None,
},
Update::PushItems {
at: Position::new(ChunkIdentifier::new(0), 0),
items: vec![
event_factory.text_msg("hello").event_id(event_id_0).into(),
event_factory.text_msg("world").event_id(event_id_1).into(),
],
},
],
)
.await
.unwrap();
}
let event_cache = client.event_cache();
event_cache.subscribe().unwrap();
let latest_events = client.latest_events().await;
let mut latest_event_stream =
latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
// The initial value is the last stored event, "world".
assert_matches!(
latest_event_stream.get().await,
LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => {
assert_matches!(
event.deserialize().unwrap(),
AnySyncTimelineEvent::MessageLike(
AnySyncMessageLikeEvent::RoomMessage(
SyncMessageLikeEvent::Original(message_content)
)
) => {
assert_eq!(message_content.content.body(), "world");
}
);
}
);
// No new value is pending on the stream.
assert_pending!(latest_event_stream);
// A sync brings a new event into the room.
server
.sync_room(
&client,
JoinedRoomBuilder::new(&room_id)
.add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_2)),
)
.await;
// The stream yields the new latest event.
assert_matches!(
latest_event_stream.next().await,
Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
assert_matches!(
event.deserialize().unwrap(),
AnySyncTimelineEvent::MessageLike(
AnySyncMessageLikeEvent::RoomMessage(
SyncMessageLikeEvent::Original(message_content)
)
) => {
assert_eq!(message_content.content.body(), "raclette !");
}
);
}
);
}
}