matrix_sdk_ui/room_list_service/mod.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! `RoomListService` API.
16//!
17//! The `RoomListService` is a UI API dedicated to present a list of Matrix
18//! rooms to the user. The syncing is handled by [`SlidingSync`]. The idea is to
19//! expose a simple API to handle most of the client app use cases, like:
20//! Showing and updating a list of rooms, filtering a list of rooms, handling
21//! particular updates of a range of rooms (the ones the client app is showing
22//! to the view, i.e. the rooms present in the viewport) etc.
23//!
24//! As such, the `RoomListService` works as an opinionated state machine. The
25//! states are defined by [`State`]. Actions are attached to the each state
26//! transition.
27//!
28//! The API is purposely small. Sliding Sync is versatile. `RoomListService` is
29//! _one_ specific usage of Sliding Sync.
30//!
31//! # Basic principle
32//!
33//! `RoomListService` works with 1 Sliding Sync List:
34//!
35//! * `all_rooms` (referred by the constant [`ALL_ROOMS_LIST_NAME`]) is the only
36//! list. Its goal is to load all the user' rooms. It starts with a
37//! [`SlidingSyncMode::Selective`] sync-mode with a small range (i.e. a small
38//! set of rooms) to load the first rooms quickly, and then updates to a
39//! [`SlidingSyncMode::Growing`] sync-mode to load the remaining rooms “in the
40//! background”: it will sync the existing rooms and will fetch new rooms, by
41//! a certain batch size.
42//!
43//! This behavior has proven to be empirically satisfying to provide a fast and
44//! fluid user experience for a Matrix client.
45//!
46//! [`RoomListService::all_rooms`] provides a way to get a [`RoomList`] for all
47//! the rooms. From that, calling [`RoomList::entries_with_dynamic_adapters`]
48//! provides a way to get a stream of rooms. This stream is sorted, can be
49//! filtered, and the filter can be changed over time.
50//!
51//! [`RoomListService::state`] provides a way to get a stream of the state
52//! machine's state, which can be pretty helpful for the client app.
53
54pub mod filters;
55mod room_list;
56pub mod sorters;
57mod state;
58
59use std::{sync::Arc, time::Duration};
60
61use async_stream::stream;
62use eyeball::Subscriber;
63use futures_util::{pin_mut, Stream, StreamExt};
64use matrix_sdk::{
65 event_cache::EventCacheError, timeout::timeout, Client, Error as SlidingSyncError, Room,
66 SlidingSync, SlidingSyncList, SlidingSyncMode,
67};
68pub use room_list::*;
69use ruma::{
70 api::client::sync::sync_events::v5 as http, assign, directory::RoomTypeFilter,
71 events::StateEventType, OwnedRoomId, RoomId, UInt,
72};
73pub use state::*;
74use thiserror::Error;
75use tracing::debug;
76
77/// The default `required_state` constant value for sliding sync lists and
78/// sliding sync room subscriptions.
79const DEFAULT_REQUIRED_STATE: &[(StateEventType, &str)] = &[
80 (StateEventType::RoomName, ""),
81 (StateEventType::RoomEncryption, ""),
82 (StateEventType::RoomMember, "$LAZY"),
83 (StateEventType::RoomMember, "$ME"),
84 (StateEventType::RoomTopic, ""),
85 (StateEventType::RoomCanonicalAlias, ""),
86 (StateEventType::RoomPowerLevels, ""),
87 (StateEventType::CallMember, "*"),
88 (StateEventType::RoomJoinRules, ""),
89 (StateEventType::RoomTombstone, ""),
90 // Those two events are required to properly compute room previews.
91 // `StateEventType::RoomCreate` is also necessary to compute the room
92 // version, and thus handling the tombstoned room correctly.
93 (StateEventType::RoomCreate, ""),
94 (StateEventType::RoomHistoryVisibility, ""),
95 // Required to correctly calculate the room display name.
96 (StateEventType::MemberHints, ""),
97];
98
99/// The default `required_state` constant value for sliding sync room
100/// subscriptions that must be added to `DEFAULT_REQUIRED_STATE`.
101const DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE: &[(StateEventType, &str)] =
102 &[(StateEventType::RoomPinnedEvents, "")];
103
104/// The default `timeline_limit` value when used with room subscriptions.
105const DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT: u32 = 20;
106
107/// The [`RoomListService`] type. See the module's documentation to learn more.
108#[derive(Debug)]
109pub struct RoomListService {
110 /// Client that has created this [`RoomListService`].
111 client: Client,
112
113 /// The Sliding Sync instance.
114 sliding_sync: Arc<SlidingSync>,
115
116 /// The current state of the `RoomListService`.
117 ///
118 /// `RoomListService` is a simple state-machine.
119 state_machine: StateMachine,
120}
121
122impl RoomListService {
123 /// Create a new `RoomList`.
124 ///
125 /// A [`matrix_sdk::SlidingSync`] client will be created, with a cached list
126 /// already pre-configured.
127 ///
128 /// This won't start an encryption sync, and it's the user's responsibility
129 /// to create one in this case using
130 /// [`EncryptionSyncService`][crate::encryption_sync_service::EncryptionSyncService].
131 pub async fn new(client: Client) -> Result<Self, Error> {
132 let builder = client
133 .sliding_sync("room-list")
134 .map_err(Error::SlidingSync)?
135 .with_account_data_extension(
136 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
137 )
138 .with_receipt_extension(assign!(http::request::Receipts::default(), {
139 enabled: Some(true),
140 rooms: Some(vec![http::request::ExtensionRoomConfig::AllSubscribed])
141 }))
142 .with_typing_extension(assign!(http::request::Typing::default(), {
143 enabled: Some(true),
144 }));
145 // TODO: Re-enable once we know it creates slowness.
146 // // We don't deal with encryption device messages here so this is safe
147 // .share_pos();
148
149 let sliding_sync = builder
150 .add_cached_list(
151 SlidingSyncList::builder(ALL_ROOMS_LIST_NAME)
152 .sync_mode(
153 SlidingSyncMode::new_selective()
154 .add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
155 )
156 .timeline_limit(1)
157 .required_state(
158 DEFAULT_REQUIRED_STATE
159 .iter()
160 .map(|(state_event, value)| (state_event.clone(), (*value).to_owned()))
161 .collect(),
162 )
163 .filters(Some(assign!(http::request::ListFilters::default(), {
164 // As defined in the [SlidingSync MSC](https://github.com/matrix-org/matrix-spec-proposals/blob/9450ced7fb9cf5ea9077d029b3adf36aebfa8709/proposals/3575-sync.md?plain=1#L444)
165 // If unset, both invited and joined rooms are returned. If false, no invited rooms are
166 // returned. If true, only invited rooms are returned.
167 is_invite: None,
168 not_room_types: vec![RoomTypeFilter::Space],
169 }))),
170 )
171 .await
172 .map_err(Error::SlidingSync)?
173 .build()
174 .await
175 .map(Arc::new)
176 .map_err(Error::SlidingSync)?;
177
178 // Eagerly subscribe the event cache to sync responses.
179 client.event_cache().subscribe()?;
180
181 Ok(Self { client, sliding_sync, state_machine: StateMachine::new() })
182 }
183
184 /// Start to sync the room list.
185 ///
186 /// It's the main method of this entire API. Calling `sync` allows to
187 /// receive updates on the room list: new rooms, rooms updates etc. Those
188 /// updates can be read with [`RoomList::entries`] for example. This method
189 /// returns a [`Stream`] where produced items only hold an empty value
190 /// in case of a sync success, otherwise an error.
191 ///
192 /// The `RoomListService`' state machine is run by this method.
193 ///
194 /// Stopping the [`Stream`] (i.e. by calling [`Self::stop_sync`]), and
195 /// calling [`Self::sync`] again will resume from the previous state of
196 /// the state machine.
197 ///
198 /// This should be used only for testing. In practice, most users should be
199 /// using the [`SyncService`] instead.
200 #[doc(hidden)]
201 pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
202 stream! {
203 let sync = self.sliding_sync.sync();
204 pin_mut!(sync);
205
206 // This is a state machine implementation.
207 // Things happen in this order:
208 //
209 // 1. The next state is calculated,
210 // 2. The actions associated to the next state are run,
211 // 3. A sync is done,
212 // 4. The next state is stored.
213 loop {
214 debug!("Run a sync iteration");
215
216 // Calculate the next state, and run the associated actions.
217 let next_state = self.state_machine.next(&self.sliding_sync).await?;
218
219 // Do the sync.
220 match sync.next().await {
221 // Got a successful result while syncing.
222 Some(Ok(_update_summary)) => {
223 debug!(state = ?next_state, "New state");
224
225 // Update the state.
226 self.state_machine.set(next_state);
227
228 yield Ok(());
229 }
230
231 // Got an error while syncing.
232 Some(Err(error)) => {
233 debug!(expected_state = ?next_state, "New state is an error");
234
235 let next_state = State::Error { from: Box::new(next_state) };
236 self.state_machine.set(next_state);
237
238 yield Err(Error::SlidingSync(error));
239
240 break;
241 }
242
243 // Sync loop has terminated.
244 None => {
245 debug!(expected_state = ?next_state, "New state is a termination");
246
247 let next_state = State::Terminated { from: Box::new(next_state) };
248 self.state_machine.set(next_state);
249
250 break;
251 }
252 }
253 }
254 }
255 }
256
257 /// Force to stop the sync of the `RoomListService` started by
258 /// [`Self::sync`].
259 ///
260 /// It's of utter importance to call this method rather than stop polling
261 /// the `Stream` returned by [`Self::sync`] because it will force the
262 /// cancellation and exit the sync loop, i.e. it will cancel any
263 /// in-flight HTTP requests, cancel any pending futures etc. and put the
264 /// service into a termination state.
265 ///
266 /// Ideally, one wants to consume the `Stream` returned by [`Self::sync`]
267 /// until it returns `None`, because of [`Self::stop_sync`], so that it
268 /// ensures the states are correctly placed.
269 ///
270 /// Stopping the sync of the room list via this method will put the
271 /// state-machine into the [`State::Terminated`] state.
272 ///
273 /// This should be used only for testing. In practice, most users should be
274 /// using the [`SyncService`] instead.
275 #[doc(hidden)]
276 pub fn stop_sync(&self) -> Result<(), Error> {
277 self.sliding_sync.stop_sync().map_err(Error::SlidingSync)
278 }
279
280 /// Force the sliding sync session to expire.
281 ///
282 /// This is used by [`SyncService`][crate::SyncService].
283 ///
284 /// **Warning**: This method **must not** be called while the sync loop is
285 /// running!
286 pub(crate) async fn expire_sync_session(&self) {
287 self.sliding_sync.expire_session().await;
288
289 // Usually, when the session expires, it leads the state to be `Error`,
290 // thus some actions (like refreshing the lists) are executed. However,
291 // if the sync loop has been stopped manually, the state is `Terminated`, and
292 // when the session is forced to expire, the state remains `Terminated`, thus
293 // the actions aren't executed as expected. Consequently, let's update the
294 // state.
295 if let State::Terminated { from } = self.state_machine.get() {
296 self.state_machine.set(State::Error { from });
297 }
298 }
299
300 /// Get a [`Stream`] of [`SyncIndicator`].
301 ///
302 /// Read the documentation of [`SyncIndicator`] to learn more about it.
303 pub fn sync_indicator(
304 &self,
305 delay_before_showing: Duration,
306 delay_before_hiding: Duration,
307 ) -> impl Stream<Item = SyncIndicator> {
308 let mut state = self.state();
309
310 stream! {
311 // Ensure the `SyncIndicator` is always hidden to start with.
312 yield SyncIndicator::Hide;
313
314 // Let's not wait for an update to happen. The `SyncIndicator` must be
315 // computed as fast as possible.
316 let mut current_state = state.next_now();
317
318 loop {
319 let (sync_indicator, yield_delay) = match current_state {
320 State::Init | State::Error { .. } => {
321 (SyncIndicator::Show, delay_before_showing)
322 }
323
324 State::SettingUp | State::Recovering | State::Running | State::Terminated { .. } => {
325 (SyncIndicator::Hide, delay_before_hiding)
326 }
327 };
328
329 // `state.next().await` has a maximum of `yield_delay` time to execute…
330 let next_state = match timeout(state.next(), yield_delay).await {
331 // A new state has been received before `yield_delay` time. The new
332 // `sync_indicator` value won't be yielded.
333 Ok(next_state) => next_state,
334
335 // No new state has been received before `yield_delay` time. The
336 // `sync_indicator` value can be yielded.
337 Err(_) => {
338 yield sync_indicator;
339
340 // Now that `sync_indicator` has been yielded, let's wait on
341 // the next state again.
342 state.next().await
343 }
344 };
345
346 if let Some(next_state) = next_state {
347 // Update the `current_state`.
348 current_state = next_state;
349 } else {
350 // Something is broken with the state. Let's stop this stream too.
351 break;
352 }
353 }
354 }
355 }
356
357 /// Get the [`Client`] that has been used to create [`Self`].
358 pub fn client(&self) -> &Client {
359 &self.client
360 }
361
362 /// Get a subscriber to the state.
363 pub fn state(&self) -> Subscriber<State> {
364 self.state_machine.subscribe()
365 }
366
367 async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
368 RoomList::new(&self.client, &self.sliding_sync, sliding_sync_list_name, self.state()).await
369 }
370
371 /// Get a [`RoomList`] for all rooms.
372 pub async fn all_rooms(&self) -> Result<RoomList, Error> {
373 self.list_for(ALL_ROOMS_LIST_NAME).await
374 }
375
376 /// Get a [`Room`] if it exists.
377 pub fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
378 self.client.get_room(room_id).ok_or_else(|| Error::RoomNotFound(room_id.to_owned()))
379 }
380
381 /// Subscribe to rooms.
382 ///
383 /// It means that all events from these rooms will be received every time,
384 /// no matter how the `RoomList` is configured.
385 pub fn subscribe_to_rooms(&self, room_ids: &[&RoomId]) {
386 let settings = assign!(http::request::RoomSubscription::default(), {
387 required_state: DEFAULT_REQUIRED_STATE.iter().map(|(state_event, value)| {
388 (state_event.clone(), (*value).to_owned())
389 })
390 .chain(
391 DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE.iter().map(|(state_event, value)| {
392 (state_event.clone(), (*value).to_owned())
393 })
394 )
395 .collect(),
396 timeline_limit: UInt::from(DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT),
397 });
398
399 let cancel_in_flight_request = match self.state_machine.get() {
400 State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
401 false
402 }
403 State::SettingUp | State::Running => true,
404 };
405
406 self.sliding_sync.subscribe_to_rooms(room_ids, Some(settings), cancel_in_flight_request)
407 }
408
409 #[cfg(test)]
410 pub fn sliding_sync(&self) -> &SlidingSync {
411 &self.sliding_sync
412 }
413}
414
415/// [`RoomList`]'s errors.
416#[derive(Debug, Error)]
417pub enum Error {
418 /// Error from [`matrix_sdk::SlidingSync`].
419 #[error(transparent)]
420 SlidingSync(SlidingSyncError),
421
422 /// An operation has been requested on an unknown list.
423 #[error("Unknown list `{0}`")]
424 UnknownList(String),
425
426 /// The requested room doesn't exist.
427 #[error("Room `{0}` not found")]
428 RoomNotFound(OwnedRoomId),
429
430 #[error(transparent)]
431 EventCache(#[from] EventCacheError),
432}
433
434/// An hint whether a _sync spinner/loader/toaster_ should be prompted to the
435/// user, indicating that the [`RoomListService`] is syncing.
436///
437/// This is entirely arbitrary and optinionated. Of course, once
438/// [`RoomListService::sync`] has been called, it's going to be constantly
439/// syncing, until [`RoomListService::stop_sync`] is called, or until an error
440/// happened. But in some cases, it's better for the user experience to prompt
441/// to the user that a sync is happening. It's usually the first sync, or the
442/// recovering sync. However, the sync indicator must be prompted if the
443/// aforementioned sync is “slow”, otherwise the indicator is likely to “blink”
444/// pretty fast, which can be very confusing. It's also common to indicate to
445/// the user that a syncing is happening in case of a network error, that
446/// something is catching up etc.
447#[derive(Debug, Eq, PartialEq)]
448pub enum SyncIndicator {
449 /// Show the sync indicator.
450 Show,
451
452 /// Hide the sync indicator.
453 Hide,
454}
455
456#[cfg(test)]
457mod tests {
458 use std::future::ready;
459
460 use futures_util::{pin_mut, StreamExt};
461 use matrix_sdk::{
462 config::RequestConfig, test_utils::client::mock_matrix_session, Client, SlidingSyncMode,
463 };
464 use matrix_sdk_test::async_test;
465 use ruma::api::MatrixVersion;
466 use serde_json::json;
467 use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
468
469 use super::{Error, RoomListService, State, ALL_ROOMS_LIST_NAME};
470
471 async fn new_client() -> (Client, MockServer) {
472 let session = mock_matrix_session();
473
474 let server = MockServer::start().await;
475 let client = Client::builder()
476 .homeserver_url(server.uri())
477 .server_versions([MatrixVersion::V1_0])
478 .request_config(RequestConfig::new().disable_retry())
479 .build()
480 .await
481 .unwrap();
482 client.restore_session(session).await.unwrap();
483
484 (client, server)
485 }
486
487 pub(super) async fn new_room_list() -> Result<RoomListService, Error> {
488 let (client, _) = new_client().await;
489
490 RoomListService::new(client).await
491 }
492
493 struct SlidingSyncMatcher;
494
495 impl Match for SlidingSyncMatcher {
496 fn matches(&self, request: &Request) -> bool {
497 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
498 && request.method == Method::POST
499 }
500 }
501
502 #[async_test]
503 async fn test_all_rooms_are_declared() -> Result<(), Error> {
504 let room_list = new_room_list().await?;
505 let sliding_sync = room_list.sliding_sync();
506
507 // List is present, in Selective mode.
508 assert_eq!(
509 sliding_sync
510 .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
511 list.sync_mode(),
512 SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
513 )))
514 .await,
515 Some(true)
516 );
517
518 Ok(())
519 }
520
521 #[async_test]
522 async fn test_expire_sliding_sync_session_manually() -> Result<(), Error> {
523 let (client, server) = new_client().await;
524
525 let room_list = RoomListService::new(client).await?;
526
527 let sync = room_list.sync();
528 pin_mut!(sync);
529
530 // Run a first sync.
531 {
532 let _mock_guard = Mock::given(SlidingSyncMatcher)
533 .respond_with(move |_request: &Request| {
534 ResponseTemplate::new(200).set_body_json(json!({
535 "pos": "0",
536 "lists": {
537 ALL_ROOMS_LIST_NAME: {
538 "count": 0,
539 "ops": [],
540 },
541 },
542 "rooms": {},
543 }))
544 })
545 .mount_as_scoped(&server)
546 .await;
547
548 let _ = sync.next().await;
549 }
550
551 assert_eq!(room_list.state().get(), State::SettingUp);
552
553 // Stop the sync.
554 room_list.stop_sync()?;
555
556 // Do another sync.
557 let _ = sync.next().await;
558
559 // State is `Terminated`, as expected!
560 assert_eq!(
561 room_list.state_machine.get(),
562 State::Terminated { from: Box::new(State::Running) }
563 );
564
565 // Now, let's make the sliding sync session to expire.
566 room_list.expire_sync_session().await;
567
568 // State is `Error`, as a regular session expiration would generate!
569 assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });
570
571 Ok(())
572 }
573}