use std::{future::ready, ops::Deref, sync::Arc};
use async_cell::sync::AsyncCell;
use async_rx::StreamExt as _;
use async_stream::stream;
use eyeball::{SharedObservable, Subscriber};
use eyeball_im::{Vector, VectorDiff};
use eyeball_im_util::vector::VectorObserverExt;
use futures_util::{Stream, StreamExt as _, pin_mut, stream};
use matrix_sdk::{
Client, Room, RoomRecencyStamp, RoomState, SlidingSync, SlidingSyncList,
task_monitor::BackgroundTaskHandle,
};
use matrix_sdk_base::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons};
use ruma::MilliSecondsSinceUnixEpoch;
use tokio::{
select,
sync::broadcast::{self, error::RecvError},
};
use tracing::{error, trace};
use super::{
Error, State,
filters::BoxedFilterFn,
sorters::{
new_sorter_latest_event, new_sorter_lexicographic, new_sorter_name, new_sorter_recency,
},
};
/// A list of rooms built on top of a single [`SlidingSyncList`].
///
/// It exposes the loading state of that list and streams of room entries
/// read from the [`Client`].
#[derive(Debug)]
pub struct RoomList {
/// Client used to read the rooms stream and to obtain the receiver of room
/// info notable updates.
client: Client,
/// The sliding sync list this room list was created from.
sliding_sync_list: SlidingSyncList,
/// Observable loading state; a clone of it is written to by the background
/// task spawned in `RoomList::new`.
loading_state: SharedObservable<RoomListLoadingState>,
/// Handle of the background task that updates `loading_state`. It is never
/// read; it is only kept so that it lives as long as the `RoomList`. The
/// handle is obtained through `abort_on_drop()`, so the task is presumably
/// aborted when this field is dropped (confirm against
/// `BackgroundTaskHandle`).
_loading_state_task: BackgroundTaskHandle,
}
impl RoomList {
pub(super) async fn new(
client: &Client,
sliding_sync: &Arc<SlidingSync>,
sliding_sync_list_name: &str,
room_list_service_state: Subscriber<State>,
) -> Result<Self, Error> {
let sliding_sync_list = sliding_sync
.on_list(sliding_sync_list_name, |list| ready(list.clone()))
.await
.ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?;
let loading_state =
SharedObservable::new(match sliding_sync_list.maximum_number_of_rooms() {
Some(maximum_number_of_rooms) => RoomListLoadingState::Loaded {
maximum_number_of_rooms: Some(maximum_number_of_rooms),
},
None => RoomListLoadingState::NotLoaded,
});
Ok(Self {
client: client.clone(),
sliding_sync_list: sliding_sync_list.clone(),
loading_state: loading_state.clone(),
_loading_state_task: client
.task_monitor()
.spawn_infinite_task("room_list::loading_state_task", async move {
pin_mut!(room_list_service_state);
while let Some(state) = room_list_service_state.next().await {
use State::*;
match state {
Terminated { .. } | Error { .. } | Init => (),
SettingUp | Recovering | Running => break,
}
}
let maximum_number_of_rooms = sliding_sync_list.maximum_number_of_rooms();
loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
let mut maximum_number_of_rooms_stream =
sliding_sync_list.maximum_number_of_rooms_stream();
while let Some(maximum_number_of_rooms) =
maximum_number_of_rooms_stream.next().await
{
loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
}
})
.abort_on_drop(),
})
}
pub fn loading_state(&self) -> Subscriber<RoomListLoadingState> {
self.loading_state.subscribe_reset()
}
fn entries(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
self.client.rooms_stream()
}
pub fn entries_with_dynamic_adapters(
&self,
page_size: usize,
) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
{
let room_info_notable_update_receiver = self.client.room_info_notable_update_receiver();
let list = self.sliding_sync_list.clone();
let filter_fn_cell = AsyncCell::shared();
let limit = SharedObservable::<usize>::new(page_size);
let limit_stream = limit.subscribe();
let dynamic_entries_controller = RoomListDynamicEntriesController::new(
filter_fn_cell.clone(),
page_size,
limit,
list.maximum_number_of_rooms_stream(),
);
let stream = stream! {
loop {
let filter_fn = filter_fn_cell.take().await;
let (raw_values, raw_stream) = self.entries();
let values = raw_values.into_iter().map(Into::into).collect::<Vector<RoomListItem>>();
let stream = merge_stream_and_receiver(values.clone(), raw_stream, room_info_notable_update_receiver.resubscribe());
let (values, stream) = (values, stream)
.filter(filter_fn)
.sort_by(new_sorter_lexicographic(vec![
Box::new(new_sorter_latest_event()),
Box::new(new_sorter_recency()),
Box::new(new_sorter_name()),
]))
.dynamic_head_with_initial_value(page_size, limit_stream.clone());
yield stream::once(ready(vec![VectorDiff::Reset { values }]))
.chain(stream);
}
}
.fuse()
.switch();
(stream, dynamic_entries_controller)
}
}
/// Merge the room diffs from `raw_stream` with the room info notable updates
/// from `room_info_notable_update_receiver` into one stream of
/// [`RoomListItem`] diffs.
///
/// `current_values` is a local mirror of the rooms: every diff coming from
/// `raw_stream` is applied to it, so that a notable update can be translated
/// into a `VectorDiff::Set` at the index of the matching room.
///
/// The returned stream ends when `raw_stream` ends or when the broadcast
/// sender is closed.
fn merge_stream_and_receiver(
mut current_values: Vector<RoomListItem>,
raw_stream: impl Stream<Item = Vec<VectorDiff<Room>>>,
mut room_info_notable_update_receiver: broadcast::Receiver<RoomInfoNotableUpdate>,
) -> impl Stream<Item = Vec<VectorDiff<RoomListItem>>> {
stream! {
pin_mut!(raw_stream);
loop {
select! {
// `biased` makes `select!` poll the branches in declaration order, so
// diffs from `raw_stream` take priority over notable updates.
biased;
diffs = raw_stream.next() => {
if let Some(diffs) = diffs {
// Convert every `Room` carried by the diffs into a `RoomListItem`.
let diffs = diffs.into_iter().map(|diff| diff.map(RoomListItem::from)).collect::<Vec<_>>();
// Keep the local mirror in sync; the `map` is only there to trace each
// room touched by the diff.
for diff in &diffs {
diff.clone().map(|room| {
trace!(room = %room.room_id(), "updated in response");
room
}).apply(&mut current_values);
}
yield diffs;
} else {
// `raw_stream` is over: stop here instead of waiting on the receiver.
break;
}
}
update = room_info_notable_update_receiver.recv() => {
match update {
Ok(update) => {
// Skip updates whose reasons are exactly `RECENCY_STAMP`. The check is
// `==`, not a containment test, so an update carrying other reasons in
// addition to `RECENCY_STAMP` still goes through.
if update.reasons == RoomInfoNotableUpdateReasons::RECENCY_STAMP {
continue;
}
// Emit a `Set` for the matching room, if it is part of the mirror.
if let Some(index) = current_values.iter().position(|room| room.room_id() == update.room_id) {
// The refreshed item is a clone: it is yielded downstream, the entry in
// `current_values` itself is left as it was.
let mut room = current_values[index].clone();
room.refresh_cached_data();
yield vec![VectorDiff::Set { index, value: room }];
}
}
Err(RecvError::Closed) => {
error!("Cannot receive room info notable updates because the sender has been closed");
break;
}
// Some updates were missed: log it and keep listening.
Err(RecvError::Lagged(n)) => {
error!(number_of_missed_updates = n, "Lag when receiving room info notable update");
}
}
}
}
}
}
}
/// The loading state of a [`RoomList`].
///
/// See [`RoomList::loading_state`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RoomListLoadingState {
/// The room list has not been loaded yet. This is the initial state when
/// the underlying sliding sync list does not report a maximum number of
/// rooms at construction time.
NotLoaded,
/// The room list is considered loaded.
Loaded {
/// The maximum number of rooms reported by the underlying sliding sync
/// list. It is an `Option` because the list may not know this number
/// (yet).
maximum_number_of_rooms: Option<u32>,
},
}
/// Controller for the stream returned by
/// [`RoomList::entries_with_dynamic_adapters`]: it can change the filter and
/// grow or reset the number of visible entries.
pub struct RoomListDynamicEntriesController {
/// Cell shared with the entries stream; the stream takes the filter out of
/// it every time it (re)builds its pipeline.
filter: Arc<AsyncCell<BoxedFilterFn>>,
/// Number of entries added by one page; also the initial limit.
page_size: usize,
/// Current limit on the number of visible entries, observed by the stream.
limit: SharedObservable<usize>,
/// Subscriber to the maximum number of rooms of the sliding sync list, used
/// to decide whether another page may be added.
maximum_number_of_rooms: Subscriber<Option<u32>>,
}
impl RoomListDynamicEntriesController {
    /// Assemble a controller from the pieces it shares with the entries
    /// stream.
    fn new(
        filter: Arc<AsyncCell<BoxedFilterFn>>,
        page_size: usize,
        limit_stream: SharedObservable<usize>,
        maximum_number_of_rooms: Subscriber<Option<u32>>,
    ) -> Self {
        let limit = limit_stream;

        Self { filter, page_size, limit, maximum_number_of_rooms }
    }

    /// Set the filter to apply to the entries.
    ///
    /// Returns `true` if the filter was stored. Returns `false`, and drops
    /// the filter, when this controller holds the only reference to the
    /// filter cell: nobody is left to consume it.
    pub fn set_filter(&self, filter: BoxedFilterFn) -> bool {
        let has_other_holders = Arc::strong_count(&self.filter) != 1;

        if has_other_holders {
            self.filter.set(filter);
        }

        has_other_holders
    }

    /// Raise the limit by one page, provided the maximum number of rooms is
    /// known and the current limit is still below it.
    ///
    /// The new limit may exceed the maximum when fewer than `page_size` rooms
    /// remain.
    ///
    /// # Panics
    ///
    /// Panics if the maximum number of rooms does not fit in a `usize`.
    pub fn add_one_page(&self) {
        if let Some(max) = self.maximum_number_of_rooms.get() {
            let max = usize::try_from(max).unwrap();
            let current = self.limit.get();

            if current < max {
                self.limit.set_if_not_eq(current + self.page_size);
            }
        }
    }

    /// Bring the limit back to a single page.
    pub fn reset_to_one_page(&self) {
        let one_page = self.page_size;

        self.limit.set_if_not_eq(one_page);
    }
}
/// A [`Room`] together with a snapshot of some of its data.
///
/// The `cached_*` fields are filled from `inner` when the item is built and
/// are only updated again by `refresh_cached_data`. They are `pub(super)`,
/// presumably so that the sibling filters and sorters can read them without
/// querying the room again (to be confirmed against those modules).
#[derive(Clone, Debug)]
pub struct RoomListItem {
/// The wrapped room; also reachable through `Deref`.
inner: Room,
/// Snapshot of `Room::latest_event_timestamp`.
pub(super) cached_latest_event_timestamp: Option<MilliSecondsSinceUnixEpoch>,
/// Snapshot of `Room::latest_event_is_unsent`.
pub(super) cached_latest_event_is_unsent: bool,
/// Snapshot of `Room::recency_stamp`.
pub(super) cached_recency_stamp: Option<RoomRecencyStamp>,
/// Snapshot of `Room::cached_display_name`, converted to a `String`.
pub(super) cached_display_name: Option<String>,
/// Snapshot of `Room::is_space`.
pub(super) cached_is_space: bool,
/// Snapshot of `Room::state`.
pub(super) cached_state: RoomState,
}
impl RoomListItem {
    /// Consume the item and give back the wrapped [`Room`], discarding the
    /// cached data.
    pub fn into_inner(self) -> Room {
        let Self { inner, .. } = self;
        inner
    }

    /// Re-read every cached value from the wrapped room.
    pub(super) fn refresh_cached_data(&mut self) {
        // Destructure exhaustively so that a newly added field cannot be
        // forgotten here without a compile error.
        let Self {
            inner,
            cached_latest_event_timestamp,
            cached_latest_event_is_unsent,
            cached_recency_stamp,
            cached_display_name,
            cached_is_space,
            cached_state,
        } = self;

        *cached_latest_event_timestamp = inner.latest_event_timestamp();
        *cached_latest_event_is_unsent = inner.latest_event_is_unsent();
        *cached_recency_stamp = inner.recency_stamp();
        *cached_display_name = inner.cached_display_name().map(|name| name.to_string());
        *cached_is_space = inner.is_space();
        *cached_state = inner.state();
    }
}
impl From<Room> for RoomListItem {
    /// Wrap `inner` and snapshot the data cached alongside it.
    fn from(inner: Room) -> Self {
        // Fields are evaluated in the order written, so `inner` is only moved
        // once every cached value has been read from it.
        Self {
            cached_latest_event_timestamp: inner.latest_event_timestamp(),
            cached_latest_event_is_unsent: inner.latest_event_is_unsent(),
            cached_recency_stamp: inner.recency_stamp(),
            cached_display_name: inner.cached_display_name().map(|name| name.to_string()),
            cached_is_space: inner.is_space(),
            cached_state: inner.state(),
            inner,
        }
    }
}
impl Deref for RoomListItem {
    type Target = Room;

    /// Borrow the wrapped [`Room`], so that its methods can be called
    /// directly on a `RoomListItem`.
    fn deref(&self) -> &Room {
        let Self { inner, .. } = self;
        inner
    }
}