use std::time::Duration;
use matrix_sdk::{
ThreadingSupport, assert_let_timeout,
event_cache::RoomEventCacheUpdate,
test_utils::mocks::{MatrixMockServer, RoomMessagesResponseTemplate},
};
use matrix_sdk_test::{BOB, JoinedRoomBuilder, async_test, event_factory::EventFactory};
use ruma::{
event_id,
events::{
Mentions,
receipt::{ReceiptThread, ReceiptType},
room::{member::MembershipState, message::RedactedRoomMessageEventContent},
},
room_id,
};
#[async_test]
async fn test_unread_count_new_message_no_receipt() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello").event_id(event_id!("$1")))
.add_timeline_event(f.text_msg("world").event_id(event_id!("$2"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 2);
}
#[async_test]
async fn test_unread_count_new_message_with_known_receipt() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let own_user_id = client.user_id().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("ev1").event_id(event_id!("$1")))
.add_timeline_event(f.text_msg("ev2").event_id(event_id!("$2")))
.add_timeline_event(f.text_msg("ev3").event_id(event_id!("$3")))
.add_receipt(
f.read_receipts()
.add(
event_id!("$2"),
own_user_id,
ReceiptType::Read,
ReceiptThread::Unthreaded,
)
.into_event(),
),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 1);
}
#[async_test]
async fn test_unread_count_implicit_receipt_own_message() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let own_user_id = client.user_id().unwrap();
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("ev1").event_id(event_id!("$1")))
.add_timeline_event(f.text_msg("ev2").event_id(event_id!("$2")))
.add_timeline_event(f.text_msg("ev3").sender(own_user_id).event_id(event_id!("$3")))
.add_timeline_event(f.text_msg("ev4").event_id(event_id!("$4")))
.add_timeline_event(f.text_msg("ev5").event_id(event_id!("$5"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 2);
assert_eq!(room.read_receipts().latest_active.unwrap().event_id, event_id!("$3"));
}
#[async_test]
async fn test_unread_count_receipt_only_no_new_message() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let own_user_id = client.user_id().unwrap();
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("ev1").event_id(event_id!("$1")))
.add_timeline_event(f.text_msg("ev2").event_id(event_id!("$2")))
.add_timeline_event(f.text_msg("ev3").event_id(event_id!("$3"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 3);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_receipt(
f.read_receipts()
.add(event_id!("$2"), own_user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
.into_event(),
),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 1);
}
#[async_test]
async fn test_unread_count_pending_receipt() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let own_user_id = client.user_id().unwrap();
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("ev1").event_id(event_id!("$1")))
.add_timeline_event(f.text_msg("ev2").event_id(event_id!("$2")))
.add_timeline_event(f.text_msg("ev3").event_id(event_id!("$3")))
.add_receipt(
f.read_receipts()
.add(
event_id!("$future"),
own_user_id,
ReceiptType::Read,
ReceiptThread::Unthreaded,
)
.into_event(),
),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(..)) = room_cache_updates.recv()
);
assert_let_timeout!(
Ok(RoomEventCacheUpdate::AddEphemeralEvents { .. }) = room_cache_updates.recv()
);
assert_eq!(room.num_unread_messages(), 3);
let room_read_receipts = room.read_receipts();
assert!(room_read_receipts.pending.iter().any(|id| id == event_id!("$future")));
assert_eq!(room_read_receipts.pending.len(), 1);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("the future event").event_id(event_id!("$future")))
.add_timeline_event(f.text_msg("ev4").event_id(event_id!("$4"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 1);
assert!(room.read_receipts().pending.is_empty());
}
#[async_test]
async fn test_unread_count_accumulates_across_syncs() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("ev1").event_id(event_id!("$1")))
.add_timeline_event(f.text_msg("ev2").event_id(event_id!("$2"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 2);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("ev3").event_id(event_id!("$3"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 3);
}
#[async_test]
async fn test_state_event_does_not_increment_unread() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_state_bulk([f
.member(*BOB)
.membership(MembershipState::Join)
.event_id(event_id!("$1"))
.into_raw_sync_state()]),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 0);
}
#[async_test]
async fn test_reaction_does_not_increment_unread() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_bulk([
f.text_msg("hello").event_id(event_id!("$1")).into_raw(),
f.reaction(event_id!("$1"), "👍").event_id(event_id!("$2")).into(),
]),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 1);
}
#[async_test]
async fn test_redaction_does_not_increment_unread() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.redacted(*BOB, RedactedRoomMessageEventContent::new())),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 0);
}
#[async_test]
async fn test_gappy_sync_keeps_then_next_sync_resets_unread_count() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello").event_id(event_id!("$1")))
.add_timeline_event(f.text_msg("world").event_id(event_id!("$2"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 2);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.set_timeline_limited()
.set_timeline_prev_batch("prev-batch".to_owned()),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 2);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("again").event_id(event_id!("$3"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 1);
}
#[async_test]
async fn test_mentions_increments_unread_mentions() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
let member_event = f
.member(client.user_id().unwrap())
.membership(MembershipState::Join)
.event_id(event_id!("$member"));
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("hello example")
.event_id(event_id!("$1"))
.mentions(Mentions::with_user_ids([client.user_id().unwrap().to_owned()])),
)
.add_state_event(member_event),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 1);
assert_eq!(room.num_unread_mentions(), 1);
}
#[async_test]
async fn test_compute_unread_counts_considers_active_receipt() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let own_user_id = client.user_id().unwrap();
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("hello 1").sender(own_user_id).event_id(event_id!("$1")),
)
.add_timeline_event(f.text_msg("hello 2").event_id(event_id!("$2")))
.add_timeline_event(f.text_msg("hello 3").event_id(event_id!("$3")))
.add_receipt(
f.read_receipts()
.add(
event_id!("$2"),
own_user_id,
ReceiptType::Read,
ReceiptThread::Unthreaded,
)
.into_event(),
),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 1);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello 4").event_id(event_id!("$4"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 2);
}
#[async_test]
async fn test_select_best_receipt_considers_thread_config() {
let server = MatrixMockServer::new().await;
let client = server
.client_builder()
.on_builder(|builder| {
builder.with_threading_support(ThreadingSupport::Enabled { with_subscriptions: false })
})
.build()
.await;
let own_user_id = client.user_id().unwrap();
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
let thread_root = event_id!("$1");
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello 1").event_id(thread_root))
.add_timeline_event(f.text_msg("hello 2").event_id(event_id!("$2")))
.add_timeline_event(
f.text_msg("hello 3")
.in_thread(thread_root, thread_root)
.sender(own_user_id)
.event_id(event_id!("$3")),
),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 2);
}
#[async_test]
async fn test_unread_counts_updated_after_duplicate_only_sync_response() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let own_user_id = client.user_id().unwrap();
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello 1").event_id(event_id!("$1")))
.add_timeline_event(f.text_msg("hello 2").event_id(event_id!("$2"))),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(..)) = room_cache_updates.recv()
);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello 2").event_id(event_id!("$2")))
.add_receipt(
f.read_receipts()
.add(
event_id!("$2"),
own_user_id,
ReceiptType::Read,
ReceiptThread::Unthreaded,
)
.into_event(),
),
)
.await;
assert_let_timeout!(
Ok(RoomEventCacheUpdate::AddEphemeralEvents { .. }) = room_cache_updates.recv()
);
assert_eq!(room.num_unread_messages(), 0);
}
#[async_test]
async fn test_compute_unread_counts_triggers_backpaginations() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let own_user_id = client.user_id().unwrap();
client.event_cache().config_mut().experimental_auto_backpagination = true;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.mock_room_messages()
.match_from("prev_batch")
.ok(RoomMessagesResponseTemplate::default()
.events(vec![
f.text_msg("hello 2").event_id(event_id!("$2")),
f.text_msg("hello 1").event_id(event_id!("$1")),
])
.with_delay(Duration::from_millis(100)))
.mock_once()
.mount()
.await;
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("hello 3").event_id(event_id!("$3")))
.add_timeline_event(f.text_msg("hello 4").event_id(event_id!("$4")))
.add_receipt(
f.read_receipts()
.add(
event_id!("$1"),
own_user_id,
ReceiptType::Read,
ReceiptThread::Unthreaded,
)
.into_event(),
)
.set_timeline_limited()
.set_timeline_prev_batch("prev_batch".to_owned()),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 2);
assert_let_timeout!(Duration::from_millis(150), Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 3);
}
#[async_test]
async fn test_read_receipt_from_store_used_as_latest_active() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let own_user_id = client.user_id().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_receipt(
f.read_receipts()
.add(event_id!("$2"), own_user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
.into_event(),
),
)
.await;
client.event_cache().subscribe().unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("ev1").event_id(event_id!("$1")))
.add_timeline_event(f.text_msg("ev2").event_id(event_id!("$2")))
.add_timeline_event(f.text_msg("ev3").event_id(event_id!("$3"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 1);
}
#[async_test]
async fn test_all_read_receipts_from_store_used_as_latest_active() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let own_user_id = client.user_id().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_receipt(
f.read_receipts()
.add(event_id!("$2"), own_user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
.add(
event_id!("$3"),
own_user_id,
ReceiptType::ReadPrivate,
ReceiptThread::Main,
)
.into_event(),
),
)
.await;
client.event_cache().subscribe().unwrap();
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (_, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
assert!(room_cache_updates.is_empty());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("ev1").event_id(event_id!("$1")))
.add_timeline_event(f.text_msg("ev2").event_id(event_id!("$2")))
.add_timeline_event(f.text_msg("ev3").event_id(event_id!("$3"))),
)
.await;
assert_let_timeout!(Ok(_) = room_cache_updates.recv());
assert_eq!(room.num_unread_messages(), 0);
}