use std::{
cmp::{max, min},
collections::HashMap,
sync::{Arc, LazyLock, Mutex},
};
use futures::StreamExt;
use matrix_sdk::{
Room, SuccessorRoom,
room::RoomMember,
ruma::{OwnedEventId, OwnedRoomId, events::receipt::Receipt},
};
use matrix_sdk_ui::{
Timeline,
eyeball_im::{Vector, VectorDiff},
timeline::{
self, LatestEventValue, RoomExt, TimelineDetails, TimelineEventItemId, TimelineItem,
},
};
use serde::{Deserialize, Serialize, Serializer, ser::SerializeStruct};
use tokio::{sync::watch, task::JoinHandle};
use tracing::{debug, error, trace};
use crate::{
events::{
event_preview::text_preview_of_timeline_item, handlers::get_sender_username_from_profile,
},
init::singletons::{CLIENT, UIUpdateMessage, broadcast_event},
models::async_requests::{MatrixRequest, submit_async_request},
room::{
frontend_events::events_dto::{FrontendTimelineItem, to_frontend_timeline_item},
joined_room::UnreadMessageCount,
rooms_list::{RoomsListUpdate, enqueue_rooms_list_update},
},
user::user_power_level::UserPowerLevels,
};
/// Direction of a timeline pagination request.
///
/// Deserializes from the camelCase variant names (`"forwards"` / `"backwards"`),
/// which are also the strings produced by its `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase", rename_all_fields = "camelCase")]
pub enum PaginationDirection {
/// Paginate forwards.
Forwards,
/// Paginate backwards; this is the direction `timeline_subscriber_handler`
/// requests when it has to look for a target event that is not loaded yet.
Backwards,
}
/// Renders the direction as its lowercase name (`forwards` / `backwards`).
impl std::fmt::Display for PaginationDirection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match self {
            Self::Forwards => "forwards",
            Self::Backwards => "backwards",
        };
        f.write_str(label)
    }
}
/// Sending half of the `watch` channel that publishes the current list of
/// [`BackwardsPaginateUntilEventRequest`]s to timeline subscriber tasks.
pub type TimelineRequestSender = watch::Sender<Vec<BackwardsPaginateUntilEventRequest>>;
/// A request asking a timeline subscriber to locate a specific event,
/// paginating backwards if the event is not in the timeline yet.
///
/// Consumed by `timeline_subscriber_handler`.
#[derive(Debug)]
pub struct BackwardsPaginateUntilEventRequest {
/// Room the request is for; the handler picks the first request whose
/// `room_id` equals its own room's ID.
pub room_id: OwnedRoomId,
/// ID of the event to search for.
pub target_event_id: OwnedEventId,
/// Exclusive upper bound of the backwards search (`..starting_index`).
/// Only honoured when `current_tl_len` still equals the handler's timeline length.
pub starting_index: usize,
/// Timeline length as seen by the requester when the request was made;
/// if it differs from the handler's current length, the handler searches
/// from the end of the timeline instead.
pub current_tl_len: usize,
}
/// Messages delivered over a timeline's update channel.
///
/// `timeline_subscriber_handler` produces `FirstUpdate`, `NewItems` and
/// `TargetEventFound`. The producers of the other variants are not visible
/// here, so their descriptions are derived from variant names and payload
/// types only.
pub enum TimelineUpdate {
/// Initial snapshot, sent once right after the handler subscribes to the timeline.
FirstUpdate {
/// Timeline items present at subscription time.
initial_items: Vector<Arc<TimelineItem>>,
},
/// Snapshot sent after a batch of timeline diffs has been applied.
NewItems {
/// The complete, updated item list (not just the changed items).
new_items: Vector<Arc<TimelineItem>>,
/// `true` when the batch cleared/reset the timeline or changed its front
/// (see the diff handling in `timeline_subscriber_handler`).
/// Presumably tells the receiver to drop cached per-item state — confirm against the consumer.
clear_cache: bool,
},
/// Carries an unread-message count.
NewUnreadMessagesCount(UnreadMessageCount),
/// The event requested via a `BackwardsPaginateUntilEventRequest` was located.
TargetEventFound {
/// ID of the event that was found.
target_event_id: OwnedEventId,
/// Index of that event in the handler's timeline item vector.
index: usize,
},
/// Pagination in the given direction is running.
PaginationRunning(PaginationDirection),
/// Pagination in `direction` failed with `error`.
PaginationError {
error: timeline::Error,
direction: PaginationDirection,
},
/// Pagination in `direction` is idle.
PaginationIdle {
/// Presumably `true` once there is nothing left to paginate in `direction` — TODO confirm with the sender.
fully_paginated: bool,
direction: PaginationDirection,
},
/// Outcome of fetching details for `event_id`.
EventDetailsFetched {
event_id: OwnedEventId,
result: Result<(), timeline::Error>,
},
/// Outcome of editing the message identified by `timeline_event_item_id`.
MessageEdited {
timeline_event_item_id: TimelineEventItemId,
result: Result<(), timeline::Error>,
},
/// Room members have been synced.
RoomMembersSynced,
/// A list of room members has been fetched.
RoomMembersListFetched { members: Vec<RoomMember> },
/// Media-fetched notification; the leading underscore suggests it is currently unused.
_MediaFetched,
/// Users currently typing, as display strings.
TypingUsers {
users: Vec<String>,
},
/// Carries the user's power levels.
UserPowerLevels(UserPowerLevels),
/// Carries a read receipt belonging to the own user.
OwnUserReadReceipt(Receipt),
}
/// Global, mutex-guarded map from [`TimelineKind`] to its [`TimelineUiState`].
///
/// Starts out empty and is created lazily on first access.
pub static TIMELINE_STATES: LazyLock<Mutex<HashMap<TimelineKind, TimelineUiState>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
/// UI-facing state of one timeline, stored in [`TIMELINE_STATES`].
///
/// Serialized by a hand-written `Serialize` impl that skips the two channel endpoints.
#[derive(Debug)]
pub struct TimelineUiState {
/// Which timeline (main room or thread) this state belongs to.
pub(crate) kind: TimelineKind,
/// Power levels passed along when converting items for the frontend.
pub(crate) user_power: UserPowerLevels,
/// Serialized as `fullyPaginated`; presumably set once pagination has nothing more to load — TODO confirm.
pub(crate) fully_paginated: bool,
/// Current timeline items.
pub(crate) items: Vector<Arc<TimelineItem>>,
/// Receiving end of the timeline update channel (not serialized).
pub(crate) update_receiver: crossbeam_channel::Receiver<TimelineUpdate>,
/// Sender for backwards-pagination requests (not serialized).
pub(crate) request_sender: TimelineRequestSender,
/// Serialized as `scrolledPastReadMarker`.
pub(crate) scrolled_past_read_marker: bool,
/// Latest read receipt of the own user, if any; serialized as `latestOwnUserReceipt`.
pub(crate) latest_own_user_receipt: Option<Receipt>,
}
/// Hand-written serialization that flattens the timeline kind into the state object.
///
/// Fields are emitted in this order: `timelineKind` (`"mainRoom"` or `"thread"`),
/// `roomId`, `threadRootEventId` (thread timelines only), `userPower`,
/// `fullyPaginated`, `items` (converted with `serialize_timeline_items`),
/// `scrolledPastReadMarker`, `latestOwnUserReceipt`. The channel endpoints
/// held by the struct are not serialized.
impl Serialize for TimelineUiState {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // A thread timeline has one additional field, hence the differing field counts.
        let (kind_label, room_id, thread_root, field_count) = match &self.kind {
            TimelineKind::MainRoom { room_id } => ("mainRoom", room_id, None, 7),
            TimelineKind::Thread {
                room_id,
                thread_root_event_id,
            } => ("thread", room_id, Some(thread_root_event_id), 8),
        };

        let mut out = serializer.serialize_struct("TimelineUiState", field_count)?;
        out.serialize_field("timelineKind", kind_label)?;
        out.serialize_field("roomId", room_id)?;
        if let Some(root_event_id) = thread_root {
            out.serialize_field("threadRootEventId", root_event_id)?;
        }
        out.serialize_field("userPower", &self.user_power)?;
        out.serialize_field("fullyPaginated", &self.fully_paginated)?;

        let frontend_items = serialize_timeline_items(&self.items, &self.kind, &self.user_power);
        out.serialize_field("items", &frontend_items)?;

        out.serialize_field("scrolledPastReadMarker", &self.scrolled_past_read_marker)?;
        out.serialize_field("latestOwnUserReceipt", &self.latest_own_user_receipt)?;
        out.end()
    }
}
/// Converts timeline items into their frontend representation.
///
/// Each item is passed to `to_frontend_timeline_item`; items for which that
/// conversion returns `None` are skipped. The order of the remaining items is preserved.
fn serialize_timeline_items(
    items: &Vector<Arc<TimelineItem>>,
    timeline_kind: &TimelineKind,
    user_power_levels: &UserPowerLevels,
) -> Vec<FrontendTimelineItem> {
    let mut frontend_items = Vec::new();
    for timeline_item in items.iter() {
        if let Some(converted) =
            to_frontend_timeline_item(timeline_item, timeline_kind, user_power_levels)
        {
            frontend_items.push(converted);
        }
    }
    frontend_items
}
/// Takes the one-shot channel endpoints of the timeline identified by `kind`.
///
/// Returns `None` when `try_get_room_details` yields nothing for the room,
/// when `kind` is a thread without an entry in `thread_timelines`, or when the
/// endpoints have already been taken (they are stored in an `Option` that is
/// emptied by this call).
///
/// # Panics
///
/// Panics if the room-details mutex is poisoned.
pub fn take_timeline_endpoints(kind: &TimelineKind) -> Option<TimelineEndpoints> {
    let room_details = crate::room::joined_room::try_get_room_details(kind.room_id())?;
    let mut guard = room_details.lock().unwrap();

    // Select the per-timeline record: a thread entry if a thread root is given,
    // the main timeline otherwise.
    let per_timeline = if let Some(root_event_id) = kind.thread_root_event_id() {
        guard.thread_timelines.get_mut(root_event_id)?
    } else {
        &mut guard.main_timeline
    };

    let (update_receiver, request_sender) = per_timeline.timeline_singleton_endpoints.take()?;
    let endpoints = TimelineEndpoints {
        _update_sender: per_timeline.timeline_update_sender.clone(),
        update_receiver,
        request_sender,
        _successor_room: per_timeline.timeline.room().successor_room(),
    };
    Some(endpoints)
}
/// Drives a single timeline: mirrors the diffs of the timeline subscription into a local
/// item vector, forwards snapshots over `timeline_update_sender`, and services
/// "find this event" requests arriving on `request_receiver`.
///
/// * `room` - only its room ID is read here (request matching and log messages).
/// * `timeline` - the timeline that is subscribed to.
/// * `timeline_update_sender` - channel on which [`TimelineUpdate`]s are sent.
/// * `request_receiver` - `watch` receiver holding the current list of
///   [`BackwardsPaginateUntilEventRequest`]s; the first entry whose `room_id` matches is used.
/// * `thread_root_event_id` - when `Some`, pagination requests are submitted for
///   [`TimelineKind::Thread`], otherwise for [`TimelineKind::MainRoom`].
///
/// The loop ends when the subscription stream yields `None` (or when no `select!` branch
/// can make progress); an error is logged at that point.
///
/// # Panics
///
/// Panics if any send on `timeline_update_sender` fails.
pub async fn timeline_subscriber_handler(
room: Room,
timeline: Arc<Timeline>,
timeline_update_sender: crossbeam_channel::Sender<TimelineUpdate>,
mut request_receiver: watch::Receiver<Vec<BackwardsPaginateUntilEventRequest>>,
thread_root_event_id: Option<OwnedEventId>,
) {
// Looks for the pending target event among `new_items_iter`.
// On a hit, the target is taken out of `target_event_id_opt` (leaving `None`) and
// returned together with its position *within the iterator*; callers offset that
// position to obtain a timeline index.
// Returns `None` when there is no pending target or nothing matched.
fn find_target_event<'a>(
target_event_id_opt: &mut Option<OwnedEventId>,
mut new_items_iter: impl Iterator<Item = &'a Arc<TimelineItem>>,
) -> Option<(usize, OwnedEventId)> {
let found_index = target_event_id_opt.as_ref().and_then(|target_event_id| {
new_items_iter.position(|new_item| {
new_item
.as_event()
.is_some_and(|new_ev| new_ev.event_id() == Some(target_event_id))
})
});
if let Some(index) = found_index {
target_event_id_opt.take().map(|ev| (index, ev))
} else {
None
}
}
let room_id = room.room_id().to_owned();
trace!("Starting timeline subscriber for room {room_id}, thread {thread_root_event_id:?}...");
// `timeline_items` is the local mirror of the timeline; `subscriber` streams batches of diffs.
let (mut timeline_items, mut subscriber) = timeline.subscribe().await;
trace!(
"Received initial timeline update of {} items for room {room_id}, thread {thread_root_event_id:?}.",
timeline_items.len()
);
// Hand the initial snapshot to the receiver before processing any diffs.
timeline_update_sender.send(TimelineUpdate::FirstUpdate {
initial_items: timeline_items.clone(),
}).unwrap_or_else(
|_e| panic!("Error: timeline update sender couldn't send first update ({} items) to room {room_id}!", timeline_items.len())
);
// The event currently being searched for (if any) ...
let mut target_event_id = None;
// ... and, once it shows up in a diff, its timeline index and ID. The index is kept
// up to date while the remaining diffs of the same batch are applied.
let mut found_target_event_id: Option<(usize, OwnedEventId)> = None;
loop {
tokio::select! {
// Poll the branches in declaration order, so a pending request is examined before new diffs.
biased;
Ok(()) = request_receiver.changed() => {
// Remember the previous target so that a repeated request for the same event is not re-processed.
let prev_target_event_id = target_event_id.clone();
let new_request_details = request_receiver
.borrow_and_update()
.iter()
.find_map(|req| req.room_id
.eq(&room_id)
.then(|| (req.target_event_id.clone(), req.starting_index, req.current_tl_len))
);
// Track the requested event, or clear the target when no request matches this room.
target_event_id = new_request_details.as_ref().map(|(ev, ..)| ev.clone());
if let Some((new_target_event_id, starting_index, current_tl_len)) = new_request_details && prev_target_event_id.as_ref() != Some(&new_target_event_id) {
// The requester's index is only used if the timeline length has not changed
// since the request was made; otherwise the search covers the whole timeline.
// NOTE(review): a `starting_index` larger than the timeline length would make the
// `narrow(..starting_index)` range below out of bounds — confirm callers never send one.
let starting_index = if current_tl_len == timeline_items.len() {
starting_index
} else {
timeline_items.len()
};
debug!("Received new request to search for event {new_target_event_id} in room {room_id} starting from index {starting_index} (tl len {}).", timeline_items.len());
// Scan `..starting_index` from the back. `position` counts from the back as well,
// so it is converted into a forward timeline index afterwards.
if let Some(target_event_tl_index) = timeline_items
.focus()
.narrow(..starting_index)
.into_iter()
.rev()
.position(|i| i.as_event()
.and_then(|e| e.event_id())
.is_some_and(|ev_id| ev_id == new_target_event_id)
)
.map(|i| starting_index.saturating_sub(i).saturating_sub(1))
{
debug!("Found existing target event {new_target_event_id} in room {room_id} at index {target_event_tl_index}.");
// The event is already loaded: nothing is left to search for.
target_event_id = None;
found_target_event_id = None;
timeline_update_sender.send(
TimelineUpdate::TargetEventFound {
target_event_id: new_target_event_id.clone(),
index: target_event_tl_index,
}
).unwrap_or_else(
|_e| panic!("Error: timeline update sender couldn't send TargetEventFound({new_target_event_id}, {target_event_tl_index}) to room {room_id}!")
);
broadcast_event(UIUpdateMessage::RefreshUI);
}
else {
// Not loaded yet: request a backwards pagination of 50 events. `target_event_id`
// stays set, so the diffs handled in the other branch are checked against it.
trace!("Target event not in timeline. Starting backwards pagination \
in room {room_id}, thread {thread_root_event_id:?} to find target event \
{new_target_event_id} starting from index {starting_index}.",
);
submit_async_request(MatrixRequest::PaginateTimeline {
timeline_kind: if let Some(thread_root_event_id) = thread_root_event_id.clone() {
TimelineKind::Thread {
room_id: room_id.clone(),
thread_root_event_id,
}
} else {
TimelineKind::MainRoom {
room_id: room_id.clone(),
}
},
num_events: 50,
direction: PaginationDirection::Backwards,
});
}
}
}
batch_opt = subscriber.next() => {
// The subscription stream has ended: leave the loop.
let Some(batch) = batch_opt else { break };
let mut num_updates = 0;
// Range of changed indices, starting out empty (MAX..MIN) and widened by each diff.
// In this function it is only used for the log messages.
let mut index_of_first_change = usize::MAX;
let mut index_of_last_change = usize::MIN;
// Set when the batch clears/resets the timeline or changes its front.
let mut clear_cache = false;
// Set when items were added at the end; only used in the debug log below.
let mut is_append = false;
for diff in batch {
num_updates += 1;
match diff {
// NOTE(review): `Append` and `PushBack` do not check the new items against the
// pending target event; only `PushFront` and `Insert` do — confirm this is intended.
VectorDiff::Append { values } => {
let _values_len = values.len();
index_of_first_change = min(index_of_first_change, timeline_items.len());
timeline_items.extend(values);
index_of_last_change = max(index_of_last_change, timeline_items.len());
trace!("timeline_subscriber: room {room_id} diff Append {_values_len}. Changes: {index_of_first_change}..{index_of_last_change}");
is_append = true;
}
VectorDiff::Clear => {
trace!("timeline_subscriber: room {room_id} diff Clear");
clear_cache = true;
timeline_items.clear();
}
VectorDiff::PushFront { value } => {
trace!("timeline_subscriber: room {room_id} diff PushFront");
// An already-found target shifts one slot to the right; otherwise check whether
// the new front item is the target (it would then be at index 0).
if let Some((index, _ev)) = found_target_event_id.as_mut() {
*index += 1; } else {
found_target_event_id = find_target_event(&mut target_event_id, std::iter::once(&value));
}
clear_cache = true;
timeline_items.push_front(value);
}
VectorDiff::PushBack { value } => {
index_of_first_change = min(index_of_first_change, timeline_items.len());
timeline_items.push_back(value);
index_of_last_change = max(index_of_last_change, timeline_items.len());
trace!("timeline_subscriber: room {room_id} diff PushBack. Changes: {index_of_first_change}..{index_of_last_change}");
is_append = true;
}
VectorDiff::PopFront => {
trace!("timeline_subscriber: room {room_id} diff PopFront");
clear_cache = true;
timeline_items.pop_front();
// Everything moved one slot to the left, including an already-found target.
if let Some((i, _ev)) = found_target_event_id.as_mut() {
*i = i.saturating_sub(1); }
}
VectorDiff::PopBack => {
timeline_items.pop_back();
index_of_first_change = min(index_of_first_change, timeline_items.len());
index_of_last_change = usize::MAX;
trace!("timeline_subscriber: room {room_id} diff PopBack. Changes: {index_of_first_change}..{index_of_last_change}");
}
VectorDiff::Insert { index, value } => {
if index == 0 {
clear_cache = true;
} else {
// Everything from `index` onwards is treated as changed.
index_of_first_change = min(index_of_first_change, index);
index_of_last_change = usize::MAX;
}
if index >= timeline_items.len() {
is_append = true;
}
// Inserting at or before an already-found target moves it right by one; otherwise
// check the inserted item itself (a hit has iterator position 0, i.e. timeline index `index`).
if let Some((i, _ev)) = found_target_event_id.as_mut() {
if index <= *i {
*i += 1;
}
} else {
found_target_event_id = find_target_event(&mut target_event_id, std::iter::once(&value))
.map(|(i, ev)| (i + index, ev));
}
timeline_items.insert(index, value);
trace!("timeline_subscriber: room {room_id} diff Insert at {index}. Changes: {index_of_first_change}..{index_of_last_change}");
}
VectorDiff::Set { index, value } => {
index_of_first_change = min(index_of_first_change, index);
index_of_last_change = max(index_of_last_change, index.saturating_add(1));
timeline_items.set(index, value);
trace!("timeline_subscriber: room {room_id} diff Set at {index}. Changes: {index_of_first_change}..{index_of_last_change}");
}
VectorDiff::Remove { index } => {
if index == 0 {
clear_cache = true;
} else {
index_of_first_change = min(index_of_first_change, index.saturating_sub(1));
index_of_last_change = usize::MAX;
}
// NOTE(review): when `index == *i` the found target itself is being removed, yet its
// index is merely decremented — confirm this case cannot occur or is acceptable.
if let Some((i, _ev)) = found_target_event_id.as_mut() {
if index <= *i {
*i = i.saturating_sub(1);
}
}
timeline_items.remove(index);
trace!("timeline_subscriber: room {room_id} diff Remove at {index}. Changes: {index_of_first_change}..{index_of_last_change}");
}
VectorDiff::Truncate { length } => {
if length == 0 {
clear_cache = true;
} else {
index_of_first_change = min(index_of_first_change, length.saturating_sub(1));
index_of_last_change = usize::MAX;
}
timeline_items.truncate(length);
trace!("timeline_subscriber: room {room_id} diff Truncate to length {length}. Changes: {index_of_first_change}..{index_of_last_change}");
}
VectorDiff::Reset { values } => {
trace!("timeline_subscriber: room {room_id} diff Reset, new length {}", values.len());
clear_cache = true; timeline_items = values;
}
}
}
if num_updates > 0 {
// If the change starts at index 1 and the first item is a virtual item, treat the
// change as starting at the very front (and therefore as a cache-clearing change).
if index_of_first_change == 1 && timeline_items.front().and_then(|item| item.as_virtual()).is_some() {
index_of_first_change = 0;
clear_cache = true;
}
let changed_indices = index_of_first_change..index_of_last_change;
debug!("timeline_subscriber: applied {num_updates} updates for room {room_id}, timeline now has {} items. is_append? {is_append}, clear_cache? {clear_cache}. Changes: {changed_indices:?}.", timeline_items.len());
// Send the complete updated timeline, not just the changed items.
timeline_update_sender.send(TimelineUpdate::NewItems {
new_items: timeline_items.clone(),
clear_cache,
}).expect("Error: timeline update sender couldn't send update with new items!");
// Report a located target only after the snapshot containing it has been sent,
// and stop searching for it.
if let Some((index, found_event_id)) = found_target_event_id.take() {
target_event_id = None;
timeline_update_sender.send(
TimelineUpdate::TargetEventFound {
target_event_id: found_event_id.clone(),
index,
}
).unwrap_or_else(
|_e| panic!("Error: timeline update sender couldn't send TargetEventFound({found_event_id}, {index}) to room {room_id}!")
);
}
broadcast_event(UIUpdateMessage::RefreshUI);
}
}
// Reached only when no other branch can make progress.
else => {
break;
}
}
}
// Reaching this point means the loop above was left, which is logged as an error.
error!("Error: unexpectedly ended timeline subscriber for room {room_id}.");
}
/// Builds a text preview of the room's latest event and enqueues it as a
/// [`RoomsListUpdate::UpdateLatestEvent`] for the rooms list.
///
/// * No latest event: nothing is enqueued.
/// * Remote invite: the preview is `New invite from <name>`, where the name comes from the
///   inviter's profile, else the inviter's user ID, else an empty string.
/// * Remote event: the sender is shown by profile display name; if the profile is not ready
///   and the event is our own, the account's display name is fetched; the final fallback is
///   the sender's user ID.
/// * Local event: profile display name, else the account's display name, else `"You"`.
///
/// Waits for the global `CLIENT` to be available before doing anything else.
pub async fn update_latest_event(room: &Room) {
    let client = CLIENT.wait();
    let latest = room.latest_event().await;

    let (sender_username, timestamp, content) = match latest {
        LatestEventValue::None => return,
        LatestEventValue::RemoteInvite {
            timestamp,
            inviter,
            inviter_profile,
        } => {
            // Invites carry no previewable content, so they are reported right away.
            let sender_username = get_sender_username_from_profile(inviter_profile)
                .or_else(|| inviter.map(|user_id| user_id.to_string()))
                .unwrap_or_default();
            enqueue_rooms_list_update(RoomsListUpdate::UpdateLatestEvent {
                room_id: room.room_id().to_owned(),
                timestamp,
                latest_message_text: format!("New invite from {sender_username}"),
            });
            return;
        }
        LatestEventValue::Remote {
            timestamp,
            sender,
            is_own,
            profile,
            content,
        } => {
            let display_name = match profile {
                TimelineDetails::Ready(ready_profile) => ready_profile.display_name,
                _ if is_own => client.account().get_display_name().await.ok().flatten(),
                _ => None,
            };
            let shown_name = display_name.unwrap_or_else(|| sender.to_string());
            (shown_name, timestamp, content)
        }
        LatestEventValue::Local {
            timestamp,
            content,
            profile,
            ..
        } => {
            let own_name = if let TimelineDetails::Ready(ready_profile) = profile {
                ready_profile.display_name
            } else {
                client.account().get_display_name().await.ok().flatten()
            };
            let shown_name = own_name.unwrap_or_else(|| String::from("You"));
            (shown_name, timestamp, content)
        }
    };

    let preview = text_preview_of_timeline_item(&content, &sender_username);
    let latest_message_text = preview.format_with(&sender_username, true);
    enqueue_rooms_list_update(RoomsListUpdate::UpdateLatestEvent {
        room_id: room.room_id().to_owned(),
        timestamp,
        latest_message_text,
    });
}
/// Identifies a timeline: either a room's main timeline or one thread within a room.
///
/// Used as the key of [`TIMELINE_STATES`]. With the serde attributes below it is
/// (de)serialized as an internally tagged object with camelCase names, e.g.
/// `{"kind": "mainRoom", "roomId": ...}` or
/// `{"kind": "thread", "roomId": ..., "threadRootEventId": ...}`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(
rename_all = "camelCase",
rename_all_fields = "camelCase",
tag = "kind"
)]
pub enum TimelineKind {
/// The main timeline of a room.
MainRoom {
/// ID of the room.
room_id: OwnedRoomId,
},
/// The timeline of a single thread.
Thread {
/// ID of the room containing the thread.
room_id: OwnedRoomId,
/// ID of the thread's root event.
thread_root_event_id: OwnedEventId,
},
}
impl TimelineKind {
pub fn room_id(&self) -> &OwnedRoomId {
match self {
TimelineKind::MainRoom { room_id } => room_id,
TimelineKind::Thread { room_id, .. } => room_id,
}
}
pub fn thread_root_event_id(&self) -> Option<&OwnedEventId> {
match self {
TimelineKind::MainRoom { .. } => None,
TimelineKind::Thread {
thread_root_event_id,
..
} => Some(thread_root_event_id),
}
}
}
impl std::fmt::Display for TimelineKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TimelineKind::MainRoom { room_id } => write!(f, "MainRoom({})", room_id),
TimelineKind::Thread {
room_id,
thread_root_event_id,
} => {
write!(f, "Thread({}, {})", room_id, thread_root_event_id)
}
}
}
}
/// Channel endpoints of one timeline, as handed out by `take_timeline_endpoints`.
pub struct TimelineEndpoints {
/// Clone of the timeline's update sender. The leading underscore suggests it is not
/// read by consumers — presumably held to keep the channel open; TODO confirm.
pub _update_sender: crossbeam_channel::Sender<TimelineUpdate>,
/// Receiver for the [`TimelineUpdate`]s of this timeline.
pub update_receiver: crossbeam_channel::Receiver<TimelineUpdate>,
/// Sender used to publish backwards-pagination requests.
pub request_sender: TimelineRequestSender,
/// Result of `successor_room()` on the timeline's room at the time the endpoints were taken.
pub _successor_room: Option<SuccessorRoom>,
}
/// Backend-side bookkeeping for one timeline (a room's main timeline or one of its threads).
pub(crate) struct PerTimelineDetails {
/// The timeline itself.
pub timeline: Arc<Timeline>,
/// Sender for this timeline's [`TimelineUpdate`]s; cloned into [`TimelineEndpoints`].
pub timeline_update_sender: crossbeam_channel::Sender<TimelineUpdate>,
/// The update receiver and request sender, available until `take_timeline_endpoints`
/// takes them; `None` afterwards.
pub timeline_singleton_endpoints: Option<(
crossbeam_channel::Receiver<TimelineUpdate>,
TimelineRequestSender,
)>,
/// Handle of a spawned task; judging by the name, the one running
/// `timeline_subscriber_handler` for this timeline — TODO confirm at the spawn site.
pub timeline_subscriber_handler_task: JoinHandle<()>,
}