use std::{
collections::{BTreeMap, HashMap, HashSet},
ops::Deref,
sync::{Arc, Condvar, Mutex},
};
use crate::{
events::{
handlers::get_latest_event_details,
timeline::{
PerTimelineDetails, TimelineKind, TimelineUpdate, timeline_subscriber_handler,
update_latest_event,
},
},
init::singletons::{UIUpdateMessage, broadcast_event},
room::{
invited_room::{InvitedRoomInfo, InviterInfo},
rooms_list::{JoinedRoomInfo, RoomsListUpdate, enqueue_rooms_list_update},
},
user::user_power_level::UserPowerLevels,
};
use matrix_sdk::{
RoomDisplayName, RoomHero, RoomState,
event_handler::EventHandlerDropGuard,
ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedUserId,
events::tag::Tags,
},
};
use matrix_sdk_ui::{
RoomListService, Timeline,
timeline::{RoomExt, TimelineFocus},
};
use tokio::{runtime::Handle, sync::watch};
use tracing::{debug, error, info, warn};
pub struct JoinedRoomDetails {
#[allow(unused)]
room_id: OwnedRoomId,
pub(crate) main_timeline: PerTimelineDetails,
pub(crate) thread_timelines: HashMap<OwnedEventId, PerTimelineDetails>,
pub(crate) pending_thread_timelines: HashSet<OwnedEventId>,
pub(crate) typing_notice_subscriber: Option<EventHandlerDropGuard>,
pinned_events_subscriber: Option<EventHandlerDropGuard>,
}
impl Drop for JoinedRoomDetails {
fn drop(&mut self) {
debug!("Dropping JoinedRoomDetails for room {}", self.room_id);
self.main_timeline.timeline_subscriber_handler_task.abort();
for thread_timeline in self.thread_timelines.values() {
thread_timeline.timeline_subscriber_handler_task.abort();
}
drop(self.typing_notice_subscriber.take());
drop(self.pinned_events_subscriber.take());
}
}
#[derive(Clone)]
pub struct RoomListServiceRoomInfo {
pub room_id: OwnedRoomId,
state: RoomState,
is_direct: bool,
is_marked_unread: bool,
is_tombstoned: bool,
tags: Option<Tags>,
topic: Option<String>,
user_power_levels: Option<UserPowerLevels>,
latest_event_timestamp: Option<MilliSecondsSinceUnixEpoch>,
num_unread_messages: u64,
num_unread_mentions: u64,
display_name: Option<RoomDisplayName>,
room_avatar: Option<OwnedMxcUri>,
heroes: Vec<RoomHero>,
room: matrix_sdk::Room,
}
impl RoomListServiceRoomInfo {
pub(crate) async fn from_room(
room: matrix_sdk::Room,
current_user_id: &Option<OwnedUserId>,
) -> Self {
let (is_direct, tags, display_name, user_power_levels) =
tokio::join!(room.is_direct(), room.tags(), room.display_name(), async {
if let Some(user_id) = current_user_id {
UserPowerLevels::from_room(&room, user_id.deref()).await
} else {
None
}
});
Self {
room_id: room.room_id().to_owned(),
state: room.state(),
is_direct: is_direct.unwrap_or(false),
is_marked_unread: room.is_marked_unread(),
is_tombstoned: room.is_tombstoned(),
tags: tags.ok().flatten(),
topic: room.topic(),
user_power_levels,
latest_event_timestamp: room.latest_event_timestamp(),
num_unread_messages: room.num_unread_messages(),
num_unread_mentions: room.num_unread_mentions(),
display_name: display_name.ok(),
room_avatar: room.avatar_url(),
heroes: room.heroes(),
room,
}
}
pub(crate) async fn from_room_ref(
room: &matrix_sdk::Room,
current_user_id: &Option<OwnedUserId>,
) -> Self {
Self::from_room(room.clone(), current_user_id).await
}
}
#[derive(Clone, Debug)]
pub enum UnreadMessageCount {
_Unknown,
Known(u64),
}
pub async fn add_new_room(
new_room: &RoomListServiceRoomInfo,
room_list_service: &RoomListService,
subscribe: bool,
) -> anyhow::Result<()> {
let direct_user_id_option = if new_room.is_direct && new_room.room.direct_targets_length() == 1
{
new_room
.room
.direct_targets()
.iter()
.next()
.map(|id| id.to_owned())
} else {
None
};
match new_room.state {
RoomState::Knocked => {
return Ok(());
}
RoomState::Banned => {
info!("Got new Banned room: ({})", new_room.room_id);
return Ok(());
}
RoomState::Left => {
info!("Got new Left room: ({:?})", new_room.room_id);
return Ok(());
}
RoomState::Invited => {
let invite_details = new_room.room.invite_details().await.ok();
let latest_event = new_room.room.latest_event().await;
let latest = get_latest_event_details(latest_event);
let inviter_info = invite_details.and_then(|d| {
d.inviter.map(|i| InviterInfo {
user_id: i.user_id().to_owned(),
display_name: i.display_name().map(|n| n.to_owned()),
avatar: i.avatar_url().map(|a| a.to_owned()),
})
});
enqueue_rooms_list_update(RoomsListUpdate::AddInvitedRoom(InvitedRoomInfo {
room_id: new_room.room_id.clone(),
room_name: new_room
.display_name
.clone()
.unwrap_or(RoomDisplayName::Empty)
.into(),
inviter_info,
room_avatar: new_room.room_avatar.clone(),
canonical_alias: new_room.room.canonical_alias(),
alt_aliases: new_room.room.alt_aliases(),
latest,
invite_state: Default::default(),
is_direct: new_room.is_direct,
}));
return Ok(());
}
RoomState::Joined => {} }
if subscribe {
room_list_service
.subscribe_to_rooms(&[&new_room.room_id])
.await;
}
let timeline = Arc::new(
new_room
.room
.timeline_builder()
.with_focus(TimelineFocus::Live {
hide_threaded_events: true,
})
.track_read_marker_and_receipts(
matrix_sdk_ui::timeline::TimelineReadReceiptTracking::AllEvents,
)
.build()
.await
.map_err(|e| {
anyhow::anyhow!(
"BUG: Failed to build timeline for room {}: {e}",
new_room.room_id
)
})?,
);
let (timeline_update_sender, timeline_update_receiver) = crossbeam_channel::unbounded();
let (request_sender, request_receiver) = watch::channel(Vec::new());
let timeline_subscriber_handler_task = Handle::current().spawn(timeline_subscriber_handler(
new_room.room.clone(),
timeline.clone(),
timeline_update_sender.clone(),
request_receiver,
None,
));
let latest_event = new_room.room.latest_event().await;
let latest = get_latest_event_details(latest_event);
info!(
"Adding new joined room {}, name: {:?}",
new_room.room_id,
new_room.room.name()
);
insert_room_details(
new_room.room_id.clone(),
JoinedRoomDetails {
room_id: new_room.room_id.clone(),
main_timeline: PerTimelineDetails {
timeline,
timeline_singleton_endpoints: Some((timeline_update_receiver, request_sender)),
timeline_update_sender,
timeline_subscriber_handler_task,
},
thread_timelines: HashMap::new(),
pending_thread_timelines: HashSet::new(),
typing_notice_subscriber: None,
pinned_events_subscriber: None,
},
);
enqueue_rooms_list_update(RoomsListUpdate::AddJoinedRoom(JoinedRoomInfo {
room_id: new_room.room_id.clone(),
latest,
tags: new_room.tags.clone().unwrap_or_default(),
topic: new_room.topic.clone(),
num_unread_messages: new_room.num_unread_messages,
num_unread_mentions: new_room.num_unread_mentions,
is_marked_unread: new_room.is_marked_unread,
avatar: new_room.room_avatar.clone(),
room_name: new_room
.display_name
.clone()
.unwrap_or(RoomDisplayName::Empty)
.into(),
canonical_alias: new_room.room.canonical_alias(),
alt_aliases: new_room.room.alt_aliases(),
has_been_paginated: false,
is_selected: false,
is_direct: new_room.is_direct,
is_tombstoned: new_room.is_tombstoned,
direct_user_id: direct_user_id_option.and_then(|id| id.into_user_id()),
heroes: new_room.heroes.clone(),
}));
Ok(())
}
pub async fn update_room(
old_room: &RoomListServiceRoomInfo,
new_room: &RoomListServiceRoomInfo,
room_list_service: &RoomListService,
) -> anyhow::Result<()> {
let new_room_id = new_room.room_id.clone();
if old_room.room_id == new_room_id {
info!(
"Room {:?} ({new_room_id}) state went from {:?} --> {:?}",
new_room.display_name, old_room.state, new_room.state
);
if old_room.state != new_room.state {
match new_room.state {
RoomState::Banned => {
debug!(
"Removing Banned room: {:?} ({new_room_id})",
new_room.display_name
);
remove_room(new_room);
return Ok(());
}
RoomState::Left => {
debug!(
"Removing Left room: {:?} ({new_room_id})",
new_room.display_name
);
remove_room(new_room);
return Ok(());
}
RoomState::Joined => {
debug!(
"update_room(): adding new Joined room: {:?} ({new_room_id})",
new_room.display_name
);
return add_new_room(new_room, room_list_service, true).await;
}
RoomState::Invited => {
debug!(
"update_room(): adding new Invited room: {:?} ({new_room_id})",
new_room.display_name
);
return add_new_room(new_room, room_list_service, true).await;
}
RoomState::Knocked => {
return Ok(());
}
}
}
if old_room.room_avatar != new_room.room_avatar
&& let Some(ref avatar) = new_room.room_avatar
{
debug!("Updating room avatar for room {}", new_room_id);
enqueue_rooms_list_update(RoomsListUpdate::UpdateRoomAvatar {
room_id: new_room_id.clone(),
avatar: avatar.to_owned(),
});
}
if old_room.display_name != new_room.display_name
&& let Some(ref new_room_name) = new_room.display_name
{
debug!(
"Updating room {} name: {:?} --> {:?}",
new_room_id, old_room.display_name, new_room_name
);
enqueue_rooms_list_update(RoomsListUpdate::UpdateRoomName {
room_id: new_room_id.clone(),
new_room_name: new_room_name.to_owned(),
});
}
if old_room.topic != new_room.topic
&& let Some(ref new_topic) = new_room.topic
{
debug!(
"Updating room {} topic: {:?} --> {:?}",
new_room_id, old_room.topic, new_topic
);
enqueue_rooms_list_update(RoomsListUpdate::UpdateTopic {
room_id: new_room_id.clone(),
new_topic: new_topic.to_owned(),
});
}
if matches!(new_room.state, RoomState::Joined) {
let update_latest = match (
old_room.latest_event_timestamp,
new_room.room.latest_event_timestamp(),
) {
(Some(old_ts), Some(new_ts)) => new_ts >= old_ts,
(None, Some(_)) => true,
_ => false,
};
if update_latest {
update_latest_event(&new_room.room).await;
}
if old_room.tags != new_room.tags {
debug!(
"Updating room {} tags from {:?} to {:?}",
new_room_id, old_room.tags, new_room.tags
);
enqueue_rooms_list_update(RoomsListUpdate::Tags {
room_id: new_room_id.clone(),
new_tags: new_room.tags.clone().unwrap_or_default(),
});
}
if old_room.is_marked_unread != new_room.is_marked_unread
|| old_room.num_unread_messages != new_room.num_unread_messages
|| old_room.num_unread_mentions != new_room.num_unread_mentions
{
debug!(
"Updating room {}, unread messages {} --> {}, unread mentions {} --> {}",
new_room_id,
old_room.num_unread_messages,
new_room.num_unread_messages,
old_room.num_unread_mentions,
new_room.num_unread_mentions,
);
enqueue_rooms_list_update(RoomsListUpdate::UpdateNumUnreadMessages {
room_id: new_room_id.clone(),
is_marked_unread: new_room.is_marked_unread,
unread_messages: UnreadMessageCount::Known(new_room.num_unread_messages),
unread_mentions: new_room.num_unread_mentions,
});
}
if old_room.is_direct != new_room.is_direct {
debug!(
"Updating room {} is_direct from {} to {}",
new_room_id, old_room.is_direct, new_room.is_direct,
);
enqueue_rooms_list_update(RoomsListUpdate::UpdateIsDirect {
room_id: new_room_id.clone(),
is_direct: new_room.is_direct,
});
}
let mut __timeline_update_sender_opt = None;
let mut get_timeline_update_sender = |room_id| {
if __timeline_update_sender_opt.is_none()
&& let Some(jrd) = try_get_room_details(room_id)
{
__timeline_update_sender_opt = Some(
jrd.lock()
.unwrap()
.main_timeline
.timeline_update_sender
.clone(),
);
}
__timeline_update_sender_opt.clone()
};
if !old_room.is_tombstoned && new_room.is_tombstoned {
let successor_room = new_room.room.successor_room();
debug!("Updating room {new_room_id} to be tombstoned, {successor_room:?}");
enqueue_rooms_list_update(RoomsListUpdate::TombstonedRoom {
room_id: new_room_id.clone(),
});
if get_timeline_update_sender(&new_room_id).is_some() {
} else {
error!(
"BUG: could not find JoinedRoomDetails for newly-tombstoned room {new_room_id}"
);
}
}
if let Some(nupl) = new_room.user_power_levels
&& old_room.user_power_levels.is_none_or(|oupl| oupl != nupl)
{
if let Some(timeline_update_sender) = get_timeline_update_sender(&new_room_id) {
debug!("Updating room {new_room_id} user power levels.");
match timeline_update_sender.send(TimelineUpdate::UserPowerLevels(nupl)) {
Ok(_) => {
broadcast_event(UIUpdateMessage::RefreshUI);
}
Err(_) => {
warn!("Failed to send the UserPowerLevels update to room {new_room_id}")
}
}
} else {
error!(
"BUG: could not find JoinedRoomDetails for room {new_room_id} where power levels changed."
);
}
}
}
Ok(())
} else {
debug!(
"UNTESTED SCENARIO: update_room(): removing old room {}, replacing with new room {}",
old_room.room_id, new_room_id,
);
remove_room(old_room);
add_new_room(new_room, room_list_service, true).await
}
}
pub fn remove_room(room: &RoomListServiceRoomInfo) {
remove_room_details(&room.room_id);
enqueue_rooms_list_update(RoomsListUpdate::RemoveRoom {
room_id: room.room_id.clone(),
_new_state: room.state,
});
}
type JoinedRoomsMap = BTreeMap<OwnedRoomId, Arc<Mutex<JoinedRoomDetails>>>;
pub static ALL_JOINED_ROOMS: (Mutex<JoinedRoomsMap>, Condvar) =
(Mutex::new(BTreeMap::new()), Condvar::new());
pub fn _wait_for_room_details(room_id: &OwnedRoomId) -> Arc<Mutex<JoinedRoomDetails>> {
let (lock, cvar) = &ALL_JOINED_ROOMS;
let mut map_guard = lock.lock().unwrap();
while map_guard.get(room_id).is_none() {
map_guard = cvar.wait(map_guard).unwrap();
}
map_guard.get(room_id).unwrap().clone()
}
pub fn insert_room_details(room_id: OwnedRoomId, details: JoinedRoomDetails) {
let (lock, cvar) = &ALL_JOINED_ROOMS;
let details_arc_mutex = Arc::new(Mutex::new(details));
let mut map_guard = lock.lock().unwrap();
map_guard.insert(room_id, details_arc_mutex);
cvar.notify_all();
}
pub fn remove_room_details(room_id: &OwnedRoomId) -> Option<Arc<Mutex<JoinedRoomDetails>>> {
let (lock, cvar) = &ALL_JOINED_ROOMS;
let mut map_guard = lock.lock().unwrap();
let removed_details = map_guard.remove(room_id);
cvar.notify_all();
removed_details
}
pub fn try_get_room_details(room_id: &OwnedRoomId) -> Option<Arc<Mutex<JoinedRoomDetails>>> {
let (lock, _cvar) = &ALL_JOINED_ROOMS;
let map_guard = lock.lock().unwrap();
map_guard.get(room_id).cloned()
}
pub fn clear_all_rooms() {
let (lock, cvar) = &ALL_JOINED_ROOMS;
let mut map_guard = lock.lock().unwrap();
map_guard.clear();
cvar.notify_all();
}
fn with_per_timeline_details<F, R>(kind: &TimelineKind, f: F) -> Option<R>
where
F: FnOnce(&mut PerTimelineDetails) -> R,
{
let lock = crate::room::joined_room::try_get_room_details(kind.room_id())?;
let mut room_info = lock.lock().unwrap();
let details = match kind {
TimelineKind::MainRoom { .. } => Some(&mut room_info.main_timeline),
TimelineKind::Thread {
thread_root_event_id,
..
} => room_info.thread_timelines.get_mut(thread_root_event_id),
}?;
Some(f(details))
}
pub(crate) fn get_timeline_and_sender(
kind: &TimelineKind,
) -> Option<(Arc<Timeline>, crossbeam_channel::Sender<TimelineUpdate>)> {
with_per_timeline_details(kind, |details| {
(
details.timeline.clone(),
details.timeline_update_sender.clone(),
)
})
}
pub(crate) fn get_timeline(kind: &TimelineKind) -> Option<Arc<Timeline>> {
with_per_timeline_details(kind, |details| details.timeline.clone())
}