use std::{ops::Not as _, time::Duration};
use assert_matches2::{assert_let, assert_matches};
use eyeball_im::VectorDiff;
use futures_util::StreamExt as _;
use matrix_sdk::{
Client, ThreadingSupport, assert_let_timeout,
sleep::sleep,
test_utils::mocks::{
MatrixMockServer, RoomContextResponseTemplate, RoomRelationsResponseTemplate,
},
};
use matrix_sdk_test::{ALICE, BOB, JoinedRoomBuilder, async_test, event_factory::EventFactory};
use matrix_sdk_ui::timeline::{
RoomExt as _, TimelineBuilder, TimelineDetails, TimelineEventFocusThreadMode,
TimelineEventItemId, TimelineFocus, VirtualTimelineItem,
};
use ruma::{
MilliSecondsSinceUnixEpoch,
api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
event_id,
events::{
AnySyncTimelineEvent,
poll::unstable_start::{
NewUnstablePollStartEventContent, UnstablePollAnswer, UnstablePollStartContentBlock,
UnstablePollStartEventContent,
},
receipt::{ReceiptThread, ReceiptType},
room::{
ImageInfo,
message::{
Relation, ReplacementMetadata, RoomMessageEventContent,
RoomMessageEventContentWithoutRelation,
},
},
sticker::{StickerEventContent, StickerMediaSource},
},
owned_event_id, owned_mxc_uri, room_id, user_id,
};
use stream_assert::assert_pending;
use tokio::task::yield_now;
// Builds a `Client` from the mock server's client builder, with threading
// support enabled and `with_subscriptions` set to `false`.
async fn client_with_threading_support(server: &MatrixMockServer) -> Client {
    let client_builder = server.client_builder().on_builder(|inner| {
        let threading = ThreadingSupport::Enabled { with_subscriptions: false };
        inner.with_threading_support(threading)
    });

    client_builder.build().await
}
// A thread-focused timeline created right after joining a room has no items
// and no pending updates.
#[async_test]
async fn test_new_empty_thread() {
    let server = MatrixMockServer::new().await;
    let client = client_with_threading_support(&server).await;

    let room_id = room_id!("!a:b.c");
    let root_event_id = owned_event_id!("$root");

    let room = server.sync_joined_room(&client, room_id).await;

    let thread_focus = TimelineFocus::Thread { root_event_id };
    let timeline = TimelineBuilder::new(&room).with_focus(thread_focus).build().await.unwrap();

    let (initial_items, mut updates) = timeline.subscribe().await;

    // Nothing is known about the thread yet.
    assert!(initial_items.is_empty());
    assert_pending!(updates);
}
#[async_test]
async fn test_thread_backpagination() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let sender_id = user_id!("@alice:b.c");
let factory = EventFactory::new().room(room_id).sender(sender_id);
let thread_root_event_id = owned_event_id!("$root");
server
.mock_room_event()
.match_event_id()
.ok(factory
.text_msg("Thread root")
.sender(sender_id)
.event_id(&thread_root_event_id)
.into())
.mock_once()
.mount()
.await;
let batch1 = vec![
factory
.text_msg("Threaded event 4")
.event_id(event_id!("$4"))
.in_thread_reply(&thread_root_event_id, event_id!("$2"))
.into_raw(),
factory
.text_msg("Threaded event 3")
.event_id(event_id!("$3"))
.in_thread(&thread_root_event_id, event_id!("$2"))
.into_raw(),
];
let batch2 = vec![
factory
.text_msg("Threaded event 2")
.event_id(event_id!("$2"))
.in_thread(&thread_root_event_id, event_id!("$1"))
.into_raw(),
factory
.text_msg("Threaded event 1")
.event_id(event_id!("$1"))
.in_thread(&thread_root_event_id, event_id!("$root"))
.into_raw(),
];
server
.mock_room_relations()
.match_target_event(thread_root_event_id.clone())
.ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
.mock_once()
.mount()
.await;
server
.mock_room_relations()
.match_target_event(thread_root_event_id.clone())
.match_from("next_batch")
.ok(RoomRelationsResponseTemplate::default().events(batch2))
.mock_once()
.mount()
.await;
let room = server.sync_joined_room(&client, room_id).await;
let timeline = TimelineBuilder::new(&room)
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id.clone() })
.build()
.await
.unwrap();
let (items, mut timeline_stream) = timeline.subscribe().await;
assert!(items.is_empty());
assert_pending!(timeline_stream);
timeline.paginate_backwards(20).await.unwrap();
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 3);
{
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.content().as_message().unwrap().body(), "Threaded event 3");
assert_matches!(event_item.content().in_reply_to(), None);
}
{
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[1]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.content().as_message().unwrap().body(), "Threaded event 4");
assert_eq!(event_item.content().in_reply_to().unwrap().event_id, event_id!("$2"));
}
let hit_start = timeline.paginate_backwards(100).await.unwrap();
assert!(hit_start);
assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
assert_eq!(timeline_updates.len(), 3);
assert_let!(VectorDiff::Insert { index: 1, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_root_event_id);
assert_matches!(event_item.content().in_reply_to(), None);
assert_let!(VectorDiff::Insert { index: 2, value } = &timeline_updates[1]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), event_id!("$1"));
assert_matches!(event_item.content().in_reply_to(), None);
assert_let!(VectorDiff::Insert { index: 3, value } = &timeline_updates[2]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), event_id!("$2"));
assert_matches!(event_item.content().in_reply_to(), None);
let items = timeline.items().await;
assert_eq!(items.len(), 6);
assert!(items[0].is_date_divider());
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "Thread root");
assert_eq!(
items[2].as_event().unwrap().content().as_message().unwrap().body(),
"Threaded event 1"
);
assert_eq!(
items[3].as_event().unwrap().content().as_message().unwrap().body(),
"Threaded event 2"
);
assert_eq!(
items[4].as_event().unwrap().content().as_message().unwrap().body(),
"Threaded event 3"
);
assert_eq!(
items[5].as_event().unwrap().content().as_message().unwrap().body(),
"Threaded event 4"
);
}
#[async_test]
async fn test_extract_bundled_thread_summary() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let room = server.sync_joined_room(&client, room_id).await;
let timeline = room.timeline().await.unwrap();
let (initial_items, mut stream) = timeline.subscribe().await;
assert!(initial_items.is_empty());
let f = EventFactory::new().room(room_id).sender(&ALICE);
let thread_event_id = event_id!("$thread_root");
let latest_event_id = event_id!("$latest_event");
let event = f
.text_msg("thready thread mcthreadface")
.with_bundled_thread_summary(
f.text_msg("the last one!").event_id(latest_event_id).into(),
42,
false,
)
.event_id(thread_event_id);
server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_timeline_event(event)).await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert_let!(Some(summary) = event_item.content().thread_summary());
assert!(summary.latest_event.is_ready());
assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event);
assert_eq!(latest_event.content.as_message().unwrap().body(), "the last one!");
assert_eq!(latest_event.sender, *ALICE);
assert!(latest_event.sender_profile.is_unavailable());
assert_eq!(summary.num_replies, 42);
assert_eq!(summary.public_read_receipt_event_id, None);
assert_eq!(summary.private_read_receipt_event_id, None);
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[1]);
assert!(value.is_date_divider());
}
// In the room's default timeline, every new in-thread reply received via sync
// updates the thread summary on the root item, and on the reply details of
// items that embed the root.
#[async_test]
async fn test_new_thread_reply_causes_thread_summary_update() {
    let server = MatrixMockServer::new().await;
    let client = client_with_threading_support(&server).await;

    let room_id = room_id!("!a:b.c");
    let room = server.sync_joined_room(&client, room_id).await;

    let timeline = room.timeline().await.unwrap();
    let (initial_items, mut updates) = timeline.subscribe().await;
    assert!(initial_items.is_empty());

    let factory = EventFactory::new().room(room_id).sender(&ALICE);
    let root_id = event_id!("$thread_root");

    // ---- Step 1: the future thread root arrives on its own ----
    let root_event = factory.text_msg("thready thread mcthreadface").event_id(root_id);
    let sync_builder = JoinedRoomBuilder::new(room_id).add_timeline_event(root_event);
    server.sync_room(&client, sync_builder).await;

    assert_let_timeout!(Some(root_diffs) = updates.next());
    assert_eq!(root_diffs.len(), 2);

    // It has no summary yet.
    assert_let!(VectorDiff::PushBack { value: pushed } = &root_diffs[0]);
    let root_item = pushed.as_event().unwrap();
    assert_eq!(root_item.event_id().unwrap(), root_id);
    assert!(root_item.content().thread_summary().is_none());

    assert_let!(VectorDiff::PushFront { value: divider } = &root_diffs[1]);
    assert!(divider.is_date_divider());

    // ---- Step 2: a first in-thread reply arrives ----
    let first_reply_id = event_id!("$thread_reply");
    let first_reply = factory
        .text_msg("thread reply")
        .sender(&BOB)
        .in_thread(root_id, root_id)
        .event_id(first_reply_id);
    let sync_builder = JoinedRoomBuilder::new(room_id).add_timeline_event(first_reply);
    server.sync_room(&client, sync_builder).await;

    assert_let_timeout!(Some(first_reply_diffs) = updates.next());
    assert_eq!(first_reply_diffs.len(), 3);

    // The reply is appended. At this point, the embedded replied-to event has
    // no summary.
    assert_let!(VectorDiff::PushBack { value: pushed } = &first_reply_diffs[0]);
    let reply_item = pushed.as_event().unwrap();
    assert_eq!(reply_item.event_id().unwrap(), first_reply_id);
    assert!(reply_item.content().thread_summary().is_none());
    assert_eq!(reply_item.content().thread_root().as_deref(), Some(root_id));
    let embedded = pushed.as_event().unwrap().content().in_reply_to().unwrap().event;
    assert_let!(TimelineDetails::Ready(embedded_root) = embedded);
    assert!(embedded_root.content.thread_summary().is_none());

    // The reply item is then updated: the embedded replied-to event now has a
    // summary.
    assert_let!(VectorDiff::Set { index: 2, value: updated } = &first_reply_diffs[1]);
    let reply_item = updated.as_event().unwrap();
    assert_eq!(reply_item.event_id().unwrap(), first_reply_id);
    let embedded = updated.as_event().unwrap().content().in_reply_to().unwrap().event;
    assert_let!(TimelineDetails::Ready(embedded_root) = embedded);
    assert!(embedded_root.content.thread_summary().is_some());

    // Finally the root item itself is updated with the summary.
    assert_let!(VectorDiff::Set { index: 1, value: updated } = &first_reply_diffs[2]);
    let root_item = updated.as_event().unwrap();
    assert_eq!(root_item.event_id().unwrap(), root_id);
    assert!(root_item.content().thread_root().is_none());
    assert_let!(Some(summary) = root_item.content().thread_summary());

    assert!(summary.latest_event.is_ready());
    assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event);
    assert_eq!(latest_event.content.as_message().unwrap().body(), "thread reply");
    assert_eq!(latest_event.sender, *BOB);
    assert!(latest_event.sender_profile.is_unavailable());

    assert_eq!(summary.num_replies, 1);
    assert_eq!(summary.public_read_receipt_event_id, None);
    assert_eq!(summary.private_read_receipt_event_id, None);

    assert_pending!(updates);

    // ---- Step 3: a second in-thread reply arrives ----
    let second_reply_id = event_id!("$another_thread_reply");
    let second_reply = factory
        .text_msg("another thread reply")
        .sender(&BOB)
        .in_thread(root_id, first_reply_id)
        .event_id(second_reply_id);
    let sync_builder = JoinedRoomBuilder::new(room_id).add_timeline_event(second_reply);
    server.sync_room(&client, sync_builder).await;

    assert_let_timeout!(Some(second_reply_diffs) = updates.next());
    assert_eq!(second_reply_diffs.len(), 4);

    // The item at index 2 is updated first; its content is not inspected here.
    assert_let!(VectorDiff::Set { index: 2, .. } = &second_reply_diffs[0]);

    // Then the new reply is appended.
    assert_let!(VectorDiff::PushBack { value: pushed } = &second_reply_diffs[1]);
    let reply_item = pushed.as_event().unwrap();
    assert_eq!(reply_item.event_id().unwrap(), second_reply_id);
    assert!(reply_item.content().thread_summary().is_none());
    assert_eq!(reply_item.content().thread_root().as_deref(), Some(root_id));

    // The first reply's embedded root now reports two replies.
    assert_let!(VectorDiff::Set { index: 2, value: updated } = &second_reply_diffs[2]);
    let reply_item = updated.as_event().unwrap();
    assert_eq!(reply_item.event_id().unwrap(), first_reply_id);
    let embedded = updated.as_event().unwrap().content().in_reply_to().unwrap().event;
    assert_let!(TimelineDetails::Ready(embedded_root) = embedded);
    assert_eq!(embedded_root.content.thread_summary().unwrap().num_replies, 2);

    // And the root's own summary points at the newest reply.
    assert_let!(VectorDiff::Set { index: 1, value: updated } = &second_reply_diffs[3]);
    let root_item = updated.as_event().unwrap();
    assert_eq!(root_item.event_id().unwrap(), root_id);
    assert!(root_item.content().thread_root().is_none());
    assert_let!(Some(summary) = root_item.content().thread_summary());

    assert!(summary.latest_event.is_ready());
    assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event);
    assert_eq!(latest_event.content.as_message().unwrap().body(), "another thread reply");
    assert_eq!(latest_event.sender, *BOB);
    assert!(latest_event.sender_profile.is_unavailable());

    assert_eq!(summary.num_replies, 2);
    assert_eq!(summary.public_read_receipt_event_id, None);
    assert_eq!(summary.private_read_receipt_event_id, None);
}
#[async_test]
async fn test_thread_msg_edit_reflects_in_summary() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let room = server.sync_joined_room(&client, room_id).await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Live { hide_threaded_events: true })
.build()
.await
.unwrap();
let (initial_items, mut stream) = timeline.subscribe().await;
assert!(initial_items.is_empty());
let f = EventFactory::new().room(room_id).sender(&ALICE);
let thread_event_id = event_id!("$thread_root");
let reply_event_id = event_id!("$thread_reply");
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("thready thread mcthreadface").event_id(thread_event_id),
)
.add_timeline_event(
f.text_msg("threaded reply")
.sender(&BOB)
.in_thread(thread_event_id, thread_event_id)
.event_id(reply_event_id),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 4);
{
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert!(event_item.content().thread_summary().is_none());
assert_eq!(event_item.read_receipts().len(), 1);
assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[1]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert!(event_item.content().thread_summary().is_none());
assert_eq!(event_item.read_receipts().len(), 2);
assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[2]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert_let!(Some(summary) = event_item.content().thread_summary());
assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event);
assert_eq!(
latest_event.identifier,
TimelineEventItemId::EventId(reply_event_id.to_owned())
);
assert_eq!(latest_event.content.as_message().unwrap().body(), "threaded reply");
assert_eq!(summary.num_replies, 1);
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[3]);
assert!(value.is_date_divider());
}
let edit_reply_event_id = event_id!("$thread_reply_edit");
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
f.text_msg("*edited* threaded reply")
.edit(
reply_event_id,
RoomMessageEventContentWithoutRelation::text_plain("edited threaded reply"),
)
.sender(&BOB)
.event_id(edit_reply_event_id),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert_let!(Some(summary) = event_item.content().thread_summary());
assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event);
assert_eq!(
latest_event.identifier,
TimelineEventItemId::EventId(edit_reply_event_id.to_owned())
);
assert_eq!(latest_event.content.as_message().unwrap().body(), "edited threaded reply");
assert_eq!(summary.num_replies, 1);
assert_pending!(stream);
}
#[async_test]
async fn test_thread_poll_edit_reflects_in_summary() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let room = server.sync_joined_room(&client, room_id).await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Live { hide_threaded_events: true })
.build()
.await
.unwrap();
let (initial_items, mut stream) = timeline.subscribe().await;
assert!(initial_items.is_empty());
let f = EventFactory::new().room(room_id).sender(&ALICE);
let thread_event_id = event_id!("$thread_root");
let reply_event_id = event_id!("$thread_reply");
let edit_reply_event_id = event_id!("$thread_reply_edit");
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("thready thread mcthreadface").event_id(thread_event_id),
)
.add_timeline_event(
f.poll_start(
"What's your favorite colour? Red, green, or blue?",
"What's your favorite colour?",
vec!["Red", "Green", "Blue"],
)
.sender(&BOB)
.in_thread(thread_event_id, thread_event_id)
.event_id(reply_event_id),
)
.add_timeline_event(
f.poll_edit(
reply_event_id,
"What's your favourite colour?",
vec!["Red", "Green", "Blue", "Yellow"],
)
.sender(&BOB)
.event_id(edit_reply_event_id),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 4);
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert!(event_item.content().thread_summary().is_none());
assert_eq!(event_item.read_receipts().len(), 1);
assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[1]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert!(event_item.content().thread_summary().is_none());
assert_eq!(event_item.read_receipts().len(), 2);
assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[2]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert_let!(Some(summary) = event_item.content().thread_summary());
assert_let!(TimelineDetails::Ready(latest_event) = summary.latest_event);
assert_eq!(
latest_event.identifier,
TimelineEventItemId::EventId(edit_reply_event_id.to_owned())
);
let poll_results = latest_event.content.as_poll().unwrap().results();
assert_eq!(poll_results.question, "What's your favourite colour?");
assert_eq!(poll_results.answers.len(), 4);
assert_eq!(summary.num_replies, 1);
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[3]);
assert!(value.is_date_divider());
assert_pending!(stream);
}
// Three timelines observe the same sync, which carries a thread root and one
// in-thread reply:
//
// - a live timeline hiding threaded events,
// - a live timeline showing threaded events,
// - a timeline focused on the thread.
//
// Each one must receive exactly the diffs asserted below, and must have
// nothing left pending afterwards.
#[async_test]
async fn test_thread_filtering_for_sync() {
    let server = MatrixMockServer::new().await;
    let client = client_with_threading_support(&server).await;

    let room_id = room_id!("!a:b.c");
    let sender_id = user_id!("@alice:b.c");
    let thread_root_event_id = owned_event_id!("$root");

    let room = server.sync_joined_room(&client, room_id).await;

    // Live timeline that hides in-thread events.
    let filtered_timeline = room
        .timeline_builder()
        .with_focus(TimelineFocus::Live { hide_threaded_events: true })
        .build()
        .await
        .unwrap();
    let (_, mut filtered_timeline_stream) = filtered_timeline.subscribe().await;

    // Live timeline that shows in-thread events.
    let timeline = room
        .timeline_builder()
        .with_focus(TimelineFocus::Live { hide_threaded_events: false })
        .build()
        .await
        .unwrap();
    let (_, mut timeline_stream) = timeline.subscribe().await;

    // Timeline focused on the thread.
    let thread_timeline = room
        .timeline_builder()
        .with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id.clone() })
        .build()
        .await
        .unwrap();
    let (_, mut thread_timeline_stream) = thread_timeline.subscribe().await;

    // A single sync delivers the root and one in-thread reply.
    let factory = EventFactory::new();
    server
        .sync_room(
            &client,
            JoinedRoomBuilder::new(room_id)
                .add_timeline_event(
                    factory
                        .text_msg("Thread root")
                        .sender(sender_id)
                        .event_id(&thread_root_event_id),
                )
                .add_timeline_event(
                    factory
                        .text_msg("Within thread")
                        .sender(sender_id)
                        .event_id(event_id!("$threaded_event"))
                        .in_thread(&thread_root_event_id, &thread_root_event_id),
                ),
        )
        .await;

    // Filtered live timeline: only the root shows up, then gains a summary.
    {
        assert_let_timeout!(Some(timeline_updates) = filtered_timeline_stream.next());
        assert_eq!(timeline_updates.len(), 3);

        assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
        let event_item = value.as_event().unwrap();
        assert_eq!(event_item.content().as_message().unwrap().body(), "Thread root");
        assert_matches!(event_item.content().thread_summary(), None);

        assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[1]);
        assert_matches!(value.as_event().unwrap().content().thread_summary(), Some(_));

        assert_let!(VectorDiff::PushFront { value } = &timeline_updates[2]);
        assert!(value.is_date_divider());

        assert_pending!(filtered_timeline_stream);
    }

    // Unfiltered live timeline: both events show up.
    {
        assert_let_timeout!(Some(timeline_updates) = timeline_stream.next());
        assert_eq!(timeline_updates.len(), 6);

        // The root is appended, with at least one read receipt on it.
        assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
        let event_item = value.as_event().unwrap();
        assert_eq!(event_item.content().as_message().unwrap().body(), "Thread root");
        assert_matches!(event_item.content().thread_summary(), None);
        assert!(event_item.read_receipts().is_empty().not());

        // The root is updated and no longer holds any read receipt.
        assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[1]);
        let event_item = value.as_event().unwrap();
        assert_matches!(event_item.content().thread_summary(), None);
        assert!(event_item.read_receipts().is_empty());

        // The in-thread reply is appended, then updated in place.
        assert_let!(VectorDiff::PushBack { value } = &timeline_updates[2]);
        assert_eq!(
            value.as_event().unwrap().content().as_message().unwrap().body(),
            "Within thread"
        );

        assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[3]);
        assert_eq!(
            value.as_event().unwrap().content().as_message().unwrap().body(),
            "Within thread"
        );

        // The root gains its thread summary.
        assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[4]);
        assert_matches!(value.as_event().unwrap().content().thread_summary(), Some(_));

        assert_let!(VectorDiff::PushFront { value } = &timeline_updates[5]);
        assert!(value.is_date_divider());

        assert_pending!(timeline_stream);
    }

    // Thread-focused timeline: root and reply show up, without summary.
    {
        assert_let_timeout!(Some(timeline_updates) = thread_timeline_stream.next());
        assert_eq!(timeline_updates.len(), 4);

        assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
        let event_item = value.as_event().unwrap();
        assert_eq!(event_item.content().as_message().unwrap().body(), "Thread root");

        assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[1]);
        let event_item = value.as_event().unwrap();
        assert_matches!(event_item.content().thread_summary(), None);
        assert!(event_item.read_receipts().is_empty());

        assert_let!(VectorDiff::PushBack { value } = &timeline_updates[2]);
        assert_eq!(
            value.as_event().unwrap().content().as_message().unwrap().body(),
            "Within thread"
        );

        assert_let!(VectorDiff::PushFront { value } = &timeline_updates[3]);
        assert!(value.is_date_divider());

        // Check the stream of the timeline under test in this section. The
        // unfiltered live stream was already checked in the previous section.
        assert_pending!(thread_timeline_stream);
    }
}
#[async_test]
async fn test_thread_timeline_gets_related_events_from_sync() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let sender_id = user_id!("@alice:b.c");
let thread_root_event_id = owned_event_id!("$root");
let threaded_event_id = event_id!("$threaded_event");
let room = server.sync_joined_room(&client, room_id).await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id.clone() })
.build()
.await
.unwrap();
let (initial_items, mut stream) = timeline.subscribe().await;
assert!(initial_items.is_empty());
assert_pending!(stream);
let f = EventFactory::new();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
f.text_msg("Within thread")
.sender(sender_id)
.event_id(threaded_event_id)
.in_thread(&thread_root_event_id, threaded_event_id),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id(), Some(threaded_event_id));
assert!(event_item.content().reactions().unwrap().is_empty());
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[1]);
assert!(value.is_date_divider());
assert_pending!(stream);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.reaction(threaded_event_id, "👍").sender(sender_id)),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), threaded_event_id);
assert!(event_item.content().reactions().unwrap().is_empty().not());
let other_timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id })
.build()
.await
.unwrap();
let (initial_items, _thread_stream) = other_timeline.subscribe().await;
assert_eq!(initial_items.len(), 2);
assert!(initial_items[0].is_date_divider());
let event_item = initial_items[1].as_event().unwrap();
assert_eq!(event_item.event_id(), Some(threaded_event_id));
assert!(event_item.content().reactions().unwrap().is_empty().not());
}
#[async_test]
async fn test_thread_timeline_gets_local_echoes() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root_event_id = owned_event_id!("$root");
let threaded_event_id = event_id!("$threaded_event");
let room = server.sync_joined_room(&client, room_id).await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id.clone() })
.build()
.await
.unwrap();
let (initial_items, mut stream) = timeline.subscribe().await;
assert!(initial_items.is_empty());
assert_pending!(stream);
let f = EventFactory::new();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
f.text_msg("hello world")
.sender(*ALICE)
.event_id(threaded_event_id)
.in_thread(&thread_root_event_id, threaded_event_id)
.server_ts(MilliSecondsSinceUnixEpoch::now()),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 2);
let sent_event_id = event_id!("$sent_msg");
server.mock_room_state_encryption().plain().mount().await;
let (event_receiver, mock_builder) =
server.mock_room_send().ok_with_capture(sent_event_id, *ALICE);
mock_builder.mock_once().mount().await;
timeline.send(RoomMessageEventContent::text_plain("hello to you too!").into()).await.unwrap();
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert!(event_item.is_local_echo());
assert!(event_item.event_id().is_none());
let msglike = event_item.content().as_msglike().unwrap();
assert_eq!(msglike.thread_root.as_ref(), Some(&thread_root_event_id));
assert!(msglike.in_reply_to.is_none());
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 3);
assert_let!(VectorDiff::Set { index: 2, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id(), Some(sent_event_id));
assert!(event_item.content().reactions().unwrap().is_empty());
assert_matches!(&timeline_updates[1], VectorDiff::Remove { index: 2 });
assert_let!(VectorDiff::PushBack { value: remote_event } = &timeline_updates[2]);
assert_eq!(remote_event.as_event().unwrap().event_id(), Some(sent_event_id));
assert_pending!(stream);
{
let raw_event = event_receiver.await.unwrap();
let event = raw_event.deserialize().unwrap();
assert_let!(
AnySyncTimelineEvent::MessageLike(
ruma::events::AnySyncMessageLikeEvent::RoomMessage(event)
) = event
);
let event = event.as_original().unwrap();
assert_let!(Some(Relation::Thread(thread)) = event.content.relates_to.clone());
assert_eq!(thread.event_id, thread_root_event_id);
assert!(thread.is_falling_back);
assert_eq!(thread.in_reply_to.unwrap().event_id, threaded_event_id);
}
server.mock_room_send().ok(event_id!("$reaction_id")).mock_once().mount().await;
timeline.toggle_reaction(&event_item.identifier(), "👍").await.unwrap();
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::Set { index: 2, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), sent_event_id);
assert!(event_item.content().reactions().unwrap().is_empty().not());
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert!(!timeline_updates.is_empty());
for timeline_update in timeline_updates {
assert_let!(VectorDiff::Set { index: 2, value } = timeline_update);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), sent_event_id);
assert!(event_item.content().reactions().unwrap().is_empty().not());
}
assert_pending!(stream);
}
#[async_test]
async fn test_thread_timeline_can_send_edit() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root_event_id = owned_event_id!("$root");
let threaded_event_id = event_id!("$threaded_event");
let room = server.sync_joined_room(&client, room_id).await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id.clone() })
.build()
.await
.unwrap();
let (initial_items, mut stream) = timeline.subscribe().await;
assert!(initial_items.is_empty());
assert_pending!(stream);
let f = EventFactory::new();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
f.text_msg("hello world")
.sender(*ALICE)
.event_id(threaded_event_id)
.in_thread(&thread_root_event_id, threaded_event_id)
.server_ts(MilliSecondsSinceUnixEpoch::now()),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 2);
let sent_event_id = event_id!("$sent_msg");
server.mock_room_state_encryption().plain().mount().await;
server.mock_room_send().ok(sent_event_id).mount().await;
timeline
.send(
RoomMessageEventContent::text_plain("bonjour monde")
.make_replacement(ReplacementMetadata::new(threaded_event_id.to_owned(), None))
.into(),
)
.await
.unwrap();
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
let msglike = event_item.content().as_msglike().unwrap();
assert_eq!(msglike.thread_root.as_ref(), Some(&thread_root_event_id));
assert!(msglike.in_reply_to.is_none());
assert_eq!(msglike.as_message().unwrap().body(), "bonjour monde");
assert_pending!(stream);
}
#[async_test]
async fn test_send_sticker_thread() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root_event_id = owned_event_id!("$root");
let threaded_event_id = event_id!("$threaded_event");
let room = server.sync_joined_room(&client, room_id).await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id.clone() })
.build()
.await
.unwrap();
let (initial_items, mut stream) = timeline.subscribe().await;
assert!(initial_items.is_empty());
assert_pending!(stream);
let f = EventFactory::new();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
f.text_msg("hello world")
.sender(*ALICE)
.event_id(threaded_event_id)
.in_thread(&thread_root_event_id, threaded_event_id)
.server_ts(MilliSecondsSinceUnixEpoch::now()),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 2);
server.mock_room_state_encryption().plain().mount().await;
let sent_event_id = event_id!("$sent_msg");
server.mock_room_send().ok(sent_event_id).mount().await;
let media_src = owned_mxc_uri!("mxc://example.com/1");
timeline
.send(
StickerEventContent::new("sticker!".to_owned(), ImageInfo::new(), media_src.clone())
.into(),
)
.await
.unwrap();
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
let sticker_item = event_item.content().as_sticker().unwrap();
let content = sticker_item.content();
assert_eq!(content.body, "sticker!");
assert_let!(StickerMediaSource::Plain(plain) = content.source.clone());
assert_eq!(plain, media_src);
assert_pending!(stream);
}
#[async_test]
async fn test_send_poll_thread() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root_event_id = owned_event_id!("$root");
let threaded_event_id = event_id!("$threaded_event");
let room = server.sync_joined_room(&client, room_id).await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id.clone() })
.build()
.await
.unwrap();
let (initial_items, mut stream) = timeline.subscribe().await;
assert!(initial_items.is_empty());
assert_pending!(stream);
let f = EventFactory::new();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_timeline_event(
f.text_msg("hello world")
.sender(*ALICE)
.event_id(threaded_event_id)
.in_thread(&thread_root_event_id, threaded_event_id)
.server_ts(MilliSecondsSinceUnixEpoch::now()),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 2);
server.mock_room_state_encryption().plain().mount().await;
let sent_event_id = event_id!("$sent_msg");
server.mock_room_send().ok(sent_event_id).mount().await;
timeline
.send(
UnstablePollStartEventContent::New(NewUnstablePollStartEventContent::plain_text(
"let's vote",
UnstablePollStartContentBlock::new(
"what day is it today?",
vec![
UnstablePollAnswer::new("0", "monday"),
UnstablePollAnswer::new("1", "friday"),
]
.try_into()
.unwrap(),
),
))
.into(),
)
.await
.unwrap();
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert!(event_item.content().is_poll());
assert_pending!(stream);
}
/// On a thread-focused timeline that holds no items, `mark_as_read` is
/// expected to return `false`, even though the room was synced with
/// `marked_unread(true)` account data.
#[async_test]
async fn test_sending_read_receipt_with_no_events_doesnt_unset_read_flag() {
    let server = MatrixMockServer::new().await;
    let client = client_with_threading_support(&server).await;

    let room_id = room_id!("!a:b.c");
    let thread_root_event_id = owned_event_id!("$root");

    // Sync the room together with account data produced by
    // `marked_unread(true)`.
    let f = EventFactory::new();
    let room = server
        .sync_room(&client, JoinedRoomBuilder::new(room_id).add_account_data(f.marked_unread(true)))
        .await;

    // The root id is not used again below, so it is moved into the focus
    // instead of being cloned.
    let timeline = room
        .timeline_builder()
        .with_focus(TimelineFocus::Thread { root_event_id: thread_root_event_id })
        .build()
        .await
        .unwrap();

    // The thread timeline is empty and stays quiet.
    let (initial_items, mut stream) = timeline.subscribe().await;
    assert!(initial_items.is_empty());
    assert_pending!(stream);

    // No receipt endpoint is mocked here: the call must succeed and report
    // `false`.
    let marked_as_read = timeline.mark_as_read(SendReceiptType::Read).await.unwrap();
    assert!(marked_as_read.not());
}
#[async_test]
async fn test_read_receipts() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
let room_id = room_id!("!a:b.c");
let thread_root = owned_event_id!("$root");
let receipt_thread = ReceiptThread::Thread(thread_root.clone());
let room = server.sync_joined_room(&client, room_id).await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Thread { root_event_id: thread_root.clone() })
.build()
.await
.unwrap();
let (initial_items, mut stream) = timeline.subscribe().await;
assert!(initial_items.is_empty());
assert_pending!(stream);
let f = EventFactory::new();
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("hey to you too!")
.sender(*ALICE)
.in_thread(&thread_root, &thread_root)
.event_id(event_id!("$1")),
)
.add_timeline_event(
f.text_msg("how's it going?")
.sender(*ALICE)
.in_thread(&thread_root, event_id!("$1"))
.event_id(event_id!("$2")),
)
.add_timeline_event(
f.text_msg("good, u?")
.sender(*BOB)
.in_thread(&thread_root, event_id!("$2"))
.event_id(event_id!("$3")),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 5);
{
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let ev0 = value.as_event().unwrap();
assert_eq!(ev0.event_id(), Some(event_id!("$1")));
let rr = ev0.read_receipts();
assert_eq!(rr.len(), 1);
assert_eq!(rr[*ALICE].thread, receipt_thread);
assert_let!(VectorDiff::Set { index: 0, value } = &timeline_updates[1]);
let ev0 = value.as_event().unwrap();
assert_eq!(ev0.event_id(), Some(event_id!("$1")));
assert!(ev0.read_receipts().is_empty());
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[2]);
let ev1 = value.as_event().unwrap();
assert_eq!(ev1.event_id(), Some(event_id!("$2")));
let rr = ev1.read_receipts();
assert_eq!(rr.len(), 1);
assert_eq!(rr[*ALICE].thread, receipt_thread);
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[3]);
let ev2 = value.as_event().unwrap();
assert_eq!(ev2.event_id(), Some(event_id!("$3")));
let rr = ev2.read_receipts();
assert_eq!(rr.len(), 1);
assert_eq!(rr[*BOB].thread, receipt_thread);
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[4]);
assert!(value.is_date_divider());
}
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_receipt(
f.read_receipts()
.add(event_id!("$3"), *ALICE, ReceiptType::Read, receipt_thread.clone())
.add(event_id!("$3"), *BOB, ReceiptType::Read, receipt_thread.clone())
.into_event(),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 2);
{
assert_let!(VectorDiff::Set { index: 2, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id(), Some(event_id!("$2")));
assert!(event_item.read_receipts().is_empty());
}
{
assert_let!(VectorDiff::Set { index: 3, value } = &timeline_updates[1]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id(), Some(event_id!("$3")));
let rr = event_item.read_receipts();
assert_eq!(rr.len(), 2);
assert_eq!(rr[*ALICE].thread, receipt_thread);
assert_eq!(rr[*BOB].thread, receipt_thread);
}
}
#[async_test]
async fn test_initial_read_receipts_are_correctly_populated() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!a:b.c");
let thread_root = owned_event_id!("$root");
let receipt_thread = ReceiptThread::Thread(thread_root.clone());
let f = EventFactory::new();
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("hey to you too!")
.sender(*ALICE)
.in_thread(&thread_root, &thread_root)
.event_id(event_id!("$1")),
)
.add_receipt(
f.read_receipts()
.add(event_id!("$1"), *BOB, ReceiptType::Read, receipt_thread.clone())
.into_event(),
),
)
.await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Thread { root_event_id: thread_root.clone() })
.build()
.await
.unwrap();
let (mut initial_items, mut stream) = timeline.subscribe().await;
if initial_items.is_empty() {
assert_let_timeout!(Some(timeline_updates) = stream.next());
for up in timeline_updates {
up.apply(&mut initial_items);
}
}
assert_eq!(initial_items.len(), 2);
let ev = initial_items[1].as_event().unwrap();
assert_eq!(ev.event_id(), Some(event_id!("$1")));
let rr = ev.read_receipts();
assert_eq!(rr.len(), 2);
assert_eq!(rr[*ALICE].thread, receipt_thread);
assert_eq!(rr[*BOB].thread, receipt_thread);
}
#[async_test]
async fn test_initial_read_receipts_compatibility_mode() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!a:b.c");
let f = EventFactory::new();
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("hello in main timeline").sender(*BOB).event_id(event_id!("$0")),
)
.add_timeline_event(
f.text_msg("hello back in main timeline")
.sender(*ALICE)
.event_id(event_id!("$1")),
)
.add_receipt(
f.read_receipts()
.add(event_id!("$1"), *BOB, ReceiptType::Read, ReceiptThread::Unthreaded)
.into_event(),
),
)
.await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Live { hide_threaded_events: true })
.build()
.await
.unwrap();
let (mut initial_items, mut stream) = timeline.subscribe().await;
if initial_items.is_empty() {
assert_let_timeout!(Some(timeline_updates) = stream.next());
for up in timeline_updates {
up.apply(&mut initial_items);
}
}
assert_eq!(initial_items.len(), 3);
assert!(initial_items[0].is_date_divider());
{
let ev = initial_items[1].as_event().unwrap();
assert_eq!(ev.event_id(), Some(event_id!("$0")));
let rr = ev.read_receipts();
assert!(rr.is_empty());
}
{
let ev = initial_items[2].as_event().unwrap();
assert_eq!(ev.event_id(), Some(event_id!("$1")));
let rr = ev.read_receipts();
assert_eq!(rr.len(), 2);
assert!(rr.get(*ALICE).is_some());
assert!(rr.get(*BOB).is_some());
}
}
#[async_test]
async fn test_send_read_receipts() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
client.event_cache().subscribe().unwrap();
let user_id = client.user_id().unwrap();
let room_id = room_id!("!a:b.c");
let thread_root = owned_event_id!("$root");
let receipt_thread = ReceiptThread::Thread(thread_root.clone());
let f = EventFactory::new();
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("hey to you too!")
.sender(*ALICE)
.in_thread(&thread_root, &thread_root)
.event_id(event_id!("$1")),
)
.add_timeline_event(
f.text_msg("how's it going?")
.sender(user_id)
.in_thread(&thread_root, event_id!("$1"))
.event_id(event_id!("$2")),
)
.add_timeline_event(
f.text_msg("good and you?")
.sender(*BOB)
.in_thread(&thread_root, event_id!("$2"))
.event_id(event_id!("$3")),
)
.add_timeline_event(
f.text_msg("u there?")
.sender(*BOB)
.in_thread(&thread_root, event_id!("$3"))
.event_id(event_id!("$4")),
),
)
.await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Thread { root_event_id: thread_root.clone() })
.build()
.await
.unwrap();
let (mut initial_items, mut stream) = timeline.subscribe().await;
if initial_items.is_empty() {
assert_let_timeout!(Some(timeline_updates) = stream.next());
for up in timeline_updates {
up.apply(&mut initial_items);
}
}
let ev = initial_items[1].as_event().unwrap();
assert_eq!(ev.event_id(), Some(event_id!("$1")));
let rr = ev.read_receipts();
assert_eq!(rr.len(), 1);
assert_eq!(rr[*ALICE].thread, receipt_thread);
let ev = initial_items[2].as_event().unwrap();
assert_eq!(ev.event_id(), Some(event_id!("$2")));
assert!(ev.read_receipts().is_empty());
let ev = initial_items[3].as_virtual().unwrap();
assert!(matches!(ev, VirtualTimelineItem::ReadMarker));
let ev = initial_items[4].as_event().unwrap();
assert_eq!(ev.event_id(), Some(event_id!("$3")));
assert!(ev.read_receipts().is_empty());
let ev = initial_items[5].as_event().unwrap();
assert_eq!(ev.event_id(), Some(event_id!("$4")));
let rr = ev.read_receipts();
assert_eq!(rr.len(), 1);
assert_eq!(rr[*BOB].thread, receipt_thread);
let did_send =
timeline.send_single_receipt(SendReceiptType::Read, owned_event_id!("$1")).await.unwrap();
assert!(did_send.not());
let did_send =
timeline.send_single_receipt(SendReceiptType::Read, owned_event_id!("$2")).await.unwrap();
assert!(did_send.not());
server
.mock_send_receipt(SendReceiptType::Read)
.match_thread(ReceiptThread::Thread(thread_root.clone()))
.match_event_id(event_id!("$3"))
.ok()
.mock_once()
.mount()
.await;
let did_send =
timeline.send_single_receipt(SendReceiptType::Read, owned_event_id!("$3")).await.unwrap();
assert!(did_send);
assert_pending!(stream);
{
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_receipt(
f.read_receipts()
.add(event_id!("$3"), user_id, ReceiptType::Read, receipt_thread.clone())
.into_event(),
),
)
.await;
yield_now().await;
assert_pending!(stream);
}
server
.mock_send_receipt(SendReceiptType::Read)
.match_thread(ReceiptThread::Thread(thread_root.clone()))
.match_event_id(event_id!("$4"))
.ok()
.mock_once()
.mount()
.await;
let did_send = timeline.mark_as_read(SendReceiptType::Read).await.unwrap();
assert!(did_send);
{
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_receipt(
f.read_receipts()
.add(event_id!("$4"), user_id, ReceiptType::Read, receipt_thread.clone())
.into_event(),
),
)
.await;
yield_now().await;
assert_pending!(stream);
}
let did_send = timeline.mark_as_read(SendReceiptType::Read).await.unwrap();
assert!(did_send.not());
}
/// An event-focused (permalink) timeline whose target lives in a thread must
/// not receive updates when that thread is paginated through the event cache.
#[async_test]
async fn test_permalink_doesnt_listen_to_thread_sync() {
    let server = MatrixMockServer::new().await;
    let client = client_with_threading_support(&server).await;
    client.event_cache().subscribe().unwrap();

    let room_id = room_id!("!a:b.c");
    let room = server.sync_joined_room(&client, room_id).await;

    let f = EventFactory::new().sender(&ALICE).room(room_id);
    let thread_root = event_id!("$thread_root");
    let target_id = event_id!("$target");

    // The context endpoint returns the target, which is an in-thread event.
    let target_event = f
        .text_msg("hey to you too")
        .event_id(target_id)
        .in_thread(thread_root, thread_root)
        .into_event();
    server
        .mock_room_event_context()
        .ok(RoomContextResponseTemplate::new(target_event))
        .mock_once()
        .mount()
        .await;

    // Focus on the target with automatic thread-mode detection.
    let focus = TimelineFocus::Event {
        target: owned_event_id!("$target"),
        num_context_events: 2,
        thread_mode: TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true },
    };
    let timeline = TimelineBuilder::new(&room).with_focus(focus).build().await.unwrap();
    assert!(timeline.is_threaded());

    // Initial content: a date divider followed by the target event.
    let (initial_items, mut stream) = timeline.subscribe().await;
    assert_eq!(initial_items.len(), 2);
    assert!(initial_items[0].is_date_divider());
    assert_eq!(initial_items[1].as_event().unwrap().event_id(), Some(target_id));
    assert_pending!(stream);

    // The relations endpoint serves one more event of the same thread, with
    // a continuation token.
    let new_threaded_id = event_id!("$new_threaded");
    let new_threaded_event = f
        .text_msg("a new threaded event")
        .event_id(new_threaded_id)
        .in_thread(thread_root, new_threaded_id);
    let relations = RoomRelationsResponseTemplate::default()
        .events(vec![new_threaded_event])
        .next_batch("next-batch");
    server
        .mock_room_relations()
        .match_target_event(thread_root.to_owned())
        .ok(relations)
        .mock_once()
        .mount()
        .await;

    // Paginate the thread once, directly through the room event cache.
    let (room_event_cache, _drop_guards) = room.event_cache().await.unwrap();
    let outcome = room_event_cache
        .thread_pagination(thread_root.to_owned())
        .await
        .unwrap()
        .run_backwards_once(42)
        .await
        .unwrap();
    assert!(!outcome.reached_start);

    // Leave some time for a (wrong) update to arrive; none must show up.
    sleep(Duration::from_millis(100)).await;
    assert_pending!(stream);
}
#[async_test]
async fn test_redacted_replied_to_is_updated() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!a:b.c");
let f = EventFactory::new().sender(&ALICE).room(room_id);
let thread_root = event_id!("$thread_root");
let first_reply = event_id!("$first_reply");
let second_reply = event_id!("$second_reply");
let room = server.sync_joined_room(&client, room_id).await;
let timeline = TimelineBuilder::new(&room)
.with_focus(TimelineFocus::Thread { root_event_id: thread_root.to_owned() })
.build()
.await
.unwrap();
let (initial_items, mut stream) = timeline.subscribe().await;
assert!(initial_items.is_empty());
assert_pending!(stream);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(
f.text_msg("hey to you too!")
.event_id(first_reply)
.in_thread(thread_root, thread_root),
)
.add_timeline_event(
f.text_msg("how are you doing?")
.event_id(second_reply)
.in_thread_reply(thread_root, first_reply),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 3);
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[0]);
let ev1 = value.as_event().unwrap();
assert_eq!(ev1.event_id(), Some(first_reply));
assert_let!(VectorDiff::PushBack { value } = &timeline_updates[1]);
let ev2 = value.as_event().unwrap();
assert_eq!(ev2.event_id(), Some(second_reply));
let msglike = ev2.content().as_msglike().unwrap();
let in_reply_to = msglike.in_reply_to.as_ref().unwrap();
assert_eq!(in_reply_to.event_id, first_reply);
assert_let!(TimelineDetails::Ready(replied_to) = &in_reply_to.event);
assert!(replied_to.content.is_message());
assert_let!(VectorDiff::PushFront { value } = &timeline_updates[2]);
assert!(value.is_date_divider());
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.redaction(first_reply).event_id(event_id!("$redaction"))),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 2);
assert_let!(VectorDiff::Remove { index: 1 } = &timeline_updates[0]);
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[1]);
let ev2 = value.as_event().unwrap();
assert_eq!(ev2.event_id(), Some(second_reply));
let msglike = ev2.content().as_msglike().unwrap();
let in_reply_to = msglike.in_reply_to.as_ref().unwrap();
assert_eq!(in_reply_to.event_id, first_reply);
assert_let!(TimelineDetails::Ready(replied_to_event) = &in_reply_to.event);
assert!(replied_to_event.content.is_redacted());
}
#[async_test]
async fn test_redaction_affects_thread_summary() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
client.event_cache().subscribe().unwrap();
let room_id = room_id!("!a:b.c");
let f = EventFactory::new().room(room_id).sender(&ALICE);
let thread_root = event_id!("$thread_root");
let thread_reply = event_id!("$thread_reply");
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.text_msg("thread root").event_id(thread_root))
.add_timeline_event(
f.text_msg("thread reply")
.in_thread(thread_root, thread_root)
.event_id(thread_reply),
),
)
.await;
let timeline = room
.timeline_builder()
.with_focus(TimelineFocus::Live { hide_threaded_events: true })
.build()
.await
.unwrap();
let (mut initial_items, mut stream) = timeline.subscribe().await;
if initial_items.is_empty() {
assert_let_timeout!(Some(timeline_updates) = stream.next());
for up in timeline_updates {
up.apply(&mut initial_items);
}
}
assert_eq!(initial_items.len(), 2);
assert!(initial_items[0].is_date_divider());
let event_item = initial_items[1].as_event().unwrap();
assert_eq!(event_item.event_id(), Some(thread_root));
let summary = event_item.content().as_msglike().unwrap().thread_summary.as_ref().unwrap();
assert_eq!(summary.num_replies, 1);
assert_let!(TimelineDetails::Ready(embedded) = &summary.latest_event);
assert_eq!(embedded.identifier, TimelineEventItemId::EventId(thread_reply.to_owned()));
assert_pending!(stream);
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_timeline_event(f.redaction(thread_reply).event_id(event_id!("$redaction"))),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id(), Some(thread_root));
assert!(event_item.content().as_msglike().unwrap().thread_summary.is_none());
}
#[async_test]
async fn test_main_timeline_has_receipts_in_thread_summaries() {
let server = MatrixMockServer::new().await;
let client = client_with_threading_support(&server).await;
client.event_cache().subscribe().unwrap();
let own_user = client.user_id().unwrap();
let room_id = room_id!("!a:b.c");
let f = EventFactory::new().room(room_id).sender(&ALICE);
let thread_event_id = event_id!("$thread_root");
let latest_event_id = event_id!("$latest_event");
let room = server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.add_receipt(
f.read_receipts()
.add(
latest_event_id,
own_user,
ReceiptType::Read,
ReceiptThread::Thread(thread_event_id.to_owned()),
)
.into_event(),
)
.add_timeline_event(
f.text_msg("thready thread mcthreadface")
.with_bundled_thread_summary(
f.text_msg("the last one!").event_id(latest_event_id).into(),
42,
false,
)
.event_id(thread_event_id),
),
)
.await;
let timeline = room.timeline().await.unwrap();
let (mut initial_items, mut stream) = timeline.subscribe().await;
if initial_items.is_empty() {
assert_let_timeout!(Some(timeline_updates) = stream.next());
for up in timeline_updates {
up.apply(&mut initial_items);
}
}
let items = timeline.items().await;
assert_eq!(items.len(), 2);
let value = &items[0];
assert!(value.is_date_divider());
let value = &items[1];
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert_let!(Some(summary) = event_item.content().thread_summary());
assert!(summary.latest_event.is_ready());
assert_eq!(summary.public_read_receipt_event_id.as_deref(), Some(latest_event_id));
assert_eq!(summary.private_read_receipt_event_id, None);
let new_latest_event_id = event_id!("$new_latest_event_id");
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id).add_receipt(
f.read_receipts()
.add(
new_latest_event_id,
own_user,
ReceiptType::ReadPrivate,
ReceiptThread::Thread(thread_event_id.to_owned()),
)
.into_event(),
),
)
.await;
assert_let_timeout!(Some(timeline_updates) = stream.next());
assert_eq!(timeline_updates.len(), 1);
assert_let!(VectorDiff::Set { index: 1, value } = &timeline_updates[0]);
let event_item = value.as_event().unwrap();
assert_eq!(event_item.event_id().unwrap(), thread_event_id);
assert_let!(Some(summary) = event_item.content().thread_summary());
assert_eq!(summary.public_read_receipt_event_id.as_deref(), Some(latest_event_id));
assert_eq!(summary.private_read_receipt_event_id.as_deref(), Some(new_latest_event_id));
}