matrix_sdk_ui/room_list_service/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! `RoomListService` API.
16//!
17//! The `RoomListService` is a UI API dedicated to present a list of Matrix
18//! rooms to the user. The syncing is handled by [`SlidingSync`]. The idea is to
19//! expose a simple API to handle most of the client app use cases, like:
20//! Showing and updating a list of rooms, filtering a list of rooms, handling
21//! particular updates of a range of rooms (the ones the client app is showing
22//! to the view, i.e. the rooms present in the viewport) etc.
23//!
24//! As such, the `RoomListService` works as an opinionated state machine. The
25//! states are defined by [`State`]. Actions are attached to the each state
26//! transition.
27//!
28//! The API is purposely small. Sliding Sync is versatile. `RoomListService` is
29//! _one_ specific usage of Sliding Sync.
30//!
31//! # Basic principle
32//!
33//! `RoomListService` works with 1 Sliding Sync List:
34//!
35//! * `all_rooms` (referred by the constant [`ALL_ROOMS_LIST_NAME`]) is the only
36//!   list. Its goal is to load all the user' rooms. It starts with a
37//!   [`SlidingSyncMode::Selective`] sync-mode with a small range (i.e. a small
38//!   set of rooms) to load the first rooms quickly, and then updates to a
39//!   [`SlidingSyncMode::Growing`] sync-mode to load the remaining rooms “in the
40//!   background”: it will sync the existing rooms and will fetch new rooms, by
41//!   a certain batch size.
42//!
43//! This behavior has proven to be empirically satisfying to provide a fast and
44//! fluid user experience for a Matrix client.
45//!
46//! [`RoomListService::all_rooms`] provides a way to get a [`RoomList`] for all
47//! the rooms. From that, calling [`RoomList::entries_with_dynamic_adapters`]
48//! provides a way to get a stream of rooms. This stream is sorted, can be
49//! filtered, and the filter can be changed over time.
50//!
51//! [`RoomListService::state`] provides a way to get a stream of the state
52//! machine's state, which can be pretty helpful for the client app.
53
54pub mod filters;
55mod room;
56mod room_list;
57pub mod sorters;
58mod state;
59
60use std::{sync::Arc, time::Duration};
61
62use async_stream::stream;
63use eyeball::Subscriber;
64use futures_util::{pin_mut, Stream, StreamExt};
65use matrix_sdk::{
66    event_cache::EventCacheError, timeout::timeout, Client, Error as SlidingSyncError, SlidingSync,
67    SlidingSyncList, SlidingSyncMode,
68};
69use matrix_sdk_base::sliding_sync::http;
70pub use room::*;
71pub use room_list::*;
72use ruma::{assign, directory::RoomTypeFilter, events::StateEventType, OwnedRoomId, RoomId, UInt};
73pub use state::*;
74use thiserror::Error;
75use tracing::debug;
76
77use crate::timeline;
78
79/// The default `required_state` constant value for sliding sync lists and
80/// sliding sync room subscriptions.
81const DEFAULT_REQUIRED_STATE: &[(StateEventType, &str)] = &[
82    (StateEventType::RoomName, ""),
83    (StateEventType::RoomEncryption, ""),
84    (StateEventType::RoomMember, "$LAZY"),
85    (StateEventType::RoomMember, "$ME"),
86    (StateEventType::RoomTopic, ""),
87    (StateEventType::RoomCanonicalAlias, ""),
88    (StateEventType::RoomPowerLevels, ""),
89    (StateEventType::CallMember, "*"),
90    (StateEventType::RoomJoinRules, ""),
91    // Those two events are required to properly compute room previews.
92    (StateEventType::RoomCreate, ""),
93    (StateEventType::RoomHistoryVisibility, ""),
94    // Required to correctly calculate the room display name.
95    (StateEventType::MemberHints, ""),
96];
97
98/// The default `required_state` constant value for sliding sync room
99/// subscriptions that must be added to `DEFAULT_REQUIRED_STATE`.
100const DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE: &[(StateEventType, &str)] =
101    &[(StateEventType::RoomPinnedEvents, "")];
102
103/// The default `timeline_limit` value when used with room subscriptions.
104const DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT: u32 = 20;
105
106/// The [`RoomListService`] type. See the module's documentation to learn more.
107#[derive(Debug)]
108pub struct RoomListService {
109    /// Client that has created this [`RoomListService`].
110    client: Client,
111
112    /// The Sliding Sync instance.
113    sliding_sync: Arc<SlidingSync>,
114
115    /// The current state of the `RoomListService`.
116    ///
117    /// `RoomListService` is a simple state-machine.
118    state_machine: StateMachine,
119}
120
121impl RoomListService {
122    /// Create a new `RoomList`.
123    ///
124    /// A [`matrix_sdk::SlidingSync`] client will be created, with a cached list
125    /// already pre-configured.
126    ///
127    /// This won't start an encryption sync, and it's the user's responsibility
128    /// to create one in this case using
129    /// [`EncryptionSyncService`][crate::encryption_sync_service::EncryptionSyncService].
130    pub async fn new(client: Client) -> Result<Self, Error> {
131        let builder = client
132            .sliding_sync("room-list")
133            .map_err(Error::SlidingSync)?
134            .with_account_data_extension(
135                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
136            )
137            .with_receipt_extension(assign!(http::request::Receipts::default(), {
138                enabled: Some(true),
139                rooms: Some(vec![http::request::ReceiptsRoom::AllSubscribed])
140            }))
141            .with_typing_extension(assign!(http::request::Typing::default(), {
142                enabled: Some(true),
143            }));
144        // TODO: Re-enable once we know it creates slowness.
145        // // We don't deal with encryption device messages here so this is safe
146        // .share_pos();
147
148        let sliding_sync = builder
149            .add_cached_list(
150                SlidingSyncList::builder(ALL_ROOMS_LIST_NAME)
151                    .sync_mode(
152                        SlidingSyncMode::new_selective()
153                            .add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
154                    )
155                    .timeline_limit(1)
156                    .required_state(
157                        DEFAULT_REQUIRED_STATE
158                            .iter()
159                            .map(|(state_event, value)| (state_event.clone(), (*value).to_owned()))
160                            .collect(),
161                    )
162                    .include_heroes(Some(true))
163                    .filters(Some(assign!(http::request::ListFilters::default(), {
164                        // As defined in the [SlidingSync MSC](https://github.com/matrix-org/matrix-spec-proposals/blob/9450ced7fb9cf5ea9077d029b3adf36aebfa8709/proposals/3575-sync.md?plain=1#L444)
165                        // If unset, both invited and joined rooms are returned. If false, no invited rooms are
166                        // returned. If true, only invited rooms are returned.
167                        is_invite: None,
168                        not_room_types: vec![RoomTypeFilter::Space],
169                    }))),
170            )
171            .await
172            .map_err(Error::SlidingSync)?
173            .build()
174            .await
175            .map(Arc::new)
176            .map_err(Error::SlidingSync)?;
177
178        // Eagerly subscribe the event cache to sync responses.
179        client.event_cache().subscribe()?;
180
181        Ok(Self { client, sliding_sync, state_machine: StateMachine::new() })
182    }
183
184    /// Start to sync the room list.
185    ///
186    /// It's the main method of this entire API. Calling `sync` allows to
187    /// receive updates on the room list: new rooms, rooms updates etc. Those
188    /// updates can be read with [`RoomList::entries`] for example. This method
189    /// returns a [`Stream`] where produced items only hold an empty value
190    /// in case of a sync success, otherwise an error.
191    ///
192    /// The `RoomListService`' state machine is run by this method.
193    ///
194    /// Stopping the [`Stream`] (i.e. by calling [`Self::stop_sync`]), and
195    /// calling [`Self::sync`] again will resume from the previous state of
196    /// the state machine.
197    ///
198    /// This should be used only for testing. In practice, most users should be
199    /// using the [`SyncService`] instead.
200    #[doc(hidden)]
201    pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
202        stream! {
203            let sync = self.sliding_sync.sync();
204            pin_mut!(sync);
205
206            // This is a state machine implementation.
207            // Things happen in this order:
208            //
209            // 1. The next state is calculated,
210            // 2. The actions associated to the next state are run,
211            // 3. A sync is done,
212            // 4. The next state is stored.
213            loop {
214                debug!("Run a sync iteration");
215
216                // Calculate the next state, and run the associated actions.
217                let next_state = self.state_machine.next(&self.sliding_sync).await?;
218
219                // Do the sync.
220                match sync.next().await {
221                    // Got a successful result while syncing.
222                    Some(Ok(_update_summary)) => {
223                        debug!(state = ?next_state, "New state");
224
225                        // Update the state.
226                        self.state_machine.set(next_state);
227
228                        yield Ok(());
229                    }
230
231                    // Got an error while syncing.
232                    Some(Err(error)) => {
233                        debug!(expected_state = ?next_state, "New state is an error");
234
235                        let next_state = State::Error { from: Box::new(next_state) };
236                        self.state_machine.set(next_state);
237
238                        yield Err(Error::SlidingSync(error));
239
240                        break;
241                    }
242
243                    // Sync loop has terminated.
244                    None => {
245                        debug!(expected_state = ?next_state, "New state is a termination");
246
247                        let next_state = State::Terminated { from: Box::new(next_state) };
248                        self.state_machine.set(next_state);
249
250                        break;
251                    }
252                }
253            }
254        }
255    }
256
257    /// Force to stop the sync of the `RoomListService` started by
258    /// [`Self::sync`].
259    ///
260    /// It's of utter importance to call this method rather than stop polling
261    /// the `Stream` returned by [`Self::sync`] because it will force the
262    /// cancellation and exit the sync loop, i.e. it will cancel any
263    /// in-flight HTTP requests, cancel any pending futures etc. and put the
264    /// service into a termination state.
265    ///
266    /// Ideally, one wants to consume the `Stream` returned by [`Self::sync`]
267    /// until it returns `None`, because of [`Self::stop_sync`], so that it
268    /// ensures the states are correctly placed.
269    ///
270    /// Stopping the sync of the room list via this method will put the
271    /// state-machine into the [`State::Terminated`] state.
272    ///
273    /// This should be used only for testing. In practice, most users should be
274    /// using the [`SyncService`] instead.
275    #[doc(hidden)]
276    pub fn stop_sync(&self) -> Result<(), Error> {
277        self.sliding_sync.stop_sync().map_err(Error::SlidingSync)
278    }
279
280    /// Force the sliding sync session to expire.
281    ///
282    /// This is used by [`SyncService`][crate::SyncService].
283    ///
284    /// **Warning**: This method **must not** be called while the sync loop is
285    /// running!
286    pub(crate) async fn expire_sync_session(&self) {
287        self.sliding_sync.expire_session().await;
288
289        // Usually, when the session expires, it leads the state to be `Error`,
290        // thus some actions (like refreshing the lists) are executed. However,
291        // if the sync loop has been stopped manually, the state is `Terminated`, and
292        // when the session is forced to expire, the state remains `Terminated`, thus
293        // the actions aren't executed as expected. Consequently, let's update the
294        // state.
295        if let State::Terminated { from } = self.state_machine.get() {
296            self.state_machine.set(State::Error { from });
297        }
298    }
299
300    /// Get a [`Stream`] of [`SyncIndicator`].
301    ///
302    /// Read the documentation of [`SyncIndicator`] to learn more about it.
303    pub fn sync_indicator(
304        &self,
305        delay_before_showing: Duration,
306        delay_before_hiding: Duration,
307    ) -> impl Stream<Item = SyncIndicator> {
308        let mut state = self.state();
309
310        stream! {
311            // Ensure the `SyncIndicator` is always hidden to start with.
312            yield SyncIndicator::Hide;
313
314            // Let's not wait for an update to happen. The `SyncIndicator` must be
315            // computed as fast as possible.
316            let mut current_state = state.next_now();
317
318            loop {
319                let (sync_indicator, yield_delay) = match current_state {
320                    State::Init | State::Error { .. } => {
321                        (SyncIndicator::Show, delay_before_showing)
322                    }
323
324                    State::SettingUp | State::Recovering | State::Running | State::Terminated { .. } => {
325                        (SyncIndicator::Hide, delay_before_hiding)
326                    }
327                };
328
329                // `state.next().await` has a maximum of `yield_delay` time to execute…
330                let next_state = match timeout(state.next(), yield_delay).await {
331                    // A new state has been received before `yield_delay` time. The new
332                    // `sync_indicator` value won't be yielded.
333                    Ok(next_state) => next_state,
334
335                    // No new state has been received before `yield_delay` time. The
336                    // `sync_indicator` value can be yielded.
337                    Err(_) => {
338                        yield sync_indicator;
339
340                        // Now that `sync_indicator` has been yielded, let's wait on
341                        // the next state again.
342                        state.next().await
343                    }
344                };
345
346                if let Some(next_state) = next_state {
347                    // Update the `current_state`.
348                    current_state = next_state;
349                } else {
350                    // Something is broken with the state. Let's stop this stream too.
351                    break;
352                }
353            }
354        }
355    }
356
357    /// Get the [`Client`] that has been used to create [`Self`].
358    pub fn client(&self) -> &Client {
359        &self.client
360    }
361
362    /// Get a subscriber to the state.
363    pub fn state(&self) -> Subscriber<State> {
364        self.state_machine.subscribe()
365    }
366
367    async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
368        RoomList::new(&self.client, &self.sliding_sync, sliding_sync_list_name, self.state()).await
369    }
370
371    /// Get a [`RoomList`] for all rooms.
372    pub async fn all_rooms(&self) -> Result<RoomList, Error> {
373        self.list_for(ALL_ROOMS_LIST_NAME).await
374    }
375
376    /// Get a [`Room`] if it exists.
377    pub fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
378        Ok(Room::new(
379            self.client.get_room(room_id).ok_or_else(|| Error::RoomNotFound(room_id.to_owned()))?,
380            &self.sliding_sync,
381        ))
382    }
383
384    /// Subscribe to rooms.
385    ///
386    /// It means that all events from these rooms will be received every time,
387    /// no matter how the `RoomList` is configured.
388    pub fn subscribe_to_rooms(&self, room_ids: &[&RoomId]) {
389        let settings = assign!(http::request::RoomSubscription::default(), {
390            required_state: DEFAULT_REQUIRED_STATE.iter().map(|(state_event, value)| {
391                (state_event.clone(), (*value).to_owned())
392            })
393            .chain(
394                DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE.iter().map(|(state_event, value)| {
395                    (state_event.clone(), (*value).to_owned())
396                })
397            )
398            .collect(),
399            timeline_limit: UInt::from(DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT),
400        });
401
402        let cancel_in_flight_request = match self.state_machine.get() {
403            State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
404                false
405            }
406            State::SettingUp | State::Running => true,
407        };
408
409        self.sliding_sync.subscribe_to_rooms(room_ids, Some(settings), cancel_in_flight_request)
410    }
411
412    #[cfg(test)]
413    pub fn sliding_sync(&self) -> &SlidingSync {
414        &self.sliding_sync
415    }
416}
417
418/// [`RoomList`]'s errors.
419#[derive(Debug, Error)]
420pub enum Error {
421    /// Error from [`matrix_sdk::SlidingSync`].
422    #[error(transparent)]
423    SlidingSync(SlidingSyncError),
424
425    /// An operation has been requested on an unknown list.
426    #[error("Unknown list `{0}`")]
427    UnknownList(String),
428
429    /// The requested room doesn't exist.
430    #[error("Room `{0}` not found")]
431    RoomNotFound(OwnedRoomId),
432
433    #[error("A timeline instance already exists for room {0}")]
434    TimelineAlreadyExists(OwnedRoomId),
435
436    #[error(transparent)]
437    InitializingTimeline(#[from] timeline::Error),
438
439    #[error(transparent)]
440    EventCache(#[from] EventCacheError),
441}
442
443/// An hint whether a _sync spinner/loader/toaster_ should be prompted to the
444/// user, indicating that the [`RoomListService`] is syncing.
445///
446/// This is entirely arbitrary and optinionated. Of course, once
447/// [`RoomListService::sync`] has been called, it's going to be constantly
448/// syncing, until [`RoomListService::stop_sync`] is called, or until an error
449/// happened. But in some cases, it's better for the user experience to prompt
450/// to the user that a sync is happening. It's usually the first sync, or the
451/// recovering sync. However, the sync indicator must be prompted if the
452/// aforementioned sync is “slow”, otherwise the indicator is likely to “blink”
453/// pretty fast, which can be very confusing. It's also common to indicate to
454/// the user that a syncing is happening in case of a network error, that
455/// something is catching up etc.
456#[derive(Debug, Eq, PartialEq)]
457pub enum SyncIndicator {
458    /// Show the sync indicator.
459    Show,
460
461    /// Hide the sync indicator.
462    Hide,
463}
464
465#[cfg(test)]
466mod tests {
467    use std::future::ready;
468
469    use assert_matches::assert_matches;
470    use futures_util::{pin_mut, StreamExt};
471    use matrix_sdk::{
472        authentication::matrix::{MatrixSession, MatrixSessionTokens},
473        config::RequestConfig,
474        reqwest::Url,
475        sliding_sync::Version as SlidingSyncVersion,
476        Client, SlidingSyncMode,
477    };
478    use matrix_sdk_base::SessionMeta;
479    use matrix_sdk_test::async_test;
480    use ruma::{api::MatrixVersion, device_id, user_id};
481    use serde_json::json;
482    use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
483
484    use super::{Error, RoomListService, State, ALL_ROOMS_LIST_NAME};
485
486    async fn new_client() -> (Client, MockServer) {
487        let session = MatrixSession {
488            meta: SessionMeta {
489                user_id: user_id!("@example:localhost").to_owned(),
490                device_id: device_id!("DEVICEID").to_owned(),
491            },
492            tokens: MatrixSessionTokens { access_token: "1234".to_owned(), refresh_token: None },
493        };
494
495        let server = MockServer::start().await;
496        let client = Client::builder()
497            .homeserver_url(server.uri())
498            .server_versions([MatrixVersion::V1_0])
499            .request_config(RequestConfig::new().disable_retry())
500            .build()
501            .await
502            .unwrap();
503        client.restore_session(session).await.unwrap();
504
505        (client, server)
506    }
507
508    pub(super) async fn new_room_list() -> Result<RoomListService, Error> {
509        let (client, _) = new_client().await;
510
511        RoomListService::new(client).await
512    }
513
514    struct SlidingSyncMatcher;
515
516    impl Match for SlidingSyncMatcher {
517        fn matches(&self, request: &Request) -> bool {
518            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
519                && request.method == Method::POST
520        }
521    }
522
523    #[async_test]
524    async fn test_sliding_sync_proxy_url() -> Result<(), Error> {
525        let (client, _) = new_client().await;
526
527        {
528            let room_list = RoomListService::new(client.clone()).await?;
529            assert_matches!(room_list.sliding_sync().version(), SlidingSyncVersion::Native);
530        }
531
532        {
533            let url = Url::parse("https://foo.matrix/").unwrap();
534            client.set_sliding_sync_version(SlidingSyncVersion::Proxy { url: url.clone() });
535
536            let room_list = RoomListService::new(client.clone()).await?;
537            assert_matches!(
538                room_list.sliding_sync().version(),
539                SlidingSyncVersion::Proxy { url: given_url } => {
540                    assert_eq!(&url, given_url);
541                }
542            );
543        }
544
545        Ok(())
546    }
547
548    #[async_test]
549    async fn test_all_rooms_are_declared() -> Result<(), Error> {
550        let room_list = new_room_list().await?;
551        let sliding_sync = room_list.sliding_sync();
552
553        // List is present, in Selective mode.
554        assert_eq!(
555            sliding_sync
556                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
557                    list.sync_mode(),
558                    SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
559                )))
560                .await,
561            Some(true)
562        );
563
564        Ok(())
565    }
566
567    #[async_test]
568    async fn test_expire_sliding_sync_session_manually() -> Result<(), Error> {
569        let (client, server) = new_client().await;
570
571        let room_list = RoomListService::new(client).await?;
572
573        let sync = room_list.sync();
574        pin_mut!(sync);
575
576        // Run a first sync.
577        {
578            let _mock_guard = Mock::given(SlidingSyncMatcher)
579                .respond_with(move |_request: &Request| {
580                    ResponseTemplate::new(200).set_body_json(json!({
581                        "pos": "0",
582                        "lists": {
583                            ALL_ROOMS_LIST_NAME: {
584                                "count": 0,
585                                "ops": [],
586                            },
587                        },
588                        "rooms": {},
589                    }))
590                })
591                .mount_as_scoped(&server)
592                .await;
593
594            let _ = sync.next().await;
595        }
596
597        assert_eq!(room_list.state().get(), State::SettingUp);
598
599        // Stop the sync.
600        room_list.stop_sync()?;
601
602        // Do another sync.
603        let _ = sync.next().await;
604
605        // State is `Terminated`, as expected!
606        assert_eq!(
607            room_list.state_machine.get(),
608            State::Terminated { from: Box::new(State::Running) }
609        );
610
611        // Now, let's make the sliding sync session to expire.
612        room_list.expire_sync_session().await;
613
614        // State is `Error`, as a regular session expiration would generate!
615        assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });
616
617        Ok(())
618    }
619}