pub mod filters;
mod room_list;
pub mod sorters;
mod state;
use std::{sync::Arc, time::Duration};
use async_stream::stream;
use eyeball::Subscriber;
use futures_util::{Stream, StreamExt, pin_mut};
use matrix_sdk::{
Client, Error as SlidingSyncError, Room, SlidingSync, SlidingSyncList, SlidingSyncMode,
event_cache::EventCacheError, timeout::timeout,
};
pub use room_list::*;
use ruma::{
OwnedRoomId, RoomId, UInt,
api::{FeatureFlag, client::sync::sync_events::v5 as http},
assign,
events::StateEventType,
};
pub use state::*;
use thiserror::Error;
use tracing::{debug, error, warn};
/// The state events requested by default for rooms.
///
/// This table is used twice in this module: as the `required_state` of the
/// `all_rooms` list built in `RoomListService::new_with_share_pos`, and as
/// the base of the `required_state` of every room subscription made by
/// `RoomListService::subscribe_to_rooms`.
///
/// Each entry pairs a state event type with a string that is forwarded
/// unchanged to sliding sync.
// NOTE(review): the string is presumably the state key, with `""`, `"*"`,
// `"$LAZY"` and `"$ME"` being interpreted by the server — confirm against
// the sliding sync `required_state` documentation.
const DEFAULT_REQUIRED_STATE: &[(StateEventType, &str)] = &[
(StateEventType::RoomName, ""),
(StateEventType::RoomEncryption, ""),
(StateEventType::RoomMember, "$LAZY"),
(StateEventType::RoomMember, "$ME"),
(StateEventType::RoomTopic, ""),
(StateEventType::RoomAvatar, ""),
(StateEventType::RoomCanonicalAlias, ""),
(StateEventType::RoomPowerLevels, ""),
(StateEventType::CallMember, "*"),
(StateEventType::RoomJoinRules, ""),
(StateEventType::RoomTombstone, ""),
(StateEventType::RoomCreate, ""),
(StateEventType::RoomHistoryVisibility, ""),
(StateEventType::MemberHints, ""),
(StateEventType::SpaceParent, "*"),
(StateEventType::SpaceChild, "*"),
];
/// Extra state events requested only for room subscriptions.
///
/// `RoomListService::subscribe_to_rooms` appends these entries after
/// `DEFAULT_REQUIRED_STATE`; they are not part of the `all_rooms` list.
const DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE: &[(StateEventType, &str)] =
&[(StateEventType::RoomPinnedEvents, "")];
/// The `timeline_limit` used for the room subscriptions made by
/// `RoomListService::subscribe_to_rooms`.
const DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT: u32 = 20;
/// The entry point of the room list: it owns a sliding sync instance, drives
/// it through a state machine, and gives access to the rooms and to the
/// `all_rooms` list.
#[derive(Debug)]
pub struct RoomListService {
/// The client the sliding sync instance was created from.
client: Client,
/// The sliding sync instance, named `room-list`, built by
/// `new_with_share_pos`.
sliding_sync: Arc<SlidingSync>,
/// The state machine holding the current [`State`] of the service; it is
/// advanced by `sync` and observed through `state`.
state_machine: StateMachine,
}
impl RoomListService {
/// Create a new `RoomListService` for the given [`Client`].
///
/// This is a shorthand for [`Self::new_with_share_pos`] with `share_pos`
/// set to `true`.
///
/// # Errors
///
/// Returns whatever error [`Self::new_with_share_pos`] returns.
pub async fn new(client: Client) -> Result<Self, Error> {
    let share_pos = true;

    Self::new_with_share_pos(client, share_pos).await
}
pub async fn new_with_share_pos(client: Client, share_pos: bool) -> Result<Self, Error> {
let mut builder = client
.sliding_sync("room-list")
.map_err(Error::SlidingSync)?
.with_account_data_extension(
assign!(http::request::AccountData::default(), { enabled: Some(true) }),
)
.with_receipt_extension(assign!(http::request::Receipts::default(), {
enabled: Some(true),
rooms: Some(vec![http::request::ExtensionRoomConfig::AllSubscribed])
}))
.with_typing_extension(assign!(http::request::Typing::default(), {
enabled: Some(true),
}));
if client.enabled_thread_subscriptions() {
let server_features = if let Some(cached) = client
.supported_versions_cached()
.await
.map_err(|e| Error::SlidingSync(e.into()))?
{
cached.features
} else {
client
.fetch_server_versions(None)
.await
.map_err(|e| Error::SlidingSync(e.into()))?
.as_supported_versions()
.features
};
if !server_features.contains(&FeatureFlag::from("org.matrix.msc4306")) {
warn!(
"Thread subscriptions extension is requested on the client, but the server doesn't advertise support for it: not enabling."
);
} else {
debug!("Enabling the thread subscriptions extension");
builder = builder.with_thread_subscriptions_extension(
assign!(http::request::ThreadSubscriptions::default(), {
enabled: Some(true),
limit: Some(ruma::uint!(10))
}),
);
}
}
if share_pos {
debug!("Enabling `share_pos` for the room list sliding sync");
builder = builder.share_pos();
}
let state_machine = StateMachine::new();
let observable_state = state_machine.cloned_state();
let sliding_sync = builder
.add_cached_list(
SlidingSyncList::builder(ALL_ROOMS_LIST_NAME)
.sync_mode(
SlidingSyncMode::new_selective()
.add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
)
.timeline_limit(1)
.required_state(
DEFAULT_REQUIRED_STATE
.iter()
.map(|(state_event, value)| (state_event.clone(), (*value).to_owned()))
.collect(),
)
.filters(Some(assign!(http::request::ListFilters::default(), {
is_invite: None,
})))
.requires_timeout(move |request_generator| {
match observable_state.get() {
State::Init
| State::SettingUp
| State::Recovering
| State::Error { .. }
| State::Terminated { .. } => false,
State::Running => request_generator.is_fully_loaded(),
}
}),
)
.await
.map_err(Error::SlidingSync)?
.build()
.await
.map(Arc::new)
.map_err(Error::SlidingSync)?;
client.event_cache().subscribe()?;
Ok(Self { client, sliding_sync, state_machine })
}
/// Start the sync loop, as a stream.
///
/// Every iteration:
///
/// 1. asks the state machine for the next state (an error here is
///    propagated with `?`),
/// 2. polls the sliding sync stream once,
/// 3. stores the new state according to the outcome of the sync:
///    * success: the next state is stored and `Ok(())` is yielded,
///    * failure: `State::Error { from: next_state }` is stored, the error is
///      yielded as [`Error::SlidingSync`], and the loop ends,
///    * end of the sliding sync stream: `State::Terminated { from:
///      next_state }` is stored and the loop ends.
#[doc(hidden)]
pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
    stream! {
        let sliding_sync_stream = self.sliding_sync.sync();
        pin_mut!(sliding_sync_stream);

        loop {
            debug!("Run a sync iteration");

            // The next state is computed *before* syncing, but only stored
            // once the outcome of the sync is known.
            let next_state = self.state_machine.next(&self.sliding_sync).await?;
            let sync_outcome = sliding_sync_stream.next().await;

            match sync_outcome {
                None => {
                    debug!(expected_state = ?next_state, "New state is a termination");
                    self.state_machine.set(State::Terminated { from: Box::new(next_state) });

                    break;
                }

                Some(Err(error)) => {
                    debug!(expected_state = ?next_state, "New state is an error");
                    self.state_machine.set(State::Error { from: Box::new(next_state) });

                    yield Err(Error::SlidingSync(error));

                    break;
                }

                Some(Ok(_update_summary)) => {
                    debug!(state = ?next_state, "New state");
                    self.state_machine.set(next_state);

                    yield Ok(());
                }
            }
        }
    }
}
#[doc(hidden)]
pub fn stop_sync(&self) -> Result<(), Error> {
self.sliding_sync.stop_sync().map_err(Error::SlidingSync)
}
/// Force the sliding sync session to expire.
///
/// After the session has been expired, a `State::Terminated { from }` state
/// is replaced by `State::Error { from }`, keeping the same `from`. Any
/// other state is left untouched.
pub(crate) async fn expire_sync_session(&self) {
    self.sliding_sync.expire_session().await;

    // Only a terminated state machine needs to be moved to the error state.
    let State::Terminated { from } = self.state_machine.get() else {
        return;
    };

    self.state_machine.set(State::Error { from });
}
/// Get a stream of [`SyncIndicator`], derived from the state of the service.
///
/// The stream always starts with [`SyncIndicator::Hide`]. Then, for every
/// state, an indicator is computed: [`SyncIndicator::Show`] for
/// `State::SettingUp` and `State::Error`, [`SyncIndicator::Hide`] for every
/// other state. The indicator is only yielded if the state does not change
/// for `delay_before_showing` (for `Show`) or `delay_before_hiding` (for
/// `Hide`); a state change arriving before that delay replaces the pending
/// indicator without yielding it.
///
/// The stream ends when the state subscriber ends.
pub fn sync_indicator(
    &self,
    delay_before_showing: Duration,
    delay_before_hiding: Duration,
) -> impl Stream<Item = SyncIndicator> + use<> {
    let mut state_updates = self.state();

    stream! {
        yield SyncIndicator::Hide;

        // Start from the current state without waiting for an update.
        let mut current_state = state_updates.next_now();

        loop {
            let sync_indicator = match current_state {
                State::Init | State::Recovering | State::Running | State::Terminated { .. } => {
                    SyncIndicator::Hide
                }
                State::SettingUp | State::Error { .. } => SyncIndicator::Show,
            };
            let yield_delay = match sync_indicator {
                SyncIndicator::Show => delay_before_showing,
                SyncIndicator::Hide => delay_before_hiding,
            };

            // Give the next state `yield_delay` to arrive.
            let awaited_state = timeout(state_updates.next(), yield_delay).await;

            let next_state = match awaited_state {
                // A new state arrived in time: the pending indicator is
                // dropped.
                Ok(next_state) => next_state,

                // The state has been stable long enough: publish the
                // indicator, then wait for the next state with no deadline.
                Err(_) => {
                    yield sync_indicator;

                    state_updates.next().await
                }
            };

            match next_state {
                Some(new_state) => current_state = new_state,
                None => break,
            }
        }
    }
}
pub fn client(&self) -> &Client {
&self.client
}
pub fn state(&self) -> Subscriber<State> {
self.state_machine.subscribe()
}
/// Build a [`RoomList`] for the sliding sync list named
/// `sliding_sync_list_name`, handing it a fresh subscriber on the state of
/// the service.
///
/// # Errors
///
/// Returns whatever error `RoomList::new` returns.
async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
    let state_subscriber = self.state();

    RoomList::new(&self.client, &self.sliding_sync, sliding_sync_list_name, state_subscriber)
        .await
}
/// Get the [`RoomList`] backed by the `ALL_ROOMS_LIST_NAME` sliding sync
/// list.
///
/// # Errors
///
/// Returns whatever error building the [`RoomList`] returns.
pub async fn all_rooms(&self) -> Result<RoomList, Error> {
    let list_name = ALL_ROOMS_LIST_NAME;

    self.list_for(list_name).await
}
pub fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
self.client.get_room(room_id).ok_or_else(|| Error::RoomNotFound(room_id.to_owned()))
}
pub async fn subscribe_to_rooms(&self, room_ids: &[&RoomId]) {
let settings = assign!(http::request::RoomSubscription::default(), {
required_state: DEFAULT_REQUIRED_STATE.iter().map(|(state_event, value)| {
(state_event.clone(), (*value).to_owned())
})
.chain(
DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE.iter().map(|(state_event, value)| {
(state_event.clone(), (*value).to_owned())
})
)
.collect(),
timeline_limit: UInt::from(DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT),
});
let cancel_in_flight_request = match self.state_machine.get() {
State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
false
}
State::SettingUp | State::Running => true,
};
if self.client.event_cache().has_subscribed() {
let latest_events = self.client.latest_events().await;
for room_id in room_ids {
if let Err(error) = latest_events.listen_to_room(room_id).await {
error!(?error, ?room_id, "Failed to listen to the latest event for this room");
}
}
}
self.sliding_sync.subscribe_to_rooms(room_ids, Some(settings), cancel_in_flight_request)
}
/// Get the underlying [`SlidingSync`] instance. Only available in tests.
#[cfg(test)]
pub fn sliding_sync(&self) -> &SlidingSync {
    self.sliding_sync.as_ref()
}
}
/// The errors returned by the room list service.
#[derive(Debug, Error)]
pub enum Error {
/// An error coming from sliding sync; its message is the one of the
/// wrapped error.
#[error(transparent)]
SlidingSync(SlidingSyncError),
/// An operation has been requested on a list whose name is unknown; the
/// payload is the name of the list.
#[error("Unknown list `{0}`")]
UnknownList(String),
/// The requested room is unknown to the client; the payload is the ID of
/// the room.
#[error("Room `{0}` not found")]
RoomNotFound(OwnedRoomId),
/// An error coming from the event cache; an `EventCacheError` converts
/// into this variant automatically (e.g. with `?`).
#[error(transparent)]
EventCache(#[from] EventCacheError),
}
/// Whether a sync indicator should be displayed, as yielded by the stream
/// returned from `RoomListService::sync_indicator`.
///
/// This is a plain, field-less marker, so it implements `Clone` and `Copy`:
/// a value can be stored, compared and passed along without giving it up.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum SyncIndicator {
    /// The sync indicator should be shown.
    Show,
    /// The sync indicator should be hidden.
    Hide,
}
#[cfg(test)]
mod tests {
use std::future::ready;
use futures_util::{StreamExt, pin_mut};
use matrix_sdk::{
Client, SlidingSyncMode, config::RequestConfig, test_utils::client::mock_matrix_session,
};
use matrix_sdk_test::async_test;
use ruma::api::MatrixVersion;
use serde_json::json;
use wiremock::{Match, Mock, MockServer, Request, ResponseTemplate, http::Method};
use super::{ALL_ROOMS_LIST_NAME, Error, RoomListService, State};
/// Build a [`Client`] with a restored mock session, pointing at a freshly
/// started [`MockServer`], with request retries disabled.
///
/// The server is returned alongside the client so the caller decides how
/// long it lives.
async fn new_client() -> (Client, MockServer) {
    let session = mock_matrix_session();
    let server = MockServer::start().await;

    let request_config = RequestConfig::new().disable_retry();
    let builder = Client::builder()
        .homeserver_url(server.uri())
        .server_versions([MatrixVersion::V1_0])
        .request_config(request_config);

    let client = builder.build().await.unwrap();
    client.restore_session(session).await.unwrap();

    (client, server)
}
/// Build a [`RoomListService`] on top of a mock client.
///
/// Only the client is kept: the mock server returned by `new_client` is
/// dropped before the service is created.
pub(super) async fn new_room_list() -> Result<RoomListService, Error> {
    let client = new_client().await.0;

    RoomListService::new(client).await
}
/// A `wiremock` matcher accepting `POST` requests on the sliding sync
/// endpoint.
struct SlidingSyncMatcher;

impl Match for SlidingSyncMatcher {
    fn matches(&self, request: &Request) -> bool {
        const SLIDING_SYNC_PATH: &str =
            "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync";

        let is_sliding_sync_path = request.url.path() == SLIDING_SYNC_PATH;
        let is_post = request.method == Method::POST;

        is_sliding_sync_path && is_post
    }
}
/// The `all_rooms` list must exist and be in selective mode over the single
/// range `0..=19`.
#[async_test]
async fn test_all_rooms_are_declared() -> Result<(), Error> {
    let room_list = new_room_list().await?;

    // `None` would mean the list does not exist at all.
    let is_selective_over_first_range = room_list
        .sliding_sync()
        .on_list(ALL_ROOMS_LIST_NAME, |list| {
            let is_expected_mode = matches!(
                list.sync_mode(),
                SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
            );

            ready(is_expected_mode)
        })
        .await;

    assert_eq!(is_selective_over_first_range, Some(true));

    Ok(())
}
/// Expiring the session after the sync loop has been stopped manually must
/// turn the `Terminated` state into an `Error` state with the same `from`.
#[async_test]
async fn test_expire_sliding_sync_session_manually() -> Result<(), Error> {
let (client, server) = new_client().await;
let room_list = RoomListService::new(client).await?;
let sync = room_list.sync();
pin_mut!(sync);
// Run a first sync iteration against a mocked, successful response. The
// mock guard is scoped to this block.
{
let _mock_guard = Mock::given(SlidingSyncMatcher)
.respond_with(move |_request: &Request| {
ResponseTemplate::new(200).set_body_json(json!({
"pos": "0",
"lists": {
ALL_ROOMS_LIST_NAME: {
"count": 0,
"ops": [],
},
},
"rooms": {},
}))
})
.mount_as_scoped(&server)
.await;
let _ = sync.next().await;
}
// One successful iteration has been run.
assert_eq!(room_list.state().get(), State::SettingUp);
// Stop the sync loop manually…
room_list.stop_sync()?;
// … and poll the stream once more so the stop is observed.
let _ = sync.next().await;
// The state is `Terminated`, coming from `Running`.
assert_eq!(
room_list.state_machine.get(),
State::Terminated { from: Box::new(State::Running) }
);
// Now force the session to expire.
room_list.expire_sync_session().await;
// The state is now `Error`, with the same `from` as before.
assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });
Ok(())
}
}