1use std::collections::BTreeMap;
2
3use as_variant::as_variant;
4use imbl::Vector;
5use matrix_sdk_base::{sliding_sync::http, sync::SyncResponse, PreviousEventsProvider};
6use ruma::{
7 api::{
8 client::discovery::{discover_homeserver, get_supported_versions},
9 MatrixVersion,
10 },
11 events::AnyToDeviceEvent,
12 serde::Raw,
13 OwnedRoomId,
14};
15use tracing::error;
16use url::Url;
17
18use super::{SlidingSync, SlidingSyncBuilder};
19use crate::{config::RequestConfig, http_client::HttpClient, Client, Result, SlidingSyncRoom};
20
21#[derive(Clone, Debug)]
23pub enum Version {
24 None,
27
28 Proxy {
30 url: Url,
32 },
33
34 Native,
37}
38
39impl Version {
40 pub(crate) fn is_native(&self) -> bool {
41 matches!(self, Self::Native)
42 }
43
44 pub(crate) fn overriding_url(&self) -> Option<&Url> {
45 as_variant!(self, Self::Proxy { url } => url)
46 }
47}
48
49#[derive(thiserror::Error, Debug)]
51pub enum VersionBuilderError {
52 #[error("`.well-known` is not set")]
54 WellKnownNotSet,
55
56 #[error("`.well-known` does not contain a `sliding_sync_proxy` entry")]
58 NoSlidingSyncInWellKnown,
59
60 #[error("the `sliding_sync_proxy` URL in .well-known` is not valid ({0})")]
62 UnparsableSlidingSyncUrl(url::ParseError),
63
64 #[error("The `/versions` response is not set")]
66 MissingVersionsResponse,
67
68 #[error("`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, or it's not set to true.")]
71 NativeVersionIsUnset,
72}
73
74#[derive(Clone, Debug)]
76pub enum VersionBuilder {
77 None,
79
80 Proxy {
82 url: Url,
84 },
85
86 Native,
88
89 DiscoverProxy,
93
94 DiscoverNative,
98}
99
100impl VersionBuilder {
101 pub(crate) fn needs_get_supported_versions(&self) -> bool {
102 matches!(self, Self::DiscoverNative)
103 }
104
105 pub fn build(
110 self,
111 well_known: Option<&discover_homeserver::Response>,
112 versions: Option<&get_supported_versions::Response>,
113 ) -> Result<Version, VersionBuilderError> {
114 Ok(match self {
115 Self::None => Version::None,
116
117 Self::Proxy { url } => Version::Proxy { url },
118
119 Self::Native => Version::Native,
120
121 Self::DiscoverProxy => {
122 let Some(well_known) = well_known else {
123 return Err(VersionBuilderError::WellKnownNotSet);
124 };
125
126 let Some(info) = &well_known.sliding_sync_proxy else {
127 return Err(VersionBuilderError::NoSlidingSyncInWellKnown);
128 };
129
130 let url =
131 Url::parse(&info.url).map_err(VersionBuilderError::UnparsableSlidingSyncUrl)?;
132
133 Version::Proxy { url }
134 }
135
136 Self::DiscoverNative => {
137 let Some(versions) = versions else {
138 return Err(VersionBuilderError::MissingVersionsResponse);
139 };
140
141 match versions.unstable_features.get("org.matrix.simplified_msc3575") {
142 Some(value) if *value => Version::Native,
143 _ => return Err(VersionBuilderError::NativeVersionIsUnset),
144 }
145 }
146 })
147 }
148}
149
150impl Client {
151 pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
159 async fn discover_homeserver(
160 http_client: &HttpClient,
161 server: Option<String>,
162 ) -> Option<discover_homeserver::Response> {
163 if let Some(server) = server {
164 http_client
165 .send(
166 discover_homeserver::Request::new(),
167 Some(RequestConfig::short_retry()),
168 server,
169 None,
170 &[MatrixVersion::V1_0],
171 Default::default(),
172 )
173 .await
174 .ok()
175 } else {
176 None
177 }
178 }
179
180 let http_client = &self.inner.http_client;
181
182 let well_known = if let Some(well_known) =
190 discover_homeserver(http_client, self.server().map(ToString::to_string)).await
191 {
192 Some(well_known)
193 } else if let Some(well_known) = discover_homeserver(
194 http_client,
195 self.user_id().map(|user_id| format!("https://{}", user_id.server_name())),
196 )
197 .await
198 {
199 Some(well_known)
200 } else {
201 discover_homeserver(
202 http_client,
203 self.user_id().map(|user_id| format!("http://{}", user_id.server_name())),
204 )
205 .await
206 };
207
208 let supported_versions = self.unstable_features().await.ok().map(|unstable_features| {
209 let mut response = get_supported_versions::Response::new(vec![]);
210 response.unstable_features = unstable_features;
211
212 response
213 });
214
215 [VersionBuilder::DiscoverNative, VersionBuilder::DiscoverProxy]
216 .into_iter()
217 .filter_map(|version_builder| {
218 version_builder.build(well_known.as_ref(), supported_versions.as_ref()).ok()
219 })
220 .collect()
221 }
222
223 pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
228 Ok(SlidingSync::builder(id.into(), self.clone())?)
229 }
230
231 #[cfg(any(test, feature = "testing"))]
237 #[tracing::instrument(skip(self, response))]
238 pub async fn process_sliding_sync_test_helper(
239 &self,
240 response: &http::Response,
241 ) -> Result<SyncResponse> {
242 let response = self
243 .base_client()
244 .process_sliding_sync(response, &(), self.sliding_sync_version().is_native())
245 .await?;
246
247 tracing::debug!("done processing on base_client");
248 self.call_sync_response_handlers(&response).await?;
249
250 Ok(response)
251 }
252}
253
254struct SlidingSyncPreviousEventsProvider<'a>(&'a BTreeMap<OwnedRoomId, SlidingSyncRoom>);
255
256impl PreviousEventsProvider for SlidingSyncPreviousEventsProvider<'_> {
257 fn for_room(
258 &self,
259 room_id: &ruma::RoomId,
260 ) -> Vector<matrix_sdk_common::deserialized_responses::TimelineEvent> {
261 self.0.get(room_id).map(|room| room.timeline_queue()).unwrap_or_default()
262 }
263}
264
265#[must_use]
271pub(crate) struct SlidingSyncResponseProcessor<'a> {
272 client: Client,
273 to_device_events: Vec<Raw<AnyToDeviceEvent>>,
274 response: Option<SyncResponse>,
275 rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>,
276}
277
278impl<'a> SlidingSyncResponseProcessor<'a> {
279 pub fn new(client: Client, rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
280 Self { client, to_device_events: Vec::new(), response: None, rooms }
281 }
282
283 #[cfg(feature = "e2e-encryption")]
284 pub async fn handle_encryption(
285 &mut self,
286 extensions: &http::response::Extensions,
287 ) -> Result<()> {
288 assert!(self.response.is_none());
291
292 self.to_device_events = if let Some(to_device_events) = self
293 .client
294 .base_client()
295 .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
296 .await?
297 {
298 self.client.encryption().backups().maybe_trigger_backup();
300
301 to_device_events
302 } else {
303 Vec::new()
304 };
305
306 Ok(())
307 }
308
309 pub async fn handle_room_response(
310 &mut self,
311 response: &http::Response,
312 with_msc4186: bool,
313 ) -> Result<()> {
314 self.response = Some(
315 self.client
316 .base_client()
317 .process_sliding_sync(
318 response,
319 &SlidingSyncPreviousEventsProvider(self.rooms),
320 with_msc4186,
321 )
322 .await?,
323 );
324 self.post_process().await
325 }
326
327 async fn post_process(&mut self) -> Result<()> {
328 let response = self.response.as_ref().unwrap();
331
332 update_in_memory_caches(&self.client, response).await?;
333
334 Ok(())
335 }
336
337 pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
338 let mut response = self.response.take().unwrap_or_default();
339
340 response.to_device.extend(self.to_device_events);
341
342 self.client.call_sync_response_handlers(&response).await?;
343
344 Ok(response)
345 }
346}
347
348async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Result<()> {
352 for room_id in response.rooms.join.keys() {
353 let Some(room) = client.get_room(room_id) else {
354 error!(room_id = ?room_id, "Cannot post process a room in sliding sync because it is missing");
355 continue;
356 };
357
358 room.user_defined_notification_mode().await;
359 }
360
361 Ok(())
362}
363
364#[cfg(all(test, not(target_family = "wasm")))]
365mod tests {
366 use std::collections::BTreeMap;
367
368 use assert_matches::assert_matches;
369 use matrix_sdk_base::{notification_settings::RoomNotificationMode, SessionMeta};
370 use matrix_sdk_test::async_test;
371 use ruma::{
372 api::MatrixVersion, assign, owned_device_id, room_id, serde::Raw, OwnedUserId, ServerName,
373 };
374 use serde_json::json;
375 use url::Url;
376 use wiremock::{
377 matchers::{method, path},
378 Mock, MockServer, ResponseTemplate,
379 };
380
381 use super::{discover_homeserver, get_supported_versions, Version, VersionBuilder};
382 use crate::{
383 authentication::matrix::{MatrixSession, MatrixSessionTokens},
384 error::Result,
385 sliding_sync::{http, VersionBuilderError},
386 test_utils::logged_in_client_with_server,
387 Client, SlidingSyncList, SlidingSyncMode,
388 };
389
390 #[test]
391 fn test_version_builder_none() {
392 assert_matches!(VersionBuilder::None.build(None, None), Ok(Version::None));
393 }
394
395 #[test]
396 fn test_version_builder_proxy() {
397 let expected_url = Url::parse("https://matrix.org:1234").unwrap();
398
399 assert_matches!(
400 VersionBuilder::Proxy { url: expected_url.clone() }.build(None, None),
401 Ok(Version::Proxy { url }) => {
402 assert_eq!(url, expected_url);
403 }
404 );
405 }
406
407 #[test]
408 fn test_version_builder_native() {
409 assert_matches!(VersionBuilder::Native.build(None, None), Ok(Version::Native));
410 }
411
412 #[test]
413 fn test_version_builder_discover_proxy() {
414 let expected_url = Url::parse("https://matrix.org:1234").unwrap();
415 let mut response = discover_homeserver::Response::new(
416 discover_homeserver::HomeserverInfo::new("matrix.org".to_owned()),
417 );
418 response.sliding_sync_proxy =
419 Some(discover_homeserver::SlidingSyncProxyInfo::new(expected_url.to_string()));
420
421 assert_matches!(
422 VersionBuilder::DiscoverProxy.build(Some(&response), None),
423 Ok(Version::Proxy { url }) => {
424 assert_eq!(url, expected_url);
425 }
426 );
427 }
428
429 #[test]
430 fn test_version_builder_discover_proxy_no_well_known() {
431 assert_matches!(
432 VersionBuilder::DiscoverProxy.build(None, None),
433 Err(VersionBuilderError::WellKnownNotSet)
434 );
435 }
436
437 #[test]
438 fn test_version_builder_discover_proxy_no_sliding_sync_proxy_in_well_known() {
439 let mut response = discover_homeserver::Response::new(
440 discover_homeserver::HomeserverInfo::new("matrix-client.matrix.org".to_owned()),
441 );
442 response.sliding_sync_proxy = None; assert_matches!(
445 VersionBuilder::DiscoverProxy.build(Some(&response), None),
446 Err(VersionBuilderError::NoSlidingSyncInWellKnown)
447 );
448 }
449
450 #[test]
451 fn test_version_builder_discover_proxy_invalid_sliding_sync_proxy_in_well_known() {
452 let mut response = discover_homeserver::Response::new(
453 discover_homeserver::HomeserverInfo::new("matrix-client.matrix.org".to_owned()),
454 );
455 response.sliding_sync_proxy =
456 Some(discover_homeserver::SlidingSyncProxyInfo::new("💥".to_owned()));
457
458 assert_matches!(
459 VersionBuilder::DiscoverProxy.build(Some(&response), None),
460 Err(VersionBuilderError::UnparsableSlidingSyncUrl(err)) => {
461 assert_eq!(err.to_string(), "relative URL without a base");
462 }
463 );
464 }
465
466 #[test]
467 fn test_version_builder_discover_native() {
468 let mut response = get_supported_versions::Response::new(vec![]);
469 response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();
470
471 assert_matches!(
472 VersionBuilder::DiscoverNative.build(None, Some(&response)),
473 Ok(Version::Native)
474 );
475 }
476
477 #[test]
478 fn test_version_builder_discover_native_no_supported_versions() {
479 assert_matches!(
480 VersionBuilder::DiscoverNative.build(None, None),
481 Err(VersionBuilderError::MissingVersionsResponse)
482 );
483 }
484
485 #[test]
486 fn test_version_builder_discover_native_unstable_features_is_disabled() {
487 let mut response = get_supported_versions::Response::new(vec![]);
488 response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();
489
490 assert_matches!(
491 VersionBuilder::DiscoverNative.build(None, Some(&response)),
492 Err(VersionBuilderError::NativeVersionIsUnset)
493 );
494 }
495
496 #[async_test]
497 async fn test_available_sliding_sync_versions_none() {
498 let (client, _server) = logged_in_client_with_server().await;
499 let available_versions = client.available_sliding_sync_versions().await;
500
501 assert!(available_versions.is_empty());
504 }
505
506 #[async_test]
507 async fn test_available_sliding_sync_versions_proxy_with_server() {
508 let server = MockServer::start().await;
509 let homeserver = format!("https://{}/homeserver", server.address());
510 let proxy = format!("https://{}/sliding-sync-proxy", server.address());
511
512 Mock::given(method("GET"))
513 .and(path("/.well-known/matrix/client"))
514 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
515 "m.homeserver": {
516 "base_url": homeserver,
517 },
518 "org.matrix.msc3575.proxy": {
519 "url": proxy,
520 },
521 })))
522 .mount(&server)
523 .await;
524
525 let client = Client::builder()
527 .insecure_server_name_no_tls(
528 <&ServerName>::try_from(server.address().to_string().as_str()).unwrap(),
529 )
530 .server_versions([MatrixVersion::V1_0])
531 .build()
532 .await
533 .unwrap();
534
535 let available_versions = client.available_sliding_sync_versions().await;
536
537 assert_eq!(available_versions.len(), 1);
539 assert_matches!(
540 &available_versions[0],
541 Version::Proxy { url } => {
542 assert_eq!(url, &Url::parse(&proxy).unwrap());
543 }
544 );
545 }
546
547 #[async_test]
548 async fn test_available_sliding_sync_versions_proxy_with_user_id() {
549 let server = MockServer::start().await;
550 let homeserver = format!("https://{}/homeserver", server.address());
551 let proxy = format!("https://{}/sliding-sync-proxy", server.address());
552
553 Mock::given(method("GET"))
554 .and(path("/.well-known/matrix/client"))
555 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
556 "m.homeserver": {
557 "base_url": homeserver,
558 },
559 "org.matrix.msc3575.proxy": {
560 "url": proxy,
561 },
562 })))
563 .mount(&server)
564 .await;
565
566 let client = Client::builder()
568 .homeserver_url(homeserver)
569 .server_versions([MatrixVersion::V1_0])
570 .build()
571 .await
572 .unwrap();
573
574 client
576 .matrix_auth()
577 .restore_session(MatrixSession {
578 meta: SessionMeta {
579 user_id: OwnedUserId::try_from(format!("@alice:{}", server.address())).unwrap(),
580 device_id: owned_device_id!("DEVICEID"),
581 },
582 tokens: MatrixSessionTokens {
583 access_token: "1234".to_owned(),
584 refresh_token: None,
585 },
586 })
587 .await
588 .unwrap();
589
590 let available_versions = client.available_sliding_sync_versions().await;
591
592 assert_eq!(available_versions.len(), 1);
594 assert_matches!(
595 &available_versions[0],
596 Version::Proxy { url } => {
597 assert_eq!(url, &Url::parse(&proxy).unwrap());
598 }
599 );
600 }
601
602 #[async_test]
603 async fn test_available_sliding_sync_versions_native() {
604 let (client, server) = logged_in_client_with_server().await;
605
606 Mock::given(method("GET"))
607 .and(path("/_matrix/client/versions"))
608 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
609 "versions": [],
610 "unstable_features": {
611 "org.matrix.simplified_msc3575": true,
612 },
613 })))
614 .mount(&server)
615 .await;
616
617 let available_versions = client.available_sliding_sync_versions().await;
618
619 assert_eq!(available_versions.len(), 1);
621 assert_matches!(available_versions[0], Version::Native);
622 }
623
624 #[async_test]
625 async fn test_cache_user_defined_notification_mode() -> Result<()> {
626 let (client, _server) = logged_in_client_with_server().await;
627 let room_id = room_id!("!r0:matrix.org");
628
629 let sliding_sync = client
630 .sliding_sync("test")?
631 .with_account_data_extension(
632 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
633 )
634 .add_list(
635 SlidingSyncList::builder("all")
636 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
637 )
638 .build()
639 .await?;
640
641 {
644 let server_response = assign!(http::Response::new("0".to_owned()), {
645 rooms: BTreeMap::from([(
646 room_id.to_owned(),
647 http::response::Room::default(),
648 )]),
649 extensions: assign!(http::response::Extensions::default(), {
650 account_data: assign!(http::response::AccountData::default(), {
651 global: vec![
652 Raw::from_json_string(
653 json!({
654 "type": "m.push_rules",
655 "content": {
656 "global": {
657 "room": [
658 {
659 "actions": ["notify"],
660 "rule_id": room_id,
661 "default": false,
662 "enabled": true,
663 },
664 ],
665 },
666 },
667 })
668 .to_string(),
669 ).unwrap()
670 ]
671 })
672 })
673 });
674
675 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
676 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?;
677 }
678
679 let room = client.get_room(room_id).unwrap();
681
682 assert_eq!(
684 room.cached_user_defined_notification_mode(),
685 Some(RoomNotificationMode::AllMessages),
686 );
687
688 {
692 let server_response = assign!(http::Response::new("0".to_owned()), {
693 rooms: BTreeMap::from([(
694 room_id.to_owned(),
695 http::response::Room::default(),
696 )]),
697 extensions: assign!(http::response::Extensions::default(), {
698 account_data: assign!(http::response::AccountData::default(), {
699 global: vec![
700 Raw::from_json_string(
701 json!({
702 "type": "m.push_rules",
703 "content": {
704 "global": {
705 "room": [
706 {
707 "actions": [],
708 "rule_id": room_id,
709 "default": false,
710 "enabled": true,
711 },
712 ],
713 },
714 },
715 })
716 .to_string(),
717 ).unwrap()
718 ]
719 })
720 })
721 });
722
723 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
724 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?;
725 }
726
727 assert_eq!(
729 room.cached_user_defined_notification_mode(),
730 Some(RoomNotificationMode::MentionsAndKeywordsOnly),
731 );
732
733 Ok(())
734 }
735}