pub mod filters;
mod room_list;
pub mod sorters;
mod state;
use std::{sync::Arc, time::Duration};
use async_stream::stream;
use eyeball::Subscriber;
use futures_util::{Stream, StreamExt, pin_mut};
use matrix_sdk::{
Client, Error as SlidingSyncError, Room, SlidingSync, SlidingSyncList, SlidingSyncMode,
event_cache::EventCacheError, sliding_sync::PollTimeout, timeout::timeout,
};
pub use room_list::*;
use ruma::{
OwnedRoomId, RoomId, UInt, api::client::sync::sync_events::v5 as http, assign,
events::StateEventType,
};
pub use state::*;
use thiserror::Error;
use tracing::{debug, error, warn};
const DEFAULT_REQUIRED_STATE: &[(StateEventType, &str)] = &[
(StateEventType::RoomName, ""),
(StateEventType::RoomEncryption, ""),
(StateEventType::RoomMember, "$LAZY"),
(StateEventType::RoomMember, "$ME"),
(StateEventType::RoomTopic, ""),
(StateEventType::RoomAvatar, ""),
(StateEventType::RoomCanonicalAlias, ""),
(StateEventType::RoomPowerLevels, ""),
(StateEventType::CallMember, "*"),
(StateEventType::RoomJoinRules, ""),
(StateEventType::RoomTombstone, ""),
(StateEventType::RoomCreate, ""),
(StateEventType::RoomHistoryVisibility, ""),
(StateEventType::MemberHints, ""),
(StateEventType::SpaceParent, "*"),
(StateEventType::SpaceChild, "*"),
(StateEventType::BeaconInfo, "*"),
];
const DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE: &[(StateEventType, &str)] =
&[(StateEventType::RoomPinnedEvents, "")];
pub(crate) const DEFAULT_CONNECTION_ID: &str = "room-list";
pub(crate) const DEFAULT_LIST_TIMELINE_LIMIT: u32 = 1;
const DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT: u32 = 20;
#[derive(Debug)]
pub struct RoomListService {
client: Client,
sliding_sync: Arc<SlidingSync>,
state_machine: StateMachine,
}
impl RoomListService {
pub async fn new(client: Client) -> Result<Self, Error> {
Self::new_with(client, true, DEFAULT_CONNECTION_ID, DEFAULT_LIST_TIMELINE_LIMIT).await
}
pub async fn new_with(
client: Client,
share_pos: bool,
connection_id: &str,
timeline_limit: u32,
) -> Result<Self, Error> {
let mut builder = client
.sliding_sync(connection_id)
.map_err(Error::SlidingSync)?
.with_account_data_extension(
assign!(http::request::AccountData::default(), { enabled: Some(true) }),
)
.with_receipt_extension(assign!(http::request::Receipts::default(), {
enabled: Some(true),
rooms: Some(vec![http::request::ExtensionRoomConfig::AllSubscribed])
}))
.with_typing_extension(assign!(http::request::Typing::default(), {
enabled: Some(true),
}));
match client.enabled_thread_subscriptions().await {
Ok(true) => {
debug!("Client requested thread subscriptions extension");
builder = builder.with_thread_subscriptions_extension(
assign!(http::request::ThreadSubscriptions::default(), {
enabled: Some(true),
limit: Some(ruma::uint!(10))
}),
);
}
Ok(false) => {
debug!(
"Thread subscriptions extension either not requested on the client, or the server doesn't advertise support for it: not enabling."
);
}
Err(error) => {
warn!(
?error,
"Failed to check whether the client requested thread subscriptions extension: not enabling."
);
}
}
if share_pos {
debug!("Enabling `share_pos` for the room list sliding sync");
builder = builder.share_pos();
}
let state_machine = StateMachine::new();
let observable_state = state_machine.cloned_state();
let sliding_sync = builder
.add_cached_list(
SlidingSyncList::builder(ALL_ROOMS_LIST_NAME)
.sync_mode(
SlidingSyncMode::new_selective()
.add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
)
.timeline_limit(timeline_limit)
.required_state(
DEFAULT_REQUIRED_STATE
.iter()
.map(|(state_event, value)| (state_event.clone(), (*value).to_owned()))
.collect(),
)
.filters(Some(assign!(http::request::ListFilters::default(), {
is_invite: None,
})))
.requires_timeout(move |request_generator| {
match observable_state.get() {
State::Init
| State::SettingUp
| State::Recovering
| State::Error { .. }
| State::Terminated { .. } => PollTimeout::Some(0),
State::Running => {
if request_generator.is_fully_loaded() {
PollTimeout::Default
} else {
PollTimeout::Some(0)
}
}
}
}),
)
.await
.map_err(Error::SlidingSync)?
.build()
.await
.map(Arc::new)
.map_err(Error::SlidingSync)?;
client.event_cache().subscribe()?;
Ok(Self { client, sliding_sync, state_machine })
}
#[doc(hidden)]
pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
stream! {
let sync = self.sliding_sync.sync();
pin_mut!(sync);
loop {
debug!("Run a sync iteration");
let next_state = self.state_machine.next(&self.sliding_sync).await?;
match sync.next().await {
Some(Ok(_update_summary)) => {
debug!(state = ?next_state, "New state");
self.state_machine.set(next_state);
yield Ok(());
}
Some(Err(error)) => {
debug!(expected_state = ?next_state, "New state is an error");
let next_state = State::Error { from: Box::new(next_state) };
self.state_machine.set(next_state);
yield Err(Error::SlidingSync(error));
break;
}
None => {
debug!(expected_state = ?next_state, "New state is a termination");
let next_state = State::Terminated { from: Box::new(next_state) };
self.state_machine.set(next_state);
break;
}
}
}
}
}
#[doc(hidden)]
pub fn stop_sync(&self) -> Result<(), Error> {
self.sliding_sync.stop_sync().map_err(Error::SlidingSync)
}
pub(crate) async fn expire_sync_session(&self) {
self.sliding_sync.expire_session().await;
if let State::Terminated { from } = self.state_machine.get() {
self.state_machine.set(State::Error { from });
}
}
pub fn sync_indicator(
&self,
delay_before_showing: Duration,
delay_before_hiding: Duration,
) -> impl Stream<Item = SyncIndicator> + use<> {
let mut state = self.state();
stream! {
yield SyncIndicator::Hide;
let mut current_state = state.next_now();
loop {
let (sync_indicator, yield_delay) = match current_state {
State::SettingUp | State::Error { .. } => {
(SyncIndicator::Show, delay_before_showing)
}
State::Init | State::Recovering | State::Running | State::Terminated { .. } => {
(SyncIndicator::Hide, delay_before_hiding)
}
};
let next_state = match timeout(state.next(), yield_delay).await {
Ok(next_state) => next_state,
Err(_) => {
yield sync_indicator;
state.next().await
}
};
if let Some(next_state) = next_state {
current_state = next_state;
} else {
break;
}
}
}
}
pub fn client(&self) -> &Client {
&self.client
}
pub fn state(&self) -> Subscriber<State> {
self.state_machine.subscribe()
}
async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
RoomList::new(&self.client, &self.sliding_sync, sliding_sync_list_name, self.state()).await
}
pub async fn all_rooms(&self) -> Result<RoomList, Error> {
self.list_for(ALL_ROOMS_LIST_NAME).await
}
pub fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
self.client.get_room(room_id).ok_or_else(|| Error::RoomNotFound(room_id.to_owned()))
}
pub async fn subscribe_to_rooms(&self, room_ids: &[&RoomId]) {
let settings = assign!(http::request::RoomSubscription::default(), {
required_state: DEFAULT_REQUIRED_STATE.iter().map(|(state_event, value)| {
(state_event.clone(), (*value).to_owned())
})
.chain(
DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE.iter().map(|(state_event, value)| {
(state_event.clone(), (*value).to_owned())
})
)
.collect(),
timeline_limit: UInt::from(DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT),
});
let cancel_in_flight_request = match self.state_machine.get() {
State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
false
}
State::SettingUp | State::Running => true,
};
if self.client.event_cache().has_subscribed() {
let latest_events = self.client.latest_events().await;
for room_id in room_ids {
if let Err(error) = latest_events.listen_to_room(room_id).await {
error!(?error, ?room_id, "Failed to listen to the latest event for this room");
}
}
}
self.sliding_sync.clear_and_subscribe_to_rooms(
room_ids,
Some(settings),
cancel_in_flight_request,
)
}
#[cfg(test)]
pub fn sliding_sync(&self) -> &SlidingSync {
&self.sliding_sync
}
}
#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
SlidingSync(SlidingSyncError),
#[error("Unknown list `{0}`")]
UnknownList(String),
#[error("Room `{0}` not found")]
RoomNotFound(OwnedRoomId),
#[error(transparent)]
EventCache(#[from] EventCacheError),
}
#[derive(Debug, Eq, PartialEq)]
pub enum SyncIndicator {
Show,
Hide,
}
#[cfg(test)]
mod tests {
use std::future::ready;
use futures_util::{StreamExt, pin_mut};
use matrix_sdk::{SlidingSyncMode, test_utils::mocks::MatrixMockServer};
use matrix_sdk_test::{TestError, async_test};
use ruma::{api::client::sync::sync_events::v5, assign, uint};
use super::{ALL_ROOMS_LIST_NAME, Error, RoomListService, State};
#[async_test]
async fn test_all_rooms_are_declared() -> Result<(), TestError> {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room_list = RoomListService::new(client).await?;
let sliding_sync = room_list.sliding_sync();
assert_eq!(
sliding_sync
.on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
list.sync_mode(),
SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
)))
.await,
Some(true)
);
Ok(())
}
#[async_test]
async fn test_expire_sliding_sync_session_manually() -> Result<(), Error> {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let room_list = RoomListService::new(client).await?;
let sync = room_list.sync();
pin_mut!(sync);
{
let _mock_guard = server
.mock_sliding_sync()
.ok({
let mut response = v5::Response::new("0".to_owned());
response.lists.insert(
ALL_ROOMS_LIST_NAME.to_owned(),
assign!(v5::response::List::default(), { count: uint!(0) }),
);
response
})
.mount_as_scoped()
.await;
let _ = sync.next().await;
}
assert_eq!(room_list.state().get(), State::SettingUp);
room_list.stop_sync()?;
let _ = sync.next().await;
assert_eq!(
room_list.state_machine.get(),
State::Terminated { from: Box::new(State::Running) }
);
room_list.expire_sync_session().await;
assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });
Ok(())
}
}