matrix_sdk/sliding_sync/
client.rs

1use std::collections::BTreeMap;
2
3use as_variant::as_variant;
4use imbl::Vector;
5use matrix_sdk_base::{sliding_sync::http, sync::SyncResponse, PreviousEventsProvider};
6use ruma::{
7    api::{
8        client::discovery::{discover_homeserver, get_supported_versions},
9        MatrixVersion,
10    },
11    events::AnyToDeviceEvent,
12    serde::Raw,
13    OwnedRoomId,
14};
15use tracing::error;
16use url::Url;
17
18use super::{SlidingSync, SlidingSyncBuilder};
19use crate::{config::RequestConfig, http_client::HttpClient, Client, Result, SlidingSyncRoom};
20
21/// A sliding sync version.
22#[derive(Clone, Debug)]
23pub enum Version {
24    /// No version. Useful to represent that sliding sync is disabled for
25    /// example, and that the version is unknown.
26    None,
27
28    /// Use the version of the sliding sync proxy, i.e. MSC3575.
29    Proxy {
30        /// URL to the proxy.
31        url: Url,
32    },
33
34    /// Use the version of the sliding sync implementation inside Synapse, i.e.
35    /// MSC4186.
36    Native,
37}
38
39impl Version {
40    pub(crate) fn is_native(&self) -> bool {
41        matches!(self, Self::Native)
42    }
43
44    pub(crate) fn overriding_url(&self) -> Option<&Url> {
45        as_variant!(self, Self::Proxy { url } => url)
46    }
47}
48
49/// An error when building a version.
50#[derive(thiserror::Error, Debug)]
51pub enum VersionBuilderError {
52    /// The `.well-known` response is not set.
53    #[error("`.well-known` is not set")]
54    WellKnownNotSet,
55
56    /// `.well-known` does not contain a `sliding_sync_proxy` entry.
57    #[error("`.well-known` does not contain a `sliding_sync_proxy` entry")]
58    NoSlidingSyncInWellKnown,
59
60    /// The `sliding_sync_proxy` URL in .well-known` is not valid ({0}).
61    #[error("the `sliding_sync_proxy` URL in .well-known` is not valid ({0})")]
62    UnparsableSlidingSyncUrl(url::ParseError),
63
64    /// The `/versions` response is not set.
65    #[error("The `/versions` response is not set")]
66    MissingVersionsResponse,
67
68    /// `/versions` does not contain `org.matrix.simplified_msc3575` in its
69    /// `unstable_features`, or it's not set to true.
70    #[error("`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, or it's not set to true.")]
71    NativeVersionIsUnset,
72}
73
74/// A builder for [`Version`].
75#[derive(Clone, Debug)]
76pub enum VersionBuilder {
77    /// Build a [`Version::None`].
78    None,
79
80    /// Build a [`Version::Proxy`].
81    Proxy {
82        /// Coerced URL to the proxy.
83        url: Url,
84    },
85
86    /// Build a [`Version::Native`].
87    Native,
88
89    /// Build a [`Version::Proxy`] by auto-discovering it.
90    ///
91    /// It is available if the server enables it via `.well-known`.
92    DiscoverProxy,
93
94    /// Build a [`Version::Native`] by auto-discovering it.
95    ///
96    /// It is available if the server enables it via `/versions`.
97    DiscoverNative,
98}
99
100impl VersionBuilder {
101    pub(crate) fn needs_get_supported_versions(&self) -> bool {
102        matches!(self, Self::DiscoverNative)
103    }
104
105    /// Build a [`Version`].
106    ///
107    /// It can fail if auto-discovering fails, e.g. if `.well-known`
108    /// or `/versions` do contain invalid data.
109    pub fn build(
110        self,
111        well_known: Option<&discover_homeserver::Response>,
112        versions: Option<&get_supported_versions::Response>,
113    ) -> Result<Version, VersionBuilderError> {
114        Ok(match self {
115            Self::None => Version::None,
116
117            Self::Proxy { url } => Version::Proxy { url },
118
119            Self::Native => Version::Native,
120
121            Self::DiscoverProxy => {
122                let Some(well_known) = well_known else {
123                    return Err(VersionBuilderError::WellKnownNotSet);
124                };
125
126                let Some(info) = &well_known.sliding_sync_proxy else {
127                    return Err(VersionBuilderError::NoSlidingSyncInWellKnown);
128                };
129
130                let url =
131                    Url::parse(&info.url).map_err(VersionBuilderError::UnparsableSlidingSyncUrl)?;
132
133                Version::Proxy { url }
134            }
135
136            Self::DiscoverNative => {
137                let Some(versions) = versions else {
138                    return Err(VersionBuilderError::MissingVersionsResponse);
139                };
140
141                match versions.unstable_features.get("org.matrix.simplified_msc3575") {
142                    Some(value) if *value => Version::Native,
143                    _ => return Err(VersionBuilderError::NativeVersionIsUnset),
144                }
145            }
146        })
147    }
148}
149
150impl Client {
151    /// Find all sliding sync versions that are available.
152    ///
153    /// Be careful: This method may hit the store and will send new requests for
154    /// each call. It can be costly to call it repeatedly.
155    ///
156    /// If `.well-known` or `/versions` is unreachable, it will simply move
157    /// potential sliding sync versions aside. No error will be reported.
158    pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
159        async fn discover_homeserver(
160            http_client: &HttpClient,
161            server: Option<String>,
162        ) -> Option<discover_homeserver::Response> {
163            if let Some(server) = server {
164                http_client
165                    .send(
166                        discover_homeserver::Request::new(),
167                        Some(RequestConfig::short_retry()),
168                        server,
169                        None,
170                        &[MatrixVersion::V1_0],
171                        Default::default(),
172                    )
173                    .await
174                    .ok()
175            } else {
176                None
177            }
178        }
179
180        let http_client = &self.inner.http_client;
181
182        // Discover the homeserver by using:
183        //
184        // * the server if any,
185        // * by using the user ID's server name (if any) with `https://`,
186        // * by using the user ID's server name (if any) with `http://`.
187        //
188        // Otherwise, `well_known` is `None`.
189        let well_known = if let Some(well_known) =
190            discover_homeserver(http_client, self.server().map(ToString::to_string)).await
191        {
192            Some(well_known)
193        } else if let Some(well_known) = discover_homeserver(
194            http_client,
195            self.user_id().map(|user_id| format!("https://{}", user_id.server_name())),
196        )
197        .await
198        {
199            Some(well_known)
200        } else {
201            discover_homeserver(
202                http_client,
203                self.user_id().map(|user_id| format!("http://{}", user_id.server_name())),
204            )
205            .await
206        };
207
208        let supported_versions = self.unstable_features().await.ok().map(|unstable_features| {
209            let mut response = get_supported_versions::Response::new(vec![]);
210            response.unstable_features = unstable_features;
211
212            response
213        });
214
215        [VersionBuilder::DiscoverNative, VersionBuilder::DiscoverProxy]
216            .into_iter()
217            .filter_map(|version_builder| {
218                version_builder.build(well_known.as_ref(), supported_versions.as_ref()).ok()
219            })
220            .collect()
221    }
222
223    /// Create a [`SlidingSyncBuilder`] tied to this client, with the given
224    /// identifier.
225    ///
226    /// Note: the identifier must not be more than 16 chars long!
227    pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
228        Ok(SlidingSync::builder(id.into(), self.clone())?)
229    }
230
231    /// Handle all the information provided in a sliding sync response, except
232    /// for the e2ee bits.
233    ///
234    /// If you need to handle encryption too, use the internal
235    /// `SlidingSyncResponseProcessor` instead.
236    #[cfg(any(test, feature = "testing"))]
237    #[tracing::instrument(skip(self, response))]
238    pub async fn process_sliding_sync_test_helper(
239        &self,
240        response: &http::Response,
241    ) -> Result<SyncResponse> {
242        let response = self
243            .base_client()
244            .process_sliding_sync(response, &(), self.sliding_sync_version().is_native())
245            .await?;
246
247        tracing::debug!("done processing on base_client");
248        self.call_sync_response_handlers(&response).await?;
249
250        Ok(response)
251    }
252}
253
254struct SlidingSyncPreviousEventsProvider<'a>(&'a BTreeMap<OwnedRoomId, SlidingSyncRoom>);
255
256impl PreviousEventsProvider for SlidingSyncPreviousEventsProvider<'_> {
257    fn for_room(
258        &self,
259        room_id: &ruma::RoomId,
260    ) -> Vector<matrix_sdk_common::deserialized_responses::TimelineEvent> {
261        self.0.get(room_id).map(|room| room.timeline_queue()).unwrap_or_default()
262    }
263}
264
265/// Small helper to handle a `SlidingSync` response's sub parts.
266///
267/// This will properly handle the encryption and the room response
268/// independently, if needs be, making sure that both are properly processed by
269/// event handlers.
270#[must_use]
271pub(crate) struct SlidingSyncResponseProcessor<'a> {
272    client: Client,
273    to_device_events: Vec<Raw<AnyToDeviceEvent>>,
274    response: Option<SyncResponse>,
275    rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>,
276}
277
278impl<'a> SlidingSyncResponseProcessor<'a> {
279    pub fn new(client: Client, rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
280        Self { client, to_device_events: Vec::new(), response: None, rooms }
281    }
282
283    #[cfg(feature = "e2e-encryption")]
284    pub async fn handle_encryption(
285        &mut self,
286        extensions: &http::response::Extensions,
287    ) -> Result<()> {
288        // This is an internal API misuse if this is triggered (calling
289        // `handle_room_response` before this function), so panic is fine.
290        assert!(self.response.is_none());
291
292        self.to_device_events = if let Some(to_device_events) = self
293            .client
294            .base_client()
295            .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
296            .await?
297        {
298            // Some new keys might have been received, so trigger a backup if needed.
299            self.client.encryption().backups().maybe_trigger_backup();
300
301            to_device_events
302        } else {
303            Vec::new()
304        };
305
306        Ok(())
307    }
308
309    pub async fn handle_room_response(
310        &mut self,
311        response: &http::Response,
312        with_msc4186: bool,
313    ) -> Result<()> {
314        self.response = Some(
315            self.client
316                .base_client()
317                .process_sliding_sync(
318                    response,
319                    &SlidingSyncPreviousEventsProvider(self.rooms),
320                    with_msc4186,
321                )
322                .await?,
323        );
324        self.post_process().await
325    }
326
327    async fn post_process(&mut self) -> Result<()> {
328        // This is an internal API misuse if this is triggered (calling
329        // `handle_room_response` after this function), so panic is fine.
330        let response = self.response.as_ref().unwrap();
331
332        update_in_memory_caches(&self.client, response).await?;
333
334        Ok(())
335    }
336
337    pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
338        let mut response = self.response.take().unwrap_or_default();
339
340        response.to_device.extend(self.to_device_events);
341
342        self.client.call_sync_response_handlers(&response).await?;
343
344        Ok(response)
345    }
346}
347
348/// Update the caches for the rooms that received updates.
349///
350/// This will only fill the in-memory caches, not save the info on disk.
351async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Result<()> {
352    for room_id in response.rooms.join.keys() {
353        let Some(room) = client.get_room(room_id) else {
354            error!(room_id = ?room_id, "Cannot post process a room in sliding sync because it is missing");
355            continue;
356        };
357
358        room.user_defined_notification_mode().await;
359    }
360
361    Ok(())
362}
363
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeMap;

    use assert_matches::assert_matches;
    use matrix_sdk_base::{notification_settings::RoomNotificationMode, SessionMeta};
    use matrix_sdk_test::async_test;
    use ruma::{
        api::MatrixVersion, assign, owned_device_id, room_id, serde::Raw, OwnedUserId, ServerName,
    };
    use serde_json::json;
    use url::Url;
    use wiremock::{
        matchers::{method, path},
        Mock, MockServer, ResponseTemplate,
    };

    use super::{discover_homeserver, get_supported_versions, Version, VersionBuilder};
    use crate::{
        authentication::matrix::{MatrixSession, MatrixSessionTokens},
        error::Result,
        sliding_sync::{http, VersionBuilderError},
        test_utils::logged_in_client_with_server,
        Client, SlidingSyncList, SlidingSyncMode,
    };

    /// Build a `.well-known` response for `homeserver`, with an optional
    /// `sliding_sync_proxy` entry pointing to `proxy_url`.
    fn well_known_response(
        homeserver: &str,
        proxy_url: Option<&str>,
    ) -> discover_homeserver::Response {
        let mut response = discover_homeserver::Response::new(
            discover_homeserver::HomeserverInfo::new(homeserver.to_owned()),
        );
        response.sliding_sync_proxy =
            proxy_url.map(|url| discover_homeserver::SlidingSyncProxyInfo::new(url.to_owned()));

        response
    }

    /// Build a `/versions` response whose `unstable_features` only contains
    /// `org.matrix.simplified_msc3575`, set to `enabled`.
    fn versions_response(enabled: bool) -> get_supported_versions::Response {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features =
            [("org.matrix.simplified_msc3575".to_owned(), enabled)].into();

        response
    }

    /// Make `server` answer `.well-known` requests with the given homeserver
    /// and sliding sync proxy URLs.
    async fn mount_well_known(server: &MockServer, homeserver: &str, proxy: &str) {
        Mock::given(method("GET"))
            .and(path("/.well-known/matrix/client"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "m.homeserver": {
                    "base_url": homeserver,
                },
                "org.matrix.msc3575.proxy": {
                    "url": proxy,
                },
            })))
            .mount(server)
            .await;
    }

    /// Assert that `versions` contains exactly one version, and that it is a
    /// [`Version::Proxy`] pointing to `proxy`.
    fn assert_only_proxy(versions: &[Version], proxy: &str) {
        assert_eq!(versions.len(), 1);
        assert_matches!(
            &versions[0],
            Version::Proxy { url } => {
                assert_eq!(url, &Url::parse(proxy).unwrap());
            }
        );
    }

    /// Build a sliding sync response containing `room_id` and a global
    /// `m.push_rules` account data event with a single `room` rule for
    /// `room_id`, using the given `actions`.
    fn push_rules_response(
        room_id: &ruma::RoomId,
        actions: serde_json::Value,
    ) -> http::Response {
        let push_rules = json!({
            "type": "m.push_rules",
            "content": {
                "global": {
                    "room": [
                        {
                            "actions": actions,
                            "rule_id": room_id,
                            "default": false,
                            "enabled": true,
                        },
                    ],
                },
            },
        });

        assign!(http::Response::new("0".to_owned()), {
            rooms: BTreeMap::from([(
                room_id.to_owned(),
                http::response::Room::default(),
            )]),
            extensions: assign!(http::response::Extensions::default(), {
                account_data: assign!(http::response::AccountData::default(), {
                    global: vec![
                        Raw::from_json_string(push_rules.to_string()).unwrap()
                    ]
                })
            })
        })
    }

    #[test]
    fn test_version_builder_none() {
        let version = VersionBuilder::None.build(None, None);

        assert_matches!(version, Ok(Version::None));
    }

    #[test]
    fn test_version_builder_proxy() {
        let expected_url = Url::parse("https://matrix.org:1234").unwrap();
        let builder = VersionBuilder::Proxy { url: expected_url.clone() };

        assert_matches!(
            builder.build(None, None),
            Ok(Version::Proxy { url }) => {
                assert_eq!(url, expected_url);
            }
        );
    }

    #[test]
    fn test_version_builder_native() {
        let version = VersionBuilder::Native.build(None, None);

        assert_matches!(version, Ok(Version::Native));
    }

    #[test]
    fn test_version_builder_discover_proxy() {
        let expected_url = Url::parse("https://matrix.org:1234").unwrap();
        let response = well_known_response("matrix.org", Some(expected_url.as_str()));

        assert_matches!(
            VersionBuilder::DiscoverProxy.build(Some(&response), None),
            Ok(Version::Proxy { url }) => {
                assert_eq!(url, expected_url);
            }
        );
    }

    #[test]
    fn test_version_builder_discover_proxy_no_well_known() {
        let version = VersionBuilder::DiscoverProxy.build(None, None);

        assert_matches!(version, Err(VersionBuilderError::WellKnownNotSet));
    }

    #[test]
    fn test_version_builder_discover_proxy_no_sliding_sync_proxy_in_well_known() {
        // No `sliding_sync_proxy` entry at all.
        let response = well_known_response("matrix-client.matrix.org", None);

        assert_matches!(
            VersionBuilder::DiscoverProxy.build(Some(&response), None),
            Err(VersionBuilderError::NoSlidingSyncInWellKnown)
        );
    }

    #[test]
    fn test_version_builder_discover_proxy_invalid_sliding_sync_proxy_in_well_known() {
        // The `sliding_sync_proxy` entry is present but is not a URL.
        let response = well_known_response("matrix-client.matrix.org", Some("💥"));

        assert_matches!(
            VersionBuilder::DiscoverProxy.build(Some(&response), None),
            Err(VersionBuilderError::UnparsableSlidingSyncUrl(err)) => {
                assert_eq!(err.to_string(), "relative URL without a base");
            }
        );
    }

    #[test]
    fn test_version_builder_discover_native() {
        let response = versions_response(true);

        assert_matches!(
            VersionBuilder::DiscoverNative.build(None, Some(&response)),
            Ok(Version::Native)
        );
    }

    #[test]
    fn test_version_builder_discover_native_no_supported_versions() {
        let version = VersionBuilder::DiscoverNative.build(None, None);

        assert_matches!(version, Err(VersionBuilderError::MissingVersionsResponse));
    }

    #[test]
    fn test_version_builder_discover_native_unstable_features_is_disabled() {
        let response = versions_response(false);

        assert_matches!(
            VersionBuilder::DiscoverNative.build(None, Some(&response)),
            Err(VersionBuilderError::NativeVersionIsUnset)
        );
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_none() {
        let (client, _server) = logged_in_client_with_server().await;
        let available_versions = client.available_sliding_sync_versions().await;

        // `.well-known` and `/versions` aren't available. It's impossible to find any
        // versions.
        assert!(available_versions.is_empty());
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_proxy_with_server() {
        let server = MockServer::start().await;
        let homeserver = format!("https://{}/homeserver", server.address());
        let proxy = format!("https://{}/sliding-sync-proxy", server.address());

        mount_well_known(&server, &homeserver, &proxy).await;

        // The client knows the server.
        let server_name = server.address().to_string();
        let client = Client::builder()
            .insecure_server_name_no_tls(<&ServerName>::try_from(server_name.as_str()).unwrap())
            .server_versions([MatrixVersion::V1_0])
            .build()
            .await
            .unwrap();

        let available_versions = client.available_sliding_sync_versions().await;

        // `.well-known` is available.
        assert_only_proxy(&available_versions, &proxy);
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_proxy_with_user_id() {
        let server = MockServer::start().await;
        let homeserver = format!("https://{}/homeserver", server.address());
        let proxy = format!("https://{}/sliding-sync-proxy", server.address());

        mount_well_known(&server, &homeserver, &proxy).await;

        // The client doesn't know the server.
        let client = Client::builder()
            .homeserver_url(homeserver)
            .server_versions([MatrixVersion::V1_0])
            .build()
            .await
            .unwrap();

        // The client knows a user.
        let session = MatrixSession {
            meta: SessionMeta {
                user_id: OwnedUserId::try_from(format!("@alice:{}", server.address())).unwrap(),
                device_id: owned_device_id!("DEVICEID"),
            },
            tokens: MatrixSessionTokens { access_token: "1234".to_owned(), refresh_token: None },
        };
        client.matrix_auth().restore_session(session).await.unwrap();

        let available_versions = client.available_sliding_sync_versions().await;

        // `.well-known` is available.
        assert_only_proxy(&available_versions, &proxy);
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_native() {
        let (client, server) = logged_in_client_with_server().await;

        Mock::given(method("GET"))
            .and(path("/_matrix/client/versions"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "versions": [],
                "unstable_features": {
                    "org.matrix.simplified_msc3575": true,
                },
            })))
            .mount(&server)
            .await;

        let available_versions = client.available_sliding_sync_versions().await;

        // `/versions` is available.
        assert_eq!(available_versions.len(), 1);
        assert_matches!(available_versions[0], Version::Native);
    }

    #[async_test]
    async fn test_cache_user_defined_notification_mode() -> Result<()> {
        let (client, _server) = logged_in_client_with_server().await;
        let room_id = room_id!("!r0:matrix.org");

        let list = SlidingSyncList::builder("all")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10));
        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(list)
            .build()
            .await?;

        // Mock a sync response.
        // A `m.push_rules` with `room` is cached during the sync.
        {
            let server_response = push_rules_response(room_id, json!(["notify"]));

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync.handle_response(server_response, &mut pos_guard).await?;
        }

        // The room must exist, since it's been synced.
        let room = client.get_room(room_id).unwrap();

        // The room has a cached user-defined notification mode.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        // Mock a sync response.
        // A `m.push_rules` with `room` is cached during the sync.
        // It overwrites the previous cache.
        {
            let server_response = push_rules_response(room_id, json!([]));

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync.handle_response(server_response, &mut pos_guard).await?;
        }

        // The room has an updated cached user-defined notification mode.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
        );

        Ok(())
    }
}