matrix_sdk_ui/room_list_service/mod.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! `RoomListService` API.
16//!
17//! The `RoomListService` is a UI API dedicated to present a list of Matrix
18//! rooms to the user. The syncing is handled by [`SlidingSync`]. The idea is to
19//! expose a simple API to handle most of the client app use cases, like:
20//! Showing and updating a list of rooms, filtering a list of rooms, handling
21//! particular updates of a range of rooms (the ones the client app is showing
22//! to the view, i.e. the rooms present in the viewport) etc.
23//!
24//! As such, the `RoomListService` works as an opinionated state machine. The
25//! states are defined by [`State`]. Actions are attached to the each state
26//! transition.
27//!
28//! The API is purposely small. Sliding Sync is versatile. `RoomListService` is
29//! _one_ specific usage of Sliding Sync.
30//!
31//! # Basic principle
32//!
33//! `RoomListService` works with 1 Sliding Sync List:
34//!
35//! * `all_rooms` (referred by the constant [`ALL_ROOMS_LIST_NAME`]) is the only
36//! list. Its goal is to load all the user' rooms. It starts with a
37//! [`SlidingSyncMode::Selective`] sync-mode with a small range (i.e. a small
38//! set of rooms) to load the first rooms quickly, and then updates to a
39//! [`SlidingSyncMode::Growing`] sync-mode to load the remaining rooms “in the
40//! background”: it will sync the existing rooms and will fetch new rooms, by
41//! a certain batch size.
42//!
43//! This behavior has proven to be empirically satisfying to provide a fast and
44//! fluid user experience for a Matrix client.
45//!
46//! [`RoomListService::all_rooms`] provides a way to get a [`RoomList`] for all
47//! the rooms. From that, calling [`RoomList::entries_with_dynamic_adapters`]
48//! provides a way to get a stream of rooms. This stream is sorted, can be
49//! filtered, and the filter can be changed over time.
50//!
51//! [`RoomListService::state`] provides a way to get a stream of the state
52//! machine's state, which can be pretty helpful for the client app.
53
54pub mod filters;
55mod room_list;
56pub mod sorters;
57mod state;
58
59use std::{sync::Arc, time::Duration};
60
61use async_stream::stream;
62use eyeball::Subscriber;
63use futures_util::{Stream, StreamExt, pin_mut};
64use matrix_sdk::{
65 Client, Error as SlidingSyncError, Room, SlidingSync, SlidingSyncList, SlidingSyncMode,
66 event_cache::EventCacheError, timeout::timeout,
67};
68pub use room_list::*;
69use ruma::{
70 OwnedRoomId, RoomId, UInt, api::client::sync::sync_events::v5 as http, assign,
71 events::StateEventType,
72};
73pub use state::*;
74use thiserror::Error;
75use tracing::{debug, error};
76
77/// The default `required_state` constant value for sliding sync lists and
78/// sliding sync room subscriptions.
79const DEFAULT_REQUIRED_STATE: &[(StateEventType, &str)] = &[
80 (StateEventType::RoomName, ""),
81 (StateEventType::RoomEncryption, ""),
82 (StateEventType::RoomMember, "$LAZY"),
83 (StateEventType::RoomMember, "$ME"),
84 (StateEventType::RoomTopic, ""),
85 // Temporary workaround for https://github.com/matrix-org/matrix-rust-sdk/issues/5285
86 (StateEventType::RoomAvatar, ""),
87 (StateEventType::RoomCanonicalAlias, ""),
88 (StateEventType::RoomPowerLevels, ""),
89 (StateEventType::CallMember, "*"),
90 (StateEventType::RoomJoinRules, ""),
91 (StateEventType::RoomTombstone, ""),
92 // Those two events are required to properly compute room previews.
93 // `StateEventType::RoomCreate` is also necessary to compute the room
94 // version, and thus handling the tombstoned room correctly.
95 (StateEventType::RoomCreate, ""),
96 (StateEventType::RoomHistoryVisibility, ""),
97 // Required to correctly calculate the room display name.
98 (StateEventType::MemberHints, ""),
99 (StateEventType::SpaceParent, "*"),
100 (StateEventType::SpaceChild, "*"),
101];
102
103/// The default `required_state` constant value for sliding sync room
104/// subscriptions that must be added to `DEFAULT_REQUIRED_STATE`.
105const DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE: &[(StateEventType, &str)] =
106 &[(StateEventType::RoomPinnedEvents, "")];
107
108/// The default `timeline_limit` value when used with room subscriptions.
109const DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT: u32 = 20;
110
111/// The [`RoomListService`] type. See the module's documentation to learn more.
112#[derive(Debug)]
113pub struct RoomListService {
114 /// Client that has created this [`RoomListService`].
115 client: Client,
116
117 /// The Sliding Sync instance.
118 sliding_sync: Arc<SlidingSync>,
119
120 /// The current state of the `RoomListService`.
121 ///
122 /// `RoomListService` is a simple state-machine.
123 state_machine: StateMachine,
124}
125
126impl RoomListService {
127 /// Create a new `RoomList`.
128 ///
129 /// A [`matrix_sdk::SlidingSync`] client will be created, with a cached list
130 /// already pre-configured.
131 ///
132 /// This won't start an encryption sync, and it's the user's responsibility
133 /// to create one in this case using
134 /// [`EncryptionSyncService`][crate::encryption_sync_service::EncryptionSyncService].
135 pub async fn new(client: Client) -> Result<Self, Error> {
136 Self::new_with_share_pos(client, true).await
137 }
138
139 /// Like [`RoomListService::new`] but with a flag to turn the
140 /// [`SlidingSyncBuilder::share_pos`] on and off.
141 ///
142 /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
143 pub async fn new_with_share_pos(client: Client, share_pos: bool) -> Result<Self, Error> {
144 let mut builder = client
145 .sliding_sync("room-list")
146 .map_err(Error::SlidingSync)?
147 .with_account_data_extension(
148 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
149 )
150 .with_receipt_extension(assign!(http::request::Receipts::default(), {
151 enabled: Some(true),
152 rooms: Some(vec![http::request::ExtensionRoomConfig::AllSubscribed])
153 }))
154 .with_typing_extension(assign!(http::request::Typing::default(), {
155 enabled: Some(true),
156 }));
157
158 if client.enabled_thread_subscriptions() {
159 builder = builder.with_thread_subscriptions_extension(
160 assign!(http::request::ThreadSubscriptions::default(), {
161 enabled: Some(true),
162 limit: Some(ruma::uint!(10))
163 }),
164 );
165 }
166
167 if share_pos {
168 // We don't deal with encryption device messages here so this is safe
169 builder = builder.share_pos();
170 }
171
172 let sliding_sync = builder
173 .add_cached_list(
174 SlidingSyncList::builder(ALL_ROOMS_LIST_NAME)
175 .sync_mode(
176 SlidingSyncMode::new_selective()
177 .add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
178 )
179 .timeline_limit(1)
180 .required_state(
181 DEFAULT_REQUIRED_STATE
182 .iter()
183 .map(|(state_event, value)| (state_event.clone(), (*value).to_owned()))
184 .collect(),
185 )
186 .filters(Some(assign!(http::request::ListFilters::default(), {
187 // As defined in the [SlidingSync MSC](https://github.com/matrix-org/matrix-spec-proposals/blob/9450ced7fb9cf5ea9077d029b3adf36aebfa8709/proposals/3575-sync.md?plain=1#L444)
188 // If unset, both invited and joined rooms are returned. If false, no invited rooms are
189 // returned. If true, only invited rooms are returned.
190 is_invite: None,
191 }))),
192 )
193 .await
194 .map_err(Error::SlidingSync)?
195 .build()
196 .await
197 .map(Arc::new)
198 .map_err(Error::SlidingSync)?;
199
200 // Eagerly subscribe the event cache to sync responses.
201 client.event_cache().subscribe()?;
202
203 let state_machine = StateMachine::new();
204
205 // If the sliding sync has successfully restored a sync position, skip the
206 // waiting for the initial sync, and set the state to `SettingUp`; this
207 // way, the first sync will move us to the steady state, and update the
208 // sliding sync list to use the growing sync mode.
209 if sliding_sync.has_pos().await {
210 state_machine.set(State::SettingUp);
211 }
212
213 Ok(Self { client, sliding_sync, state_machine })
214 }
215
216 /// Start to sync the room list.
217 ///
218 /// It's the main method of this entire API. Calling `sync` allows to
219 /// receive updates on the room list: new rooms, rooms updates etc. Those
220 /// updates can be read with [`RoomList::entries`] for example. This method
221 /// returns a [`Stream`] where produced items only hold an empty value
222 /// in case of a sync success, otherwise an error.
223 ///
224 /// The `RoomListService`' state machine is run by this method.
225 ///
226 /// Stopping the [`Stream`] (i.e. by calling [`Self::stop_sync`]), and
227 /// calling [`Self::sync`] again will resume from the previous state of
228 /// the state machine.
229 ///
230 /// This should be used only for testing. In practice, most users should be
231 /// using the [`SyncService`] instead.
232 #[doc(hidden)]
233 pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
234 stream! {
235 let sync = self.sliding_sync.sync();
236 pin_mut!(sync);
237
238 // This is a state machine implementation.
239 // Things happen in this order:
240 //
241 // 1. The next state is calculated,
242 // 2. The actions associated to the next state are run,
243 // 3. A sync is done,
244 // 4. The next state is stored.
245 loop {
246 debug!("Run a sync iteration");
247
248 // Calculate the next state, and run the associated actions.
249 let next_state = self.state_machine.next(&self.sliding_sync).await?;
250
251 // Do the sync.
252 match sync.next().await {
253 // Got a successful result while syncing.
254 Some(Ok(_update_summary)) => {
255 debug!(state = ?next_state, "New state");
256
257 // Update the state.
258 self.state_machine.set(next_state);
259
260 yield Ok(());
261 }
262
263 // Got an error while syncing.
264 Some(Err(error)) => {
265 debug!(expected_state = ?next_state, "New state is an error");
266
267 let next_state = State::Error { from: Box::new(next_state) };
268 self.state_machine.set(next_state);
269
270 yield Err(Error::SlidingSync(error));
271
272 break;
273 }
274
275 // Sync loop has terminated.
276 None => {
277 debug!(expected_state = ?next_state, "New state is a termination");
278
279 let next_state = State::Terminated { from: Box::new(next_state) };
280 self.state_machine.set(next_state);
281
282 break;
283 }
284 }
285 }
286 }
287 }
288
289 /// Force to stop the sync of the `RoomListService` started by
290 /// [`Self::sync`].
291 ///
292 /// It's of utter importance to call this method rather than stop polling
293 /// the `Stream` returned by [`Self::sync`] because it will force the
294 /// cancellation and exit the sync loop, i.e. it will cancel any
295 /// in-flight HTTP requests, cancel any pending futures etc. and put the
296 /// service into a termination state.
297 ///
298 /// Ideally, one wants to consume the `Stream` returned by [`Self::sync`]
299 /// until it returns `None`, because of [`Self::stop_sync`], so that it
300 /// ensures the states are correctly placed.
301 ///
302 /// Stopping the sync of the room list via this method will put the
303 /// state-machine into the [`State::Terminated`] state.
304 ///
305 /// This should be used only for testing. In practice, most users should be
306 /// using the [`SyncService`] instead.
307 #[doc(hidden)]
308 pub fn stop_sync(&self) -> Result<(), Error> {
309 self.sliding_sync.stop_sync().map_err(Error::SlidingSync)
310 }
311
312 /// Force the sliding sync session to expire.
313 ///
314 /// This is used by [`SyncService`][crate::SyncService].
315 ///
316 /// **Warning**: This method **must not** be called while the sync loop is
317 /// running!
318 pub(crate) async fn expire_sync_session(&self) {
319 self.sliding_sync.expire_session().await;
320
321 // Usually, when the session expires, it leads the state to be `Error`,
322 // thus some actions (like refreshing the lists) are executed. However,
323 // if the sync loop has been stopped manually, the state is `Terminated`, and
324 // when the session is forced to expire, the state remains `Terminated`, thus
325 // the actions aren't executed as expected. Consequently, let's update the
326 // state.
327 if let State::Terminated { from } = self.state_machine.get() {
328 self.state_machine.set(State::Error { from });
329 }
330 }
331
332 /// Get a [`Stream`] of [`SyncIndicator`].
333 ///
334 /// Read the documentation of [`SyncIndicator`] to learn more about it.
335 pub fn sync_indicator(
336 &self,
337 delay_before_showing: Duration,
338 delay_before_hiding: Duration,
339 ) -> impl Stream<Item = SyncIndicator> + use<> {
340 let mut state = self.state();
341
342 stream! {
343 // Ensure the `SyncIndicator` is always hidden to start with.
344 yield SyncIndicator::Hide;
345
346 // Let's not wait for an update to happen. The `SyncIndicator` must be
347 // computed as fast as possible.
348 let mut current_state = state.next_now();
349
350 loop {
351 let (sync_indicator, yield_delay) = match current_state {
352 State::Init | State::SettingUp | State::Error { .. } => {
353 (SyncIndicator::Show, delay_before_showing)
354 }
355
356 State::Recovering | State::Running | State::Terminated { .. } => {
357 (SyncIndicator::Hide, delay_before_hiding)
358 }
359 };
360
361 // `state.next().await` has a maximum of `yield_delay` time to execute…
362 let next_state = match timeout(state.next(), yield_delay).await {
363 // A new state has been received before `yield_delay` time. The new
364 // `sync_indicator` value won't be yielded.
365 Ok(next_state) => next_state,
366
367 // No new state has been received before `yield_delay` time. The
368 // `sync_indicator` value can be yielded.
369 Err(_) => {
370 yield sync_indicator;
371
372 // Now that `sync_indicator` has been yielded, let's wait on
373 // the next state again.
374 state.next().await
375 }
376 };
377
378 if let Some(next_state) = next_state {
379 // Update the `current_state`.
380 current_state = next_state;
381 } else {
382 // Something is broken with the state. Let's stop this stream too.
383 break;
384 }
385 }
386 }
387 }
388
389 /// Get the [`Client`] that has been used to create [`Self`].
390 pub fn client(&self) -> &Client {
391 &self.client
392 }
393
394 /// Get a subscriber to the state.
395 pub fn state(&self) -> Subscriber<State> {
396 self.state_machine.subscribe()
397 }
398
399 async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
400 RoomList::new(&self.client, &self.sliding_sync, sliding_sync_list_name, self.state()).await
401 }
402
403 /// Get a [`RoomList`] for all rooms.
404 pub async fn all_rooms(&self) -> Result<RoomList, Error> {
405 self.list_for(ALL_ROOMS_LIST_NAME).await
406 }
407
408 /// Get a [`Room`] if it exists.
409 pub fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
410 self.client.get_room(room_id).ok_or_else(|| Error::RoomNotFound(room_id.to_owned()))
411 }
412
413 /// Subscribe to rooms.
414 ///
415 /// It means that all events from these rooms will be received every time,
416 /// no matter how the `RoomList` is configured.
417 ///
418 /// [`LatestEvents::listen_to_room`][listen_to_room] will be called for each
419 /// room in `room_ids`, so that the [`LatestEventValue`] will automatically
420 /// be calculated and updated for these rooms, for free.
421 ///
422 /// [listen_to_room]: matrix_sdk::latest_events::LatestEvents::listen_to_room
423 /// [`LatestEventValue`]: matrix_sdk::latest_events::LatestEventValue
424 pub async fn subscribe_to_rooms(&self, room_ids: &[&RoomId]) {
425 // Calculate the settings for the room subscriptions.
426 let settings = assign!(http::request::RoomSubscription::default(), {
427 required_state: DEFAULT_REQUIRED_STATE.iter().map(|(state_event, value)| {
428 (state_event.clone(), (*value).to_owned())
429 })
430 .chain(
431 DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE.iter().map(|(state_event, value)| {
432 (state_event.clone(), (*value).to_owned())
433 })
434 )
435 .collect(),
436 timeline_limit: UInt::from(DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT),
437 });
438
439 // Decide whether the in-flight request (if any) should be cancelled if needed.
440 let cancel_in_flight_request = match self.state_machine.get() {
441 State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
442 false
443 }
444 State::SettingUp | State::Running => true,
445 };
446
447 // Before subscribing, let's listen these rooms to calculate their latest
448 // events.
449 let latest_events = self.client.latest_events().await;
450
451 for room_id in room_ids {
452 if let Err(error) = latest_events.listen_to_room(room_id).await {
453 // Let's not fail the room subscription. Instead, emit a log because it's very
454 // unlikely to happen.
455 error!(?error, ?room_id, "Failed to listen to the latest event for this room");
456 }
457 }
458
459 // Subscribe to the rooms.
460 self.sliding_sync.subscribe_to_rooms(room_ids, Some(settings), cancel_in_flight_request)
461 }
462
463 #[cfg(test)]
464 pub fn sliding_sync(&self) -> &SlidingSync {
465 &self.sliding_sync
466 }
467}
468
469/// [`RoomList`]'s errors.
470#[derive(Debug, Error)]
471pub enum Error {
472 /// Error from [`matrix_sdk::SlidingSync`].
473 #[error(transparent)]
474 SlidingSync(SlidingSyncError),
475
476 /// An operation has been requested on an unknown list.
477 #[error("Unknown list `{0}`")]
478 UnknownList(String),
479
480 /// The requested room doesn't exist.
481 #[error("Room `{0}` not found")]
482 RoomNotFound(OwnedRoomId),
483
484 #[error(transparent)]
485 EventCache(#[from] EventCacheError),
486}
487
488/// An hint whether a _sync spinner/loader/toaster_ should be prompted to the
489/// user, indicating that the [`RoomListService`] is syncing.
490///
491/// This is entirely arbitrary and optinionated. Of course, once
492/// [`RoomListService::sync`] has been called, it's going to be constantly
493/// syncing, until [`RoomListService::stop_sync`] is called, or until an error
494/// happened. But in some cases, it's better for the user experience to prompt
495/// to the user that a sync is happening. It's usually the first sync, or the
496/// recovering sync. However, the sync indicator must be prompted if the
497/// aforementioned sync is “slow”, otherwise the indicator is likely to “blink”
498/// pretty fast, which can be very confusing. It's also common to indicate to
499/// the user that a syncing is happening in case of a network error, that
500/// something is catching up etc.
501#[derive(Debug, Eq, PartialEq)]
502pub enum SyncIndicator {
503 /// Show the sync indicator.
504 Show,
505
506 /// Hide the sync indicator.
507 Hide,
508}
509
510#[cfg(test)]
511mod tests {
512 use std::future::ready;
513
514 use futures_util::{StreamExt, pin_mut};
515 use matrix_sdk::{
516 Client, SlidingSyncMode, config::RequestConfig, test_utils::client::mock_matrix_session,
517 };
518 use matrix_sdk_test::async_test;
519 use ruma::api::MatrixVersion;
520 use serde_json::json;
521 use wiremock::{Match, Mock, MockServer, Request, ResponseTemplate, http::Method};
522
523 use super::{ALL_ROOMS_LIST_NAME, Error, RoomListService, State};
524
525 async fn new_client() -> (Client, MockServer) {
526 let session = mock_matrix_session();
527
528 let server = MockServer::start().await;
529 let client = Client::builder()
530 .homeserver_url(server.uri())
531 .server_versions([MatrixVersion::V1_0])
532 .request_config(RequestConfig::new().disable_retry())
533 .build()
534 .await
535 .unwrap();
536 client.restore_session(session).await.unwrap();
537
538 (client, server)
539 }
540
541 pub(super) async fn new_room_list() -> Result<RoomListService, Error> {
542 let (client, _) = new_client().await;
543
544 RoomListService::new(client).await
545 }
546
547 struct SlidingSyncMatcher;
548
549 impl Match for SlidingSyncMatcher {
550 fn matches(&self, request: &Request) -> bool {
551 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
552 && request.method == Method::POST
553 }
554 }
555
556 #[async_test]
557 async fn test_all_rooms_are_declared() -> Result<(), Error> {
558 let room_list = new_room_list().await?;
559 let sliding_sync = room_list.sliding_sync();
560
561 // List is present, in Selective mode.
562 assert_eq!(
563 sliding_sync
564 .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
565 list.sync_mode(),
566 SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
567 )))
568 .await,
569 Some(true)
570 );
571
572 Ok(())
573 }
574
575 #[async_test]
576 async fn test_expire_sliding_sync_session_manually() -> Result<(), Error> {
577 let (client, server) = new_client().await;
578
579 let room_list = RoomListService::new(client).await?;
580
581 let sync = room_list.sync();
582 pin_mut!(sync);
583
584 // Run a first sync.
585 {
586 let _mock_guard = Mock::given(SlidingSyncMatcher)
587 .respond_with(move |_request: &Request| {
588 ResponseTemplate::new(200).set_body_json(json!({
589 "pos": "0",
590 "lists": {
591 ALL_ROOMS_LIST_NAME: {
592 "count": 0,
593 "ops": [],
594 },
595 },
596 "rooms": {},
597 }))
598 })
599 .mount_as_scoped(&server)
600 .await;
601
602 let _ = sync.next().await;
603 }
604
605 assert_eq!(room_list.state().get(), State::SettingUp);
606
607 // Stop the sync.
608 room_list.stop_sync()?;
609
610 // Do another sync.
611 let _ = sync.next().await;
612
613 // State is `Terminated`, as expected!
614 assert_eq!(
615 room_list.state_machine.get(),
616 State::Terminated { from: Box::new(State::Running) }
617 );
618
619 // Now, let's make the sliding sync session to expire.
620 room_list.expire_sync_session().await;
621
622 // State is `Error`, as a regular session expiration would generate!
623 assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });
624
625 Ok(())
626 }
627}