1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod sticky_parameters;
24mod utils;
25
26use std::{
27 collections::{btree_map::Entry, BTreeMap},
28 fmt::Debug,
29 future::Future,
30 sync::{Arc, RwLock as StdRwLock},
31 time::Duration,
32};
33
34use async_stream::stream;
35pub use client::{Version, VersionBuilder};
36use futures_core::stream::Stream;
37use matrix_sdk_base::RequestedRequiredStates;
38use matrix_sdk_common::{executor::spawn, timer};
39use ruma::{
40 api::client::{error::ErrorKind, sync::sync_events::v5 as http},
41 assign, OwnedRoomId, RoomId,
42};
43use serde::{Deserialize, Serialize};
44use tokio::{
45 select,
46 sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
47};
48use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
49
50#[cfg(feature = "e2e-encryption")]
51use self::utils::JoinHandleExt as _;
52pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
53use self::{
54 cache::restore_sliding_sync_state,
55 client::SlidingSyncResponseProcessor,
56 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
57};
58use crate::{config::RequestConfig, Client, Result};
59
60#[derive(Clone, Debug)]
64pub struct SlidingSync {
65 inner: Arc<SlidingSyncInner>,
67}
68
69#[derive(Debug)]
70pub(super) struct SlidingSyncInner {
71 id: String,
75
76 client: Client,
78
79 poll_timeout: Duration,
81
82 network_timeout: Duration,
85
86 storage_key: String,
88
89 share_pos: bool,
96
97 position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
110
111 lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
113
114 sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,
116
117 internal_channel: Sender<SlidingSyncInternalMessage>,
120}
121
122impl SlidingSync {
123 pub(super) fn new(inner: SlidingSyncInner) -> Self {
124 Self { inner: Arc::new(inner) }
125 }
126
127 async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
128 cache::store_sliding_sync_state(self, position).await
129 }
130
131 pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
133 SlidingSyncBuilder::new(id, client)
134 }
135
136 pub fn subscribe_to_rooms(
143 &self,
144 room_ids: &[&RoomId],
145 settings: Option<http::request::RoomSubscription>,
146 cancel_in_flight_request: bool,
147 ) {
148 let settings = settings.unwrap_or_default();
149 let mut sticky = self.inner.sticky.write().unwrap();
150 let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
151
152 let mut skip_over_current_sync_loop_iteration = false;
153
154 for room_id in room_ids {
155 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
162 if let Some(room) = self.inner.client.get_room(room_id) {
163 room.mark_members_missing();
164 }
165
166 entry.insert((RoomSubscriptionState::default(), settings.clone()));
167
168 skip_over_current_sync_loop_iteration = true;
169 }
170 }
171
172 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
173 self.inner.internal_channel_send_if_possible(
174 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
175 );
176 }
177 }
178
179 pub async fn on_list<Function, FunctionOutput, R>(
181 &self,
182 list_name: &str,
183 function: Function,
184 ) -> Option<R>
185 where
186 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
187 FunctionOutput: Future<Output = R>,
188 {
189 let lists = self.inner.lists.read().await;
190
191 match lists.get(list_name) {
192 Some(list) => Some(function(list).await),
193 None => None,
194 }
195 }
196
197 pub async fn add_list(
203 &self,
204 list_builder: SlidingSyncListBuilder,
205 ) -> Result<Option<SlidingSyncList>> {
206 let list = list_builder.build(self.inner.internal_channel.clone());
207
208 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
209
210 self.inner.internal_channel_send_if_possible(
211 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
212 );
213
214 Ok(old_list)
215 }
216
217 pub async fn add_cached_list(
224 &self,
225 mut list_builder: SlidingSyncListBuilder,
226 ) -> Result<Option<SlidingSyncList>> {
227 let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
228
229 list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
230
231 self.add_list(list_builder).await
232 }
233
234 #[instrument(skip_all)]
236 async fn handle_response(
237 &self,
238 sliding_sync_response: http::Response,
239 position: &mut SlidingSyncPositionMarkers,
240 requested_required_states: RequestedRequiredStates,
241 ) -> Result<UpdateSummary, crate::Error> {
242 let pos = Some(sliding_sync_response.pos.clone());
243
244 let must_process_rooms_response = self.must_process_rooms_response().await;
245
246 trace!(yes = must_process_rooms_response, "Must process rooms response?");
247
248 let sync_response = {
256 let response_processor = {
257 let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
260
261 let mut response_processor =
262 SlidingSyncResponseProcessor::new(self.inner.client.clone());
263
264 #[cfg(feature = "e2e-encryption")]
265 if self.is_e2ee_enabled() {
266 response_processor.handle_encryption(&sliding_sync_response.extensions).await?
267 }
268
269 if must_process_rooms_response {
272 response_processor
273 .handle_room_response(&sliding_sync_response, &requested_required_states)
274 .await?;
275 }
276
277 response_processor
278 };
279
280 response_processor.process_and_take_response().await?
282 };
283
284 debug!(?sync_response, "Sliding Sync response has been handled by the client");
285
286 if let Some(ref txn_id) = sliding_sync_response.txn_id {
288 let txn_id = txn_id.as_str().into();
289 self.inner.sticky.write().unwrap().maybe_commit(txn_id);
290 let mut lists = self.inner.lists.write().await;
291 lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
292 }
293
294 let update_summary = {
295 let updated_rooms = {
297 let mut updated_rooms = Vec::with_capacity(
298 sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
299 );
300
301 updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());
302
303 updated_rooms.extend(sync_response.rooms.joined.keys().cloned());
311
312 updated_rooms
313 };
314
315 let updated_lists = {
317 debug!(
318 lists = ?sliding_sync_response.lists,
319 "Update lists"
320 );
321
322 let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
323 let mut lists = self.inner.lists.write().await;
324
325 for (name, list) in lists.iter_mut() {
328 if let Some(updates) = sliding_sync_response.lists.get(name) {
329 let maximum_number_of_rooms: u32 =
330 updates.count.try_into().expect("failed to convert `count` to `u32`");
331
332 if list.update(Some(maximum_number_of_rooms))? {
333 updated_lists.push(name.clone());
334 }
335 } else if list.update(None)? {
336 updated_lists.push(name.clone());
337 }
338 }
339
340 for name in sliding_sync_response.lists.keys() {
342 if !lists.contains_key(name) {
343 error!("Response for list `{name}` - unknown to us; skipping");
344 }
345 }
346
347 updated_lists
348 };
349
350 UpdateSummary { lists: updated_lists, rooms: updated_rooms }
351 };
352
353 position.pos = pos;
357
358 Ok(update_summary)
359 }
360
361 async fn generate_sync_request(
362 &self,
363 txn_id: &mut LazyTransactionId,
364 ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
365 let mut requests_lists = BTreeMap::new();
367
368 let require_timeout = {
369 let lists = self.inner.lists.read().await;
370
371 let mut require_timeout = true;
373
374 for (name, list) in lists.iter() {
375 requests_lists.insert(name.clone(), list.next_request(txn_id)?);
376 require_timeout = require_timeout && list.requires_timeout();
377 }
378
379 require_timeout
380 };
381
382 let mut position_guard = self.inner.position.clone().lock_owned().await;
390
391 let to_device_enabled =
392 self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
393
394 let restored_fields = if self.inner.share_pos || to_device_enabled {
395 restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
396 } else {
397 None
398 };
399
400 let pos = if self.inner.share_pos {
403 if let Some(fields) = &restored_fields {
404 if fields.pos != position_guard.pos {
406 info!(
407 "Pos from previous request ('{:?}') was different from \
408 pos in database ('{:?}').",
409 position_guard.pos, fields.pos
410 );
411 position_guard.pos = fields.pos.clone();
412 }
413 fields.pos.clone()
414 } else {
415 position_guard.pos.clone()
416 }
417 } else {
418 position_guard.pos.clone()
419 };
420
421 Span::current().record("pos", &pos);
422
423 #[cfg(feature = "e2e-encryption")]
432 if pos.is_none() && self.is_e2ee_enabled() {
433 info!("Marking all tracked users as dirty");
434
435 let olm_machine = self.inner.client.olm_machine().await;
436 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
437 olm_machine.mark_all_tracked_users_as_dirty().await?;
438 }
439
440 let timeout = require_timeout.then(|| self.inner.poll_timeout);
445
446 let mut request = assign!(http::Request::new(), {
447 conn_id: Some(self.inner.id.clone()),
448 pos,
449 timeout,
450 lists: requests_lists,
451 });
452
453 self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
455
456 if to_device_enabled {
460 request.extensions.to_device.since =
461 restored_fields.and_then(|fields| fields.to_device_token);
462 }
463
464 if let Some(txn_id) = txn_id.get() {
466 request.txn_id = Some(txn_id.to_string());
467 }
468
469 Ok((
470 request,
472 RequestConfig::default()
475 .timeout(self.inner.poll_timeout + self.inner.network_timeout)
476 .retry_limit(3),
477 position_guard,
478 ))
479 }
480
481 async fn send_sync_request(
485 &self,
486 request: http::Request,
487 request_config: RequestConfig,
488 mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
489 ) -> Result<UpdateSummary> {
490 debug!("Sending request");
491
492 let requested_required_states = RequestedRequiredStates::from(&request);
494 let request = self.inner.client.send(request).with_request_config(request_config);
495
496 #[cfg(feature = "e2e-encryption")]
503 let response = {
504 if self.is_e2ee_enabled() {
505 let client = self.inner.client.clone();
522 let e2ee_uploads = spawn(
523 async move {
524 if let Err(error) = client.send_outgoing_requests().await {
525 error!(?error, "Error while sending outgoing E2EE requests");
526 }
527 }
528 .instrument(Span::current()),
529 )
530 .abort_on_drop();
533
534 let response = request.await?;
536
537 e2ee_uploads.await.map_err(|error| Error::JoinError {
542 task_description: "e2ee_uploads".to_owned(),
543 error,
544 })?;
545
546 response
547 } else {
548 request.await?
549 }
550 };
551
552 #[cfg(not(feature = "e2e-encryption"))]
554 let response = request.await?;
555
556 debug!("Received response");
557
558 let this = self.clone();
568
569 let future = async move {
572 debug!("Start handling response");
573
574 let updates = this
580 .handle_response(response, &mut position_guard, requested_required_states)
581 .await?;
582
583 this.cache_to_storage(&position_guard).await?;
584
585 drop(position_guard);
588
589 debug!("Done handling response");
590
591 Ok(updates)
592 };
593
594 spawn(future.instrument(Span::current())).await.unwrap()
595 }
596
597 #[cfg(feature = "e2e-encryption")]
599 fn is_e2ee_enabled(&self) -> bool {
600 self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
601 }
602
603 #[cfg(not(feature = "e2e-encryption"))]
604 fn is_e2ee_enabled(&self) -> bool {
605 false
606 }
607
608 async fn must_process_rooms_response(&self) -> bool {
610 !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
613 || !self.inner.lists.read().await.is_empty()
614 }
615
616 #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
617 async fn sync_once(&self) -> Result<UpdateSummary> {
618 let (request, request_config, position_guard) =
619 self.generate_sync_request(&mut LazyTransactionId::new()).await?;
620
621 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
623
624 self.inner.client.inner.sync_beat.notify(usize::MAX);
626
627 Ok(summaries)
628 }
629
630 #[allow(unknown_lints, clippy::let_with_type_underscore)] #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
640 pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
641 debug!("Starting sync stream");
642
643 let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
644
645 stream! {
646 loop {
647 debug!("Sync stream is running");
648
649 select! {
650 biased;
651
652 internal_message = internal_channel_receiver.recv() => {
653 use SlidingSyncInternalMessage::*;
654
655 debug!(?internal_message, "Sync stream has received an internal message");
656
657 match internal_message {
658 Err(_) | Ok(SyncLoopStop) => {
659 break;
660 }
661
662 Ok(SyncLoopSkipOverCurrentIteration) => {
663 continue;
664 }
665 }
666 }
667
668 update_summary = self.sync_once() => {
669 match update_summary {
670 Ok(updates) => {
671 yield Ok(updates);
672 }
673
674 Err(error) => {
676 if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
677 self.expire_session().await;
679 }
680
681 yield Err(error);
682
683 break;
685 }
686 }
687 }
688 }
689 }
690
691 debug!("Sync stream has exited.");
692 }
693 }
694
695 pub fn stop_sync(&self) -> Result<()> {
704 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
705 }
706
707 #[doc(hidden)]
719 pub async fn expire_session(&self) {
720 info!("Session expired; resetting `pos` and sticky parameters");
721
722 {
723 let mut position = self.inner.position.lock().await;
724 position.pos = None;
725
726 if let Err(err) = self.cache_to_storage(&position).await {
727 error!(
728 "couldn't invalidate sliding sync frozen state when expiring session: {err}"
729 );
730 }
731 }
732
733 {
734 let mut sticky = self.inner.sticky.write().unwrap();
735
736 sticky.data_mut().room_subscriptions.clear();
739 }
740
741 self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());
742 }
743}
744
745impl SlidingSyncInner {
746 #[instrument]
748 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
749 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
750 }
751
752 #[instrument]
755 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
756 let _ = self.internal_channel.send(message);
758 }
759}
760
761#[derive(Copy, Clone, Debug, PartialEq)]
762enum SlidingSyncInternalMessage {
763 SyncLoopStop,
765
766 SyncLoopSkipOverCurrentIteration,
769}
770
771#[cfg(any(test, feature = "testing"))]
772impl SlidingSync {
773 pub async fn set_pos(&self, new_pos: String) {
775 let mut position_lock = self.inner.position.lock().await;
776 position_lock.pos = Some(new_pos);
777 }
778
779 pub fn extensions_config(&self) -> http::request::Extensions {
785 let sticky = self.inner.sticky.read().unwrap();
786 sticky.data().extensions.clone()
787 }
788}
789
790#[derive(Clone, Debug)]
791pub(super) struct SlidingSyncPositionMarkers {
792 pos: Option<String>,
797}
798
799#[derive(Serialize, Deserialize)]
800struct FrozenSlidingSyncPos {
801 #[serde(skip_serializing_if = "Option::is_none")]
802 pos: Option<String>,
803}
804
805#[derive(Debug, Clone)]
808pub struct UpdateSummary {
809 pub lists: Vec<String>,
811 pub rooms: Vec<OwnedRoomId>,
813}
814
815#[derive(Debug, Default)]
819enum RoomSubscriptionState {
820 #[default]
824 Pending,
825
826 Applied,
829}
830
831#[derive(Debug)]
834pub(super) struct SlidingSyncStickyParameters {
835 room_subscriptions:
838 BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
839
840 extensions: http::request::Extensions,
843}
844
845impl SlidingSyncStickyParameters {
846 pub fn new(
848 room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
849 extensions: http::request::Extensions,
850 ) -> Self {
851 Self {
852 room_subscriptions: room_subscriptions
853 .into_iter()
854 .map(|(room_id, room_subscription)| {
855 (room_id, (RoomSubscriptionState::Pending, room_subscription))
856 })
857 .collect(),
858 extensions,
859 }
860 }
861}
862
863impl StickyData for SlidingSyncStickyParameters {
864 type Request = http::Request;
865
866 fn apply(&self, request: &mut Self::Request) {
867 request.room_subscriptions = self
868 .room_subscriptions
869 .iter()
870 .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
871 .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
872 .collect();
873 request.extensions = self.extensions.clone();
874 }
875
876 fn on_commit(&mut self) {
877 for (state, _room_subscription) in self.room_subscriptions.values_mut() {
879 if matches!(state, RoomSubscriptionState::Pending) {
880 *state = RoomSubscriptionState::Applied;
881 }
882 }
883 }
884}
885
886#[cfg(all(test, not(target_family = "wasm")))]
887#[allow(clippy::dbg_macro)]
888mod tests {
889 use std::{
890 collections::BTreeMap,
891 future::ready,
892 ops::Not,
893 sync::{Arc, Mutex},
894 time::Duration,
895 };
896
897 use assert_matches::assert_matches;
898 use event_listener::Listener;
899 use futures_util::{future::join_all, pin_mut, StreamExt};
900 use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
901 use matrix_sdk_common::executor::spawn;
902 use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE};
903 use ruma::{
904 api::client::error::ErrorKind,
905 assign,
906 events::{direct::DirectEvent, room::member::MembershipState},
907 owned_room_id, room_id,
908 serde::Raw,
909 uint, OwnedRoomId, TransactionId,
910 };
911 use serde::Deserialize;
912 use serde_json::json;
913 use wiremock::{
914 http::Method, matchers::method, Match, Mock, MockServer, Request, ResponseTemplate,
915 };
916
917 use super::{
918 http,
919 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
920 SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
921 SlidingSyncStickyParameters,
922 };
923 use crate::{
924 sliding_sync::cache::restore_sliding_sync_state,
925 test_utils::{logged_in_client, mocks::MatrixMockServer},
926 Client, Result,
927 };
928
929 #[derive(Copy, Clone)]
930 struct SlidingSyncMatcher;
931
932 impl Match for SlidingSyncMatcher {
933 fn matches(&self, request: &Request) -> bool {
934 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
935 && request.method == Method::POST
936 }
937 }
938
939 async fn new_sliding_sync(
940 lists: Vec<SlidingSyncListBuilder>,
941 ) -> Result<(MockServer, SlidingSync)> {
942 let server = MockServer::start().await;
943 let client = logged_in_client(Some(server.uri())).await;
944
945 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
946
947 for list in lists {
948 sliding_sync_builder = sliding_sync_builder.add_list(list);
949 }
950
951 let sliding_sync = sliding_sync_builder.build().await?;
952
953 Ok((server, sliding_sync))
954 }
955
956 #[async_test]
957 async fn test_subscribe_to_rooms() -> Result<()> {
958 let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
959 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
960 .await?;
961
962 let stream = sliding_sync.sync();
963 pin_mut!(stream);
964
965 let room_id_0 = room_id!("!r0:bar.org");
966 let room_id_1 = room_id!("!r1:bar.org");
967 let room_id_2 = room_id!("!r2:bar.org");
968
969 {
970 let _mock_guard = Mock::given(SlidingSyncMatcher)
971 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
972 "pos": "1",
973 "lists": {},
974 "rooms": {
975 room_id_0: {
976 "name": "Room #0",
977 "initial": true,
978 },
979 room_id_1: {
980 "name": "Room #1",
981 "initial": true,
982 },
983 room_id_2: {
984 "name": "Room #2",
985 "initial": true,
986 },
987 }
988 })))
989 .mount_as_scoped(&server)
990 .await;
991
992 let _ = stream.next().await.unwrap()?;
993 }
994
995 let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
996
997 assert!(room0.are_members_synced().not());
1001
1002 {
1003 struct MemberMatcher(OwnedRoomId);
1004
1005 impl Match for MemberMatcher {
1006 fn matches(&self, request: &Request) -> bool {
1007 request.url.path()
1008 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1009 && request.method == Method::GET
1010 }
1011 }
1012
1013 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1014 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1015 "chunk": [],
1016 })))
1017 .mount_as_scoped(&server)
1018 .await;
1019
1020 assert_matches!(room0.request_members().await, Ok(()));
1021 }
1022
1023 assert!(room0.are_members_synced());
1025
1026 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1027
1028 assert!(room0.are_members_synced().not());
1031
1032 {
1033 let sticky = sliding_sync.inner.sticky.read().unwrap();
1034 let room_subscriptions = &sticky.data().room_subscriptions;
1035
1036 assert!(room_subscriptions.contains_key(room_id_0));
1037 assert!(room_subscriptions.contains_key(room_id_1));
1038 assert!(!room_subscriptions.contains_key(room_id_2));
1039 }
1040
1041 {
1044 struct MemberMatcher(OwnedRoomId);
1045
1046 impl Match for MemberMatcher {
1047 fn matches(&self, request: &Request) -> bool {
1048 request.url.path()
1049 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1050 && request.method == Method::GET
1051 }
1052 }
1053
1054 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1055 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1056 "chunk": [],
1057 })))
1058 .mount_as_scoped(&server)
1059 .await;
1060
1061 assert_matches!(room0.request_members().await, Ok(()));
1062 }
1063
1064 assert!(room0.are_members_synced());
1066
1067 sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1068
1069 assert!(room0.are_members_synced());
1072
1073 Ok(())
1074 }
1075
1076 #[async_test]
1077 async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1078 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1079 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1080 .await?;
1081
1082 let room_id_0 = room_id!("!r0:bar.org");
1083 let room_id_1 = room_id!("!r1:bar.org");
1084 let room_id_2 = room_id!("!r2:bar.org");
1085
1086 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1088
1089 {
1090 let sticky = sliding_sync.inner.sticky.read().unwrap();
1091 let room_subscriptions = &sticky.data().room_subscriptions;
1092
1093 assert!(room_subscriptions.contains_key(room_id_0));
1094 assert!(room_subscriptions.contains_key(room_id_1));
1095 assert!(room_subscriptions.contains_key(room_id_2).not());
1096 }
1097
1098 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1100
1101 {
1102 let sticky = sliding_sync.inner.sticky.read().unwrap();
1103 let room_subscriptions = &sticky.data().room_subscriptions;
1104
1105 assert!(room_subscriptions.contains_key(room_id_0));
1106 assert!(room_subscriptions.contains_key(room_id_1));
1107 assert!(room_subscriptions.contains_key(room_id_2));
1108 }
1109
1110 sliding_sync.expire_session().await;
1112
1113 {
1114 let sticky = sliding_sync.inner.sticky.read().unwrap();
1115 let room_subscriptions = &sticky.data().room_subscriptions;
1116
1117 assert!(room_subscriptions.is_empty());
1118 }
1119
1120 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1122
1123 {
1124 let sticky = sliding_sync.inner.sticky.read().unwrap();
1125 let room_subscriptions = &sticky.data().room_subscriptions;
1126
1127 assert!(room_subscriptions.contains_key(room_id_0).not());
1128 assert!(room_subscriptions.contains_key(room_id_1).not());
1129 assert!(room_subscriptions.contains_key(room_id_2));
1130 }
1131
1132 Ok(())
1133 }
1134
1135 #[async_test]
1136 async fn test_add_list() -> Result<()> {
1137 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1138 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1139 .await?;
1140
1141 let _stream = sliding_sync.sync();
1142 pin_mut!(_stream);
1143
1144 sliding_sync
1145 .add_list(
1146 SlidingSyncList::builder("bar")
1147 .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1148 )
1149 .await?;
1150
1151 let lists = sliding_sync.inner.lists.read().await;
1152
1153 assert!(lists.contains_key("foo"));
1154 assert!(lists.contains_key("bar"));
1155
1156 Ok(())
1159 }
1160
1161 #[test]
1162 fn test_sticky_parameters_api_invalidated_flow() {
1163 let r0 = room_id!("!r0.matrix.org");
1164 let r1 = room_id!("!r1:matrix.org");
1165
1166 let mut room_subscriptions = BTreeMap::new();
1167 room_subscriptions.insert(r0.to_owned(), Default::default());
1168
1169 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1171 room_subscriptions,
1172 Default::default(),
1173 ));
1174 assert!(sticky.is_invalidated());
1175
1176 let txn_id: &TransactionId = "tid123".into();
1178
1179 let mut request = http::Request::default();
1180 request.txn_id = Some(txn_id.to_string());
1181
1182 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1183
1184 assert!(request.txn_id.is_some());
1185 assert_eq!(request.room_subscriptions.len(), 1);
1186 assert!(request.room_subscriptions.contains_key(r0));
1187
1188 let tid = request.txn_id.unwrap();
1189
1190 sticky.maybe_commit(tid.as_str().into());
1191 assert!(!sticky.is_invalidated());
1192
1193 sticky
1195 .data_mut()
1196 .room_subscriptions
1197 .insert(r1.to_owned(), (Default::default(), Default::default()));
1198 assert!(sticky.is_invalidated());
1199
1200 sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1202 assert!(sticky.is_invalidated());
1203
1204 let txn_id1: &TransactionId = "tid456".into();
1206 let mut request1 = http::Request::default();
1207 request1.txn_id = Some(txn_id1.to_string());
1208 sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1209
1210 assert!(sticky.is_invalidated());
1211 assert_eq!(request1.room_subscriptions.len(), 1);
1215 assert!(request1.room_subscriptions.contains_key(r1));
1216
1217 let txn_id2: &TransactionId = "tid789".into();
1218 let mut request2 = http::Request::default();
1219 request2.txn_id = Some(txn_id2.to_string());
1220
1221 sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1222 assert!(sticky.is_invalidated());
1223 assert_eq!(request2.room_subscriptions.len(), 1);
1226 assert!(request2.room_subscriptions.contains_key(r1));
1227
1228 sticky.maybe_commit(txn_id1);
1231 assert!(sticky.is_invalidated());
1232
1233 sticky.maybe_commit(txn_id2);
1235 assert!(!sticky.is_invalidated());
1236 }
1237
1238 #[test]
1239 fn test_room_subscriptions_are_sticky() {
1240 let r0 = room_id!("!r0.matrix.org");
1241 let r1 = room_id!("!r1:matrix.org");
1242
1243 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1244 BTreeMap::new(),
1245 Default::default(),
1246 ));
1247
1248 {
1250 sticky
1252 .data_mut()
1253 .room_subscriptions
1254 .insert(r0.to_owned(), (Default::default(), Default::default()));
1255
1256 let txn_id: &TransactionId = "tid0".into();
1258 let mut request = http::Request::default();
1259 request.txn_id = Some(txn_id.to_string());
1260
1261 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1262
1263 assert!(request.txn_id.is_some());
1264 assert_eq!(request.room_subscriptions.len(), 1);
1265 assert!(request.room_subscriptions.contains_key(r0));
1266
1267 let tid = request.txn_id.unwrap();
1269
1270 sticky.maybe_commit(tid.as_str().into());
1271 }
1272
1273 {
1275 sticky
1277 .data_mut()
1278 .room_subscriptions
1279 .insert(r1.to_owned(), (Default::default(), Default::default()));
1280
1281 let txn_id: &TransactionId = "tid1".into();
1283 let mut request = http::Request::default();
1284 request.txn_id = Some(txn_id.to_string());
1285
1286 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1287
1288 assert!(request.txn_id.is_some());
1289 assert_eq!(request.room_subscriptions.len(), 1);
1290 assert!(request.room_subscriptions.contains_key(r1));
1292
1293 }
1297
1298 {
1300 let txn_id: &TransactionId = "tid2".into();
1302 let mut request = http::Request::default();
1303 request.txn_id = Some(txn_id.to_string());
1304
1305 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1306
1307 assert!(request.txn_id.is_some());
1308 assert_eq!(request.room_subscriptions.len(), 1);
1309 assert!(request.room_subscriptions.contains_key(r1));
1311
1312 let tid = request.txn_id.unwrap();
1314
1315 sticky.maybe_commit(tid.as_str().into());
1316 }
1317
1318 {
1320 let txn_id: &TransactionId = "tid3".into();
1322 let mut request = http::Request::default();
1323 request.txn_id = Some(txn_id.to_string());
1324
1325 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1326
1327 assert!(request.txn_id.is_some());
1328 assert!(request.room_subscriptions.is_empty());
1330 }
1331 }
1332
1333 #[test]
1334 fn test_extensions_are_sticky() {
1335 let mut extensions = http::request::Extensions::default();
1336 extensions.account_data.enabled = Some(true);
1337
1338 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1340 Default::default(),
1341 extensions,
1342 ));
1343
1344 assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1345
1346 let extensions = &sticky.data().extensions;
1349 assert_eq!(extensions.e2ee.enabled, None);
1350 assert_eq!(extensions.to_device.enabled, None);
1351 assert_eq!(extensions.to_device.since, None);
1352
1353 assert_eq!(extensions.account_data.enabled, Some(true));
1355
1356 let txn_id: &TransactionId = "tid123".into();
1357 let mut request = http::Request::default();
1358 request.txn_id = Some(txn_id.to_string());
1359 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1360 assert!(sticky.is_invalidated());
1361 assert_eq!(request.extensions.to_device.enabled, None);
1362 assert_eq!(request.extensions.to_device.since, None);
1363 assert_eq!(request.extensions.e2ee.enabled, None);
1364 assert_eq!(request.extensions.account_data.enabled, Some(true));
1365 }
1366
1367 #[async_test]
1368 async fn test_sticky_extensions_plus_since() -> Result<()> {
1369 let server = MockServer::start().await;
1370 let client = logged_in_client(Some(server.uri())).await;
1371
1372 let sync = client
1373 .sliding_sync("test-slidingsync")?
1374 .add_list(SlidingSyncList::builder("new_list"))
1375 .build()
1376 .await?;
1377
1378 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1380 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1381 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1382
1383 let sync = client
1385 .sliding_sync("test-slidingsync")?
1386 .add_list(SlidingSyncList::builder("new_list"))
1387 .with_to_device_extension(
1388 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1389 )
1390 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1391 .build()
1392 .await?;
1393
1394 let txn_id = TransactionId::new();
1397 let (request, _, _) = sync
1398 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1399 .await?;
1400
1401 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1402 assert_eq!(request.extensions.to_device.enabled, Some(true));
1403 assert!(request.extensions.to_device.since.is_none());
1404
1405 {
1406 let mut sticky = sync.inner.sticky.write().unwrap();
1408 assert!(sticky.is_invalidated());
1409 sticky.maybe_commit(
1410 "hopefully the rng won't generate this very specific transaction id".into(),
1411 );
1412 assert!(sticky.is_invalidated());
1413 }
1414
1415 let txn_id2 = TransactionId::new();
1417 let (request, _, _) = sync
1418 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1419 .await?;
1420
1421 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1422 assert_eq!(request.extensions.to_device.enabled, Some(true));
1423 assert!(request.extensions.to_device.since.is_none());
1424
1425 assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1426
1427 {
1428 let mut sticky = sync.inner.sticky.write().unwrap();
1430 assert!(sticky.is_invalidated());
1431 sticky.maybe_commit(txn_id2.as_str().into());
1432 assert!(!sticky.is_invalidated());
1433 }
1434
1435 let txn_id = TransactionId::new();
1437 let (request, _, _) = sync
1438 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1439 .await?;
1440 assert!(request.extensions.e2ee.enabled.is_none());
1441 assert!(request.extensions.to_device.enabled.is_none());
1442 assert!(request.extensions.to_device.since.is_none());
1443
1444 let _since_token = "since";
1448
1449 #[cfg(feature = "e2e-encryption")]
1450 {
1451 use matrix_sdk_base::crypto::store::Changes;
1452 if let Some(olm_machine) = &*client.olm_machine().await {
1453 olm_machine
1454 .store()
1455 .save_changes(Changes {
1456 next_batch_token: Some(_since_token.to_owned()),
1457 ..Default::default()
1458 })
1459 .await?;
1460 }
1461 }
1462
1463 let txn_id = TransactionId::new();
1464 let (request, _, _) = sync
1465 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1466 .await?;
1467
1468 assert!(request.extensions.e2ee.enabled.is_none());
1469 assert!(request.extensions.to_device.enabled.is_none());
1470
1471 #[cfg(feature = "e2e-encryption")]
1472 assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1473
1474 Ok(())
1475 }
1476
1477 #[async_test]
1483 #[cfg(feature = "e2e-encryption")]
1484 async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1485 use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1486 use matrix_sdk_test::ruma_response_from_json;
1487 use ruma::user_id;
1488
1489 let server = MockServer::start().await;
1490 let client = logged_in_client(Some(server.uri())).await;
1491
1492 let alice = user_id!("@alice:localhost");
1493 let bob = user_id!("@bob:localhost");
1494 let me = user_id!("@example:localhost");
1495
1496 {
1499 let olm_machine = client.olm_machine().await;
1500 let olm_machine = olm_machine.as_ref().unwrap();
1501
1502 olm_machine.update_tracked_users([alice, bob]).await?;
1503
1504 let outgoing_requests = olm_machine.outgoing_requests().await?;
1506
1507 assert_eq!(outgoing_requests.len(), 2);
1508 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1509 assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1510
1511 olm_machine
1513 .mark_request_as_sent(
1514 outgoing_requests[0].request_id(),
1515 AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1516 "one_time_key_counts": {}
1517 }))),
1518 )
1519 .await?;
1520
1521 olm_machine
1522 .mark_request_as_sent(
1523 outgoing_requests[1].request_id(),
1524 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1525 "device_keys": {
1526 alice: {},
1527 bob: {},
1528 }
1529 }))),
1530 )
1531 .await?;
1532
1533 let outgoing_requests = olm_machine.outgoing_requests().await?;
1535
1536 assert_eq!(outgoing_requests.len(), 1);
1537 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1538
1539 olm_machine
1540 .mark_request_as_sent(
1541 outgoing_requests[0].request_id(),
1542 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1543 "device_keys": {
1544 me: {},
1545 }
1546 }))),
1547 )
1548 .await?;
1549
1550 let outgoing_requests = olm_machine.outgoing_requests().await?;
1552
1553 assert!(outgoing_requests.is_empty());
1554 }
1555
1556 let sync = client
1557 .sliding_sync("test-slidingsync")?
1558 .add_list(SlidingSyncList::builder("new_list"))
1559 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1560 .build()
1561 .await?;
1562
1563 let txn_id = TransactionId::new();
1565 let (_request, _, _) = sync
1566 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1567 .await?;
1568
1569 {
1571 let olm_machine = client.olm_machine().await;
1572 let olm_machine = olm_machine.as_ref().unwrap();
1573
1574 let outgoing_requests = olm_machine.outgoing_requests().await?;
1576
1577 assert_eq!(outgoing_requests.len(), 1);
1578 assert_matches!(
1579 outgoing_requests[0].request(),
1580 AnyOutgoingRequest::KeysQuery(request) => {
1581 assert!(request.device_keys.contains_key(alice));
1582 assert!(request.device_keys.contains_key(bob));
1583 assert!(request.device_keys.contains_key(me));
1584 }
1585 );
1586
1587 olm_machine
1589 .mark_request_as_sent(
1590 outgoing_requests[0].request_id(),
1591 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1592 "device_keys": {
1593 alice: {},
1594 bob: {},
1595 me: {},
1596 }
1597 }))),
1598 )
1599 .await?;
1600 }
1601
1602 sync.set_pos("chocolat".to_owned()).await;
1604
1605 let txn_id = TransactionId::new();
1606 let (_request, _, _) = sync
1607 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1608 .await?;
1609
1610 {
1612 let olm_machine = client.olm_machine().await;
1613 let olm_machine = olm_machine.as_ref().unwrap();
1614
1615 let outgoing_requests = olm_machine.outgoing_requests().await?;
1617
1618 assert!(outgoing_requests.is_empty());
1619 }
1620
1621 Ok(())
1622 }
1623
1624 #[async_test]
1625 async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1626 let server = MockServer::start().await;
1627 let client = logged_in_client(Some(server.uri())).await;
1628
1629 let sliding_sync = client
1630 .sliding_sync("test-slidingsync")?
1631 .with_to_device_extension(
1632 assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1633 )
1634 .build()
1635 .await?;
1636
1637 let (request, _, _) =
1639 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1640 assert!(request.extensions.to_device.enabled.is_some());
1641
1642 let sync = sliding_sync.sync();
1643 pin_mut!(sync);
1644
1645 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1647
1648 #[derive(Deserialize)]
1649 struct PartialRequest {
1650 txn_id: Option<String>,
1651 }
1652
1653 {
1654 let _mock_guard = Mock::given(SlidingSyncMatcher)
1655 .respond_with(|request: &Request| {
1656 let request: PartialRequest = request.body_json().unwrap();
1658
1659 ResponseTemplate::new(200).set_body_json(json!({
1660 "txn_id": request.txn_id,
1661 "pos": "0",
1662 }))
1663 })
1664 .mount_as_scoped(&server)
1665 .await;
1666
1667 let next = sync.next().await;
1668 assert_matches!(next, Some(Ok(_update_summary)));
1669
1670 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1672 }
1673
1674 let (request, _, _) =
1676 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1677 assert!(request.extensions.to_device.enabled.is_none());
1678
1679 {
1681 let _mock_guard = Mock::given(SlidingSyncMatcher)
1682 .respond_with(|request: &Request| {
1683 let request: PartialRequest = request.body_json().unwrap();
1685
1686 ResponseTemplate::new(200).set_body_json(json!({
1687 "txn_id": request.txn_id,
1688 "pos": "1",
1689 }))
1690 })
1691 .mount_as_scoped(&server)
1692 .await;
1693
1694 let next = sync.next().await;
1695 assert_matches!(next, Some(Ok(_update_summary)));
1696
1697 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1699 }
1700
1701 {
1704 let _mock_guard = Mock::given(SlidingSyncMatcher)
1705 .respond_with(|request: &Request| {
1706 let request: PartialRequest = request.body_json().unwrap();
1708
1709 ResponseTemplate::new(200).set_body_json(json!({
1710 "txn_id": request.txn_id,
1711 "pos": "0", }))
1713 })
1714 .up_to_n_times(1) .mount_as_scoped(&server)
1716 .await;
1717
1718 let next = sync.next().await;
1719 assert_matches!(next, Some(Ok(_update_summary)));
1720
1721 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1723 }
1724
1725 {
1730 let _mock_guard = Mock::given(SlidingSyncMatcher)
1731 .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1732 "error": "foo",
1733 "errcode": "M_UNKNOWN_POS",
1734 })))
1735 .mount_as_scoped(&server)
1736 .await;
1737
1738 let next = sync.next().await;
1739
1740 assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1742
1743 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1745
1746 let (request, _, _) =
1748 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1749
1750 assert!(request.extensions.to_device.enabled.is_some());
1751
1752 assert!(sync.next().await.is_none());
1754 }
1755
1756 Ok(())
1757 }
1758
1759 #[cfg(feature = "e2e-encryption")]
1760 #[async_test]
1761 async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1762 let server = MockServer::start().await;
1763
1764 #[derive(Deserialize)]
1765 struct PartialRequest {
1766 txn_id: Option<String>,
1767 }
1768
1769 let server_pos = Arc::new(Mutex::new(0));
1770 let _mock_guard = Mock::given(SlidingSyncMatcher)
1771 .respond_with(move |request: &Request| {
1772 let request: PartialRequest = request.body_json().unwrap();
1774 let pos = {
1775 let mut pos = server_pos.lock().unwrap();
1776 let prev = *pos;
1777 *pos += 1;
1778 prev
1779 };
1780
1781 ResponseTemplate::new(200).set_body_json(json!({
1782 "txn_id": request.txn_id,
1783 "pos": pos.to_string(),
1784 }))
1785 })
1786 .mount_as_scoped(&server)
1787 .await;
1788
1789 let client = logged_in_client(Some(server.uri())).await;
1790
1791 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1792
1793 {
1795 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1796
1797 let (request, _, _) =
1798 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1799 assert!(request.pos.is_none());
1800 }
1801
1802 let sync = sliding_sync.sync();
1803 pin_mut!(sync);
1804
1805 let next = sync.next().await;
1808 assert_matches!(next, Some(Ok(_update_summary)));
1809
1810 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1811
1812 let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1813 .await?
1814 .expect("must have restored fields");
1815
1816 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1819
1820 {
1824 let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1825
1826 let mut position_guard = other_sync.inner.position.lock().await;
1827 position_guard.pos = Some("yolo".to_owned());
1828
1829 other_sync.cache_to_storage(&position_guard).await?;
1830 }
1831
1832 {
1834 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1835 let (request, _, _) =
1836 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1837 assert_eq!(request.pos.as_deref(), Some("0"));
1838 }
1839
1840 {
1843 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1844 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1845 }
1846
1847 Ok(())
1848 }
1849
1850 #[cfg(feature = "e2e-encryption")]
1851 #[async_test]
1852 async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1853 let server = MockServer::start().await;
1854
1855 #[derive(Deserialize)]
1856 struct PartialRequest {
1857 txn_id: Option<String>,
1858 }
1859
1860 let server_pos = Arc::new(Mutex::new(0));
1861 let _mock_guard = Mock::given(SlidingSyncMatcher)
1862 .respond_with(move |request: &Request| {
1863 let request: PartialRequest = request.body_json().unwrap();
1865 let pos = {
1866 let mut pos = server_pos.lock().unwrap();
1867 let prev = *pos;
1868 *pos += 1;
1869 prev
1870 };
1871
1872 ResponseTemplate::new(200).set_body_json(json!({
1873 "txn_id": request.txn_id,
1874 "pos": pos.to_string(),
1875 }))
1876 })
1877 .mount_as_scoped(&server)
1878 .await;
1879
1880 let client = logged_in_client(Some(server.uri())).await;
1881
1882 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1883
1884 {
1886 let (request, _, _) =
1887 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1888
1889 assert!(request.pos.is_none());
1890 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1891 }
1892
1893 let sync = sliding_sync.sync();
1894 pin_mut!(sync);
1895
1896 let next = sync.next().await;
1899 assert_matches!(next, Some(Ok(_update_summary)));
1900
1901 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1902
1903 let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1904 .await?
1905 .expect("must have restored fields");
1906
1907 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1910
1911 {
1913 let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
1914
1915 let mut position_guard = other_sync.inner.position.lock().await;
1916 position_guard.pos = Some("42".to_owned());
1917
1918 other_sync.cache_to_storage(&position_guard).await?;
1919 }
1920
1921 {
1923 let (request, _, _) =
1924 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1925 assert_eq!(request.pos.as_deref(), Some("42"));
1926 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1927 }
1928
1929 {
1931 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1932 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1933
1934 let (request, _, _) =
1935 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1936 assert_eq!(request.pos.as_deref(), Some("42"));
1937 }
1938
1939 sliding_sync.expire_session().await;
1942
1943 {
1944 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1945
1946 let (request, _, _) =
1947 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1948 assert!(request.pos.is_none());
1949 }
1950
1951 {
1953 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1954 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1955
1956 let (request, _, _) =
1957 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1958 assert!(request.pos.is_none());
1959 }
1960
1961 Ok(())
1962 }
1963
1964 #[async_test]
1965 async fn test_stop_sync_loop() -> Result<()> {
1966 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1967 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1968 .await?;
1969
1970 let stream = sliding_sync.sync();
1972 pin_mut!(stream);
1973
1974 assert!(stream.next().await.is_some());
1976
1977 sliding_sync.stop_sync()?;
1979
1980 assert!(stream.next().await.is_none());
1982
1983 let stream = sliding_sync.sync();
1985 pin_mut!(stream);
1986
1987 assert!(stream.next().await.is_some());
1989
1990 Ok(())
1991 }
1992
1993 #[async_test]
1994 async fn test_process_read_receipts() -> Result<()> {
1995 let room = owned_room_id!("!pony:example.org");
1996
1997 let server = MockServer::start().await;
1998 let client = logged_in_client(Some(server.uri())).await;
1999 client.event_cache().subscribe().unwrap();
2000
2001 let sliding_sync = client
2002 .sliding_sync("test")?
2003 .with_receipt_extension(
2004 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2005 )
2006 .add_list(
2007 SlidingSyncList::builder("all")
2008 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2009 )
2010 .build()
2011 .await?;
2012
2013 {
2015 let server_response = assign!(http::Response::new("0".to_owned()), {
2016 rooms: BTreeMap::from([(
2017 room.clone(),
2018 http::response::Room::default(),
2019 )])
2020 });
2021
2022 let _summary = {
2023 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2024 sliding_sync
2025 .handle_response(
2026 server_response.clone(),
2027 &mut pos_guard,
2028 RequestedRequiredStates::default(),
2029 )
2030 .await?
2031 };
2032 }
2033
2034 let server_response = assign!(http::Response::new("1".to_owned()), {
2035 extensions: assign!(http::response::Extensions::default(), {
2036 receipts: assign!(http::response::Receipts::default(), {
2037 rooms: BTreeMap::from([
2038 (
2039 room.clone(),
2040 Raw::from_json_string(
2041 json!({
2042 "room_id": room,
2043 "type": "m.receipt",
2044 "content": {
2045 "$event:bar.org": {
2046 "m.read": {
2047 client.user_id().unwrap(): {
2048 "ts": 1436451550,
2049 }
2050 }
2051 }
2052 }
2053 })
2054 .to_string(),
2055 ).unwrap()
2056 )
2057 ])
2058 })
2059 })
2060 });
2061
2062 let summary = {
2063 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2064 sliding_sync
2065 .handle_response(
2066 server_response.clone(),
2067 &mut pos_guard,
2068 RequestedRequiredStates::default(),
2069 )
2070 .await?
2071 };
2072
2073 assert!(summary.rooms.contains(&room));
2074
2075 Ok(())
2076 }
2077
2078 #[async_test]
2079 async fn test_process_marked_unread_room_account_data() -> Result<()> {
2080 let room_id = owned_room_id!("!unicorn:example.org");
2081
2082 let server = MockServer::start().await;
2083 let client = logged_in_client(Some(server.uri())).await;
2084
2085 let sliding_sync = client
2088 .sliding_sync("test")?
2089 .with_account_data_extension(
2090 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2091 )
2092 .add_list(
2093 SlidingSyncList::builder("all")
2094 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2095 )
2096 .build()
2097 .await?;
2098
2099 {
2101 let server_response = assign!(http::Response::new("0".to_owned()), {
2102 rooms: BTreeMap::from([(
2103 room_id.clone(),
2104 http::response::Room::default(),
2105 )])
2106 });
2107
2108 let _summary = {
2109 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2110 sliding_sync
2111 .handle_response(
2112 server_response.clone(),
2113 &mut pos_guard,
2114 RequestedRequiredStates::default(),
2115 )
2116 .await?
2117 };
2118 }
2119
2120 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2124
2125 let update_summary = {
2126 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2127 sliding_sync
2128 .handle_response(
2129 server_response.clone(),
2130 &mut pos_guard,
2131 RequestedRequiredStates::default(),
2132 )
2133 .await?
2134 };
2135
2136 assert!(update_summary.rooms.contains(&room_id));
2139
2140 let room = client.get_room(&room_id).unwrap();
2141
2142 assert!(room.is_marked_unread());
2145
2146 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2149
2150 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2151 sliding_sync
2152 .handle_response(
2153 server_response.clone(),
2154 &mut pos_guard,
2155 RequestedRequiredStates::default(),
2156 )
2157 .await?;
2158
2159 let room = client.get_room(&room_id).unwrap();
2160
2161 assert!(!room.is_marked_unread());
2162
2163 Ok(())
2164 }
2165
2166 fn make_mark_unread_response(
2167 response_number: &str,
2168 room_id: OwnedRoomId,
2169 unread: bool,
2170 add_rooms_section: bool,
2171 ) -> http::Response {
2172 let rooms = if add_rooms_section {
2173 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2174 } else {
2175 BTreeMap::new()
2176 };
2177
2178 let extensions = assign!(http::response::Extensions::default(), {
2179 account_data: assign!(http::response::AccountData::default(), {
2180 rooms: BTreeMap::from([
2181 (
2182 room_id,
2183 vec![
2184 Raw::from_json_string(
2185 json!({
2186 "content": {
2187 "unread": unread
2188 },
2189 "type": "m.marked_unread"
2190 })
2191 .to_string(),
2192 ).unwrap()
2193 ]
2194 )
2195 ])
2196 })
2197 });
2198
2199 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2200 }
2201
2202 #[async_test]
2203 async fn test_process_rooms_account_data() -> Result<()> {
2204 let room = owned_room_id!("!pony:example.org");
2205
2206 let server = MockServer::start().await;
2207 let client = logged_in_client(Some(server.uri())).await;
2208
2209 let sliding_sync = client
2210 .sliding_sync("test")?
2211 .with_account_data_extension(
2212 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2213 )
2214 .add_list(
2215 SlidingSyncList::builder("all")
2216 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2217 )
2218 .build()
2219 .await?;
2220
2221 {
2223 let server_response = assign!(http::Response::new("0".to_owned()), {
2224 rooms: BTreeMap::from([(
2225 room.clone(),
2226 http::response::Room::default(),
2227 )])
2228 });
2229
2230 let _summary = {
2231 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2232 sliding_sync
2233 .handle_response(
2234 server_response.clone(),
2235 &mut pos_guard,
2236 RequestedRequiredStates::default(),
2237 )
2238 .await?
2239 };
2240 }
2241
2242 let server_response = assign!(http::Response::new("1".to_owned()), {
2243 extensions: assign!(http::response::Extensions::default(), {
2244 account_data: assign!(http::response::AccountData::default(), {
2245 rooms: BTreeMap::from([
2246 (
2247 room.clone(),
2248 vec![
2249 Raw::from_json_string(
2250 json!({
2251 "content": {
2252 "tags": {
2253 "u.work": {
2254 "order": 0.9
2255 }
2256 }
2257 },
2258 "type": "m.tag"
2259 })
2260 .to_string(),
2261 ).unwrap()
2262 ]
2263 )
2264 ])
2265 })
2266 })
2267 });
2268 let summary = {
2269 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2270 sliding_sync
2271 .handle_response(
2272 server_response.clone(),
2273 &mut pos_guard,
2274 RequestedRequiredStates::default(),
2275 )
2276 .await?
2277 };
2278
2279 assert!(summary.rooms.contains(&room));
2280
2281 Ok(())
2282 }
2283
2284 #[async_test]
2285 #[cfg(feature = "e2e-encryption")]
2286 async fn test_process_only_encryption_events() -> Result<()> {
2287 use ruma::OneTimeKeyAlgorithm;
2288
2289 let room = owned_room_id!("!croissant:example.org");
2290
2291 let server = MockServer::start().await;
2292 let client = logged_in_client(Some(server.uri())).await;
2293
2294 let server_response = assign!(http::Response::new("0".to_owned()), {
2295 rooms: BTreeMap::from([(
2296 room.clone(),
2297 assign!(http::response::Room::default(), {
2298 name: Some("Croissants lovers".to_owned()),
2299 timeline: Vec::new(),
2300 }),
2301 )]),
2302
2303 extensions: assign!(http::response::Extensions::default(), {
2304 e2ee: assign!(http::response::E2EE::default(), {
2305 device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2306 }),
2307 to_device: Some(assign!(http::response::ToDevice::default(), {
2308 next_batch: "to-device-token".to_owned(),
2309 })),
2310 })
2311 });
2312
2313 let sliding_sync = client
2317 .sliding_sync("test")?
2318 .with_to_device_extension(
2319 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2320 )
2321 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2322 .build()
2323 .await?;
2324
2325 {
2326 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2327
2328 sliding_sync
2329 .handle_response(
2330 server_response.clone(),
2331 &mut position_guard,
2332 RequestedRequiredStates::default(),
2333 )
2334 .await?;
2335 }
2336
2337 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2339 assert_eq!(uploaded_key_count, 42);
2340
2341 {
2342 let olm_machine = &*client.olm_machine_for_testing().await;
2343 assert_eq!(
2344 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2345 Some("to-device-token")
2346 );
2347 }
2348
2349 assert!(client.get_room(&room).is_none());
2351
2352 let client = logged_in_client(Some(server.uri())).await;
2355
2356 let sliding_sync = client
2357 .sliding_sync("test")?
2358 .add_list(SlidingSyncList::builder("thelist"))
2359 .build()
2360 .await?;
2361
2362 {
2363 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2364
2365 sliding_sync
2366 .handle_response(
2367 server_response.clone(),
2368 &mut position_guard,
2369 RequestedRequiredStates::default(),
2370 )
2371 .await?;
2372 }
2373
2374 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2376 assert_eq!(uploaded_key_count, 0);
2377
2378 {
2379 let olm_machine = &*client.olm_machine_for_testing().await;
2380 assert_eq!(
2381 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2382 None
2383 );
2384 }
2385
2386 assert!(client.get_room(&room).is_some());
2388
2389 let client = logged_in_client(Some(server.uri())).await;
2391
2392 let sliding_sync = client
2393 .sliding_sync("test")?
2394 .add_list(SlidingSyncList::builder("thelist"))
2395 .with_to_device_extension(
2396 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2397 )
2398 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2399 .build()
2400 .await?;
2401
2402 {
2403 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2404
2405 sliding_sync
2406 .handle_response(
2407 server_response.clone(),
2408 &mut position_guard,
2409 RequestedRequiredStates::default(),
2410 )
2411 .await?;
2412 }
2413
2414 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2416 assert_eq!(uploaded_key_count, 42);
2417
2418 {
2419 let olm_machine = &*client.olm_machine_for_testing().await;
2420 assert_eq!(
2421 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2422 Some("to-device-token")
2423 );
2424 }
2425
2426 assert!(client.get_room(&room).is_some());
2428
2429 Ok(())
2430 }
2431
2432 #[async_test]
2433 async fn test_lock_multiple_requests() -> Result<()> {
2434 let server = MockServer::start().await;
2435 let client = logged_in_client(Some(server.uri())).await;
2436
2437 let pos = Arc::new(Mutex::new(0));
2438 let _mock_guard = Mock::given(SlidingSyncMatcher)
2439 .respond_with(move |_: &Request| {
2440 let mut pos = pos.lock().unwrap();
2441 *pos += 1;
2442 ResponseTemplate::new(200).set_body_json(json!({
2443 "pos": pos.to_string(),
2444 "lists": {},
2445 "rooms": {}
2446 }))
2447 })
2448 .mount_as_scoped(&server)
2449 .await;
2450
2451 let sliding_sync = client
2452 .sliding_sync("test")?
2453 .with_to_device_extension(
2454 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2455 )
2456 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2457 .build()
2458 .await?;
2459
2460 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2463
2464 for result in requests.await {
2465 result?;
2466 }
2467
2468 Ok(())
2469 }
2470
2471 #[async_test]
2472 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2473 let server = MockServer::start().await;
2474 let client = logged_in_client(Some(server.uri())).await;
2475
2476 let pos = Arc::new(Mutex::new(0));
2477 let _mock_guard = Mock::given(SlidingSyncMatcher)
2478 .respond_with(move |_: &Request| {
2479 let mut pos = pos.lock().unwrap();
2480 *pos += 1;
2481 ResponseTemplate::new(200)
2483 .set_body_json(json!({
2484 "pos": pos.to_string(),
2485 "lists": {},
2486 "rooms": {}
2487 }))
2488 .set_delay(Duration::from_secs(2))
2489 })
2490 .mount_as_scoped(&server)
2491 .await;
2492
2493 let sliding_sync =
2494 client
2495 .sliding_sync("test")?
2496 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2497 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2498 ))
2499 .add_list(
2500 SlidingSyncList::builder("another-list")
2501 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2502 )
2503 .build()
2504 .await?;
2505
2506 let stream = sliding_sync.sync();
2507 pin_mut!(stream);
2508
2509 let cloned_sync = sliding_sync.clone();
2510 spawn(async move {
2511 tokio::time::sleep(Duration::from_millis(100)).await;
2512
2513 cloned_sync
2514 .on_list("another-list", |list| {
2515 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2516 ready(())
2517 })
2518 .await;
2519 });
2520
2521 assert_matches!(stream.next().await, Some(Ok(_)));
2522
2523 sliding_sync.stop_sync().unwrap();
2524
2525 assert_matches!(stream.next().await, None);
2526
2527 let mut num_requests = 0;
2528
2529 for request in server.received_requests().await.unwrap() {
2530 if !SlidingSyncMatcher.matches(&request) {
2531 continue;
2532 }
2533
2534 let another_list_ranges = if num_requests == 0 {
2535 json!([[0, 10]])
2537 } else {
2538 json!([[10, 20]])
2540 };
2541
2542 num_requests += 1;
2543 assert!(num_requests <= 2, "more than one request hit the server");
2544
2545 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2546
2547 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2548 &json_value,
2549 &json!({
2550 "conn_id": "test",
2551 "lists": {
2552 "room-list": {
2553 "ranges": [[0, 9]],
2554 "required_state": [
2555 ["m.room.encryption", ""],
2556 ["m.room.tombstone", ""]
2557 ],
2558 },
2559 "another-list": {
2560 "ranges": another_list_ranges,
2561 "required_state": [
2562 ["m.room.encryption", ""],
2563 ["m.room.tombstone", ""]
2564 ],
2565 },
2566 }
2567 }),
2568 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2569 ) {
2570 dbg!(json_value);
2571 panic!("json differ: {err}");
2572 }
2573 }
2574
2575 assert_eq!(num_requests, 2);
2576
2577 Ok(())
2578 }
2579
2580 #[async_test]
2581 async fn test_timeout_zero_list() -> Result<()> {
2582 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2583
2584 let (request, _, _) =
2585 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2586
2587 assert!(request.timeout.is_some());
2590
2591 Ok(())
2592 }
2593
2594 #[async_test]
2595 async fn test_timeout_one_list() -> Result<()> {
2596 let (_server, sliding_sync) = new_sliding_sync(vec![
2597 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2598 ])
2599 .await?;
2600
2601 let (request, _, _) =
2602 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2603
2604 assert!(request.timeout.is_none());
2606
2607 {
2609 let server_response = assign!(http::Response::new("0".to_owned()), {
2610 lists: BTreeMap::from([(
2611 "foo".to_owned(),
2612 assign!(http::response::List::default(), {
2613 count: uint!(7),
2614 })
2615 )])
2616 });
2617
2618 let _summary = {
2619 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2620 sliding_sync
2621 .handle_response(
2622 server_response.clone(),
2623 &mut pos_guard,
2624 RequestedRequiredStates::default(),
2625 )
2626 .await?
2627 };
2628 }
2629
2630 let (request, _, _) =
2631 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2632
2633 assert!(request.timeout.is_some());
2635
2636 Ok(())
2637 }
2638
2639 #[async_test]
2640 async fn test_timeout_three_lists() -> Result<()> {
2641 let (_server, sliding_sync) = new_sliding_sync(vec![
2642 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2643 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2644 SlidingSyncList::builder("baz")
2645 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2646 ])
2647 .await?;
2648
2649 let (request, _, _) =
2650 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2651
2652 assert!(request.timeout.is_none());
2654
2655 {
2657 let server_response = assign!(http::Response::new("0".to_owned()), {
2658 lists: BTreeMap::from([(
2659 "foo".to_owned(),
2660 assign!(http::response::List::default(), {
2661 count: uint!(7),
2662 })
2663 )])
2664 });
2665
2666 let _summary = {
2667 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2668 sliding_sync
2669 .handle_response(
2670 server_response.clone(),
2671 &mut pos_guard,
2672 RequestedRequiredStates::default(),
2673 )
2674 .await?
2675 };
2676 }
2677
2678 let (request, _, _) =
2679 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2680
2681 assert!(request.timeout.is_none());
2683
2684 {
2686 let server_response = assign!(http::Response::new("1".to_owned()), {
2687 lists: BTreeMap::from([(
2688 "bar".to_owned(),
2689 assign!(http::response::List::default(), {
2690 count: uint!(7),
2691 })
2692 )])
2693 });
2694
2695 let _summary = {
2696 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2697 sliding_sync
2698 .handle_response(
2699 server_response.clone(),
2700 &mut pos_guard,
2701 RequestedRequiredStates::default(),
2702 )
2703 .await?
2704 };
2705 }
2706
2707 let (request, _, _) =
2708 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2709
2710 assert!(request.timeout.is_some());
2712
2713 Ok(())
2714 }
2715
2716 #[async_test]
2717 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2718 let server = MockServer::start().await;
2719 let client = logged_in_client(Some(server.uri())).await;
2720
2721 let _mock_guard = Mock::given(SlidingSyncMatcher)
2722 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2723 "pos": "0",
2724 "lists": {},
2725 "rooms": {}
2726 })))
2727 .mount_as_scoped(&server)
2728 .await;
2729
2730 let sliding_sync = client
2731 .sliding_sync("test")?
2732 .with_to_device_extension(
2733 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2734 )
2735 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2736 .build()
2737 .await?;
2738
2739 let sliding_sync = Arc::new(sliding_sync);
2740
2741 let sync_beat_listener = client.inner.sync_beat.listen();
2743 sliding_sync.sync_once().await?;
2744
2745 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2747 Ok(())
2748 }
2749
2750 #[async_test]
2751 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2752 let server = MockServer::start().await;
2753 let client = logged_in_client(Some(server.uri())).await;
2754
2755 let _mock_guard = Mock::given(SlidingSyncMatcher)
2756 .respond_with(ResponseTemplate::new(404))
2757 .mount_as_scoped(&server)
2758 .await;
2759
2760 let sliding_sync = client
2761 .sliding_sync("test")?
2762 .with_to_device_extension(
2763 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2764 )
2765 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2766 .build()
2767 .await?;
2768
2769 let sliding_sync = Arc::new(sliding_sync);
2770
2771 let sync_beat_listener = client.inner.sync_beat.listen();
2773 let sync_result = sliding_sync.sync_once().await;
2774 assert!(sync_result.is_err());
2775
2776 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2778
2779 Ok(())
2780 }
2781
2782 #[async_test]
2783 async fn test_sync_lock_is_released_before_calling_handlers() -> Result<()> {
2784 let server = MatrixMockServer::new().await;
2785 let client = server.client_builder().build().await;
2786 let room_id = room_id!("!mu5hr00m:example.org");
2787
2788 let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2789 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2790 "pos": "0",
2791 "lists": {},
2792 "extensions": {
2793 "account_data": {
2794 "global": [
2795 {
2796 "type": "m.direct",
2797 "content": {
2798 "@de4dlockh0lmes:example.org": [
2799 "!mu5hr00m:example.org"
2800 ]
2801 }
2802 }
2803 ]
2804 }
2805 },
2806 "rooms": {
2807 room_id: {
2808 "name": "Mario Bros Fanbase Room",
2809 "initial": true,
2810 },
2811 }
2812 })))
2813 .mount_as_scoped(server.server())
2814 .await;
2815
2816 let f = EventFactory::new().room(room_id);
2817
2818 Mock::given(method("GET"))
2819 .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2820 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2821 "chunk": [
2822 f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2823 ]
2824 })))
2825 .mount(server.server())
2826 .await;
2827
2828 let (tx, rx) = tokio::sync::oneshot::channel();
2829
2830 let tx = Arc::new(Mutex::new(Some(tx)));
2831 client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2832 let members =
2834 client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2835 assert_eq!(members.len(), 1);
2836 tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2837 });
2838
2839 let sliding_sync = client
2840 .sliding_sync("test")?
2841 .add_list(SlidingSyncList::builder("thelist"))
2842 .with_account_data_extension(
2843 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2844 )
2845 .build()
2846 .await?;
2847
2848 tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2849 .await
2850 .expect("Sync did not complete in time")
2851 .expect("Sync failed");
2852
2853 tokio::time::timeout(Duration::from_secs(5), rx)
2855 .await
2856 .expect("Event handler did not complete in time")
2857 .expect("Event handler failed");
2858
2859 Ok(())
2860 }
2861}