1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod room;
24mod sticky_parameters;
25mod utils;
26
27use std::{
28 collections::{btree_map::Entry, BTreeMap},
29 fmt::Debug,
30 future::Future,
31 sync::{Arc, RwLock as StdRwLock},
32 time::Duration,
33};
34
35use async_stream::stream;
36pub use client::{Version, VersionBuilder};
37use futures_core::stream::Stream;
38use matrix_sdk_base::RequestedRequiredStates;
39use matrix_sdk_common::{deserialized_responses::TimelineEvent, executor::spawn, timer};
40use ruma::{
41 api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42 assign, OwnedRoomId, RoomId,
43};
44use serde::{Deserialize, Serialize};
45use tokio::{
46 select,
47 sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
48};
49use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
50
51#[cfg(feature = "e2e-encryption")]
52use self::utils::JoinHandleExt as _;
53pub use self::{builder::*, client::VersionBuilderError, error::*, list::*, room::*};
54use self::{
55 cache::restore_sliding_sync_state,
56 client::SlidingSyncResponseProcessor,
57 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
58};
59use crate::{config::RequestConfig, Client, Result};
60
61#[derive(Clone, Debug)]
65pub struct SlidingSync {
66 inner: Arc<SlidingSyncInner>,
68}
69
70#[derive(Debug)]
71pub(super) struct SlidingSyncInner {
72 id: String,
76
77 client: Client,
79
80 poll_timeout: Duration,
82
83 network_timeout: Duration,
86
87 storage_key: String,
89
90 share_pos: bool,
97
98 position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
111
112 lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
114
115 rooms: AsyncRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,
117
118 sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,
120
121 internal_channel: Sender<SlidingSyncInternalMessage>,
124}
125
126impl SlidingSync {
127 pub(super) fn new(inner: SlidingSyncInner) -> Self {
128 Self { inner: Arc::new(inner) }
129 }
130
131 async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
132 cache::store_sliding_sync_state(self, position).await
133 }
134
135 pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
137 SlidingSyncBuilder::new(id, client)
138 }
139
140 pub fn subscribe_to_rooms(
147 &self,
148 room_ids: &[&RoomId],
149 settings: Option<http::request::RoomSubscription>,
150 cancel_in_flight_request: bool,
151 ) {
152 let settings = settings.unwrap_or_default();
153 let mut sticky = self.inner.sticky.write().unwrap();
154 let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
155
156 let mut skip_over_current_sync_loop_iteration = false;
157
158 for room_id in room_ids {
159 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
166 if let Some(room) = self.inner.client.get_room(room_id) {
167 room.mark_members_missing();
168 }
169
170 entry.insert((RoomSubscriptionState::default(), settings.clone()));
171
172 skip_over_current_sync_loop_iteration = true;
173 }
174 }
175
176 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
177 self.inner.internal_channel_send_if_possible(
178 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
179 );
180 }
181 }
182
183 pub async fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
185 self.inner.rooms.read().await.get(room_id).cloned()
186 }
187
188 pub fn get_number_of_rooms(&self) -> usize {
190 self.inner.rooms.blocking_read().len()
191 }
192
193 pub async fn on_list<Function, FunctionOutput, R>(
195 &self,
196 list_name: &str,
197 function: Function,
198 ) -> Option<R>
199 where
200 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
201 FunctionOutput: Future<Output = R>,
202 {
203 let lists = self.inner.lists.read().await;
204
205 match lists.get(list_name) {
206 Some(list) => Some(function(list).await),
207 None => None,
208 }
209 }
210
211 pub async fn add_list(
217 &self,
218 list_builder: SlidingSyncListBuilder,
219 ) -> Result<Option<SlidingSyncList>> {
220 let list = list_builder.build(self.inner.internal_channel.clone());
221
222 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
223
224 self.inner.internal_channel_send_if_possible(
225 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
226 );
227
228 Ok(old_list)
229 }
230
231 pub async fn add_cached_list(
238 &self,
239 mut list_builder: SlidingSyncListBuilder,
240 ) -> Result<Option<SlidingSyncList>> {
241 let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
242
243 list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
244
245 self.add_list(list_builder).await
246 }
247
248 pub async fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
250 &self,
251 room_ids: I,
252 ) -> Vec<Option<SlidingSyncRoom>> {
253 let rooms = self.inner.rooms.read().await;
254
255 room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
256 }
257
258 pub async fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
260 self.inner.rooms.read().await.values().cloned().collect()
261 }
262
263 #[instrument(skip_all)]
265 async fn handle_response(
266 &self,
267 sliding_sync_response: http::Response,
268 position: &mut SlidingSyncPositionMarkers,
269 requested_required_states: RequestedRequiredStates,
270 ) -> Result<UpdateSummary, crate::Error> {
271 let pos = Some(sliding_sync_response.pos.clone());
272
273 let must_process_rooms_response = self.must_process_rooms_response().await;
274
275 trace!(yes = must_process_rooms_response, "Must process rooms response?");
276
277 let mut sync_response = {
285 let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
288
289 let rooms = &*self.inner.rooms.read().await;
290 let mut response_processor =
291 SlidingSyncResponseProcessor::new(self.inner.client.clone(), rooms);
292
293 #[cfg(feature = "e2e-encryption")]
294 if self.is_e2ee_enabled() {
295 response_processor.handle_encryption(&sliding_sync_response.extensions).await?
296 }
297
298 if must_process_rooms_response {
301 response_processor
302 .handle_room_response(&sliding_sync_response, &requested_required_states)
303 .await?;
304 }
305
306 response_processor.process_and_take_response().await?
307 };
308
309 debug!(?sync_response, "Sliding Sync response has been handled by the client");
310
311 if let Some(ref txn_id) = sliding_sync_response.txn_id {
313 let txn_id = txn_id.as_str().into();
314 self.inner.sticky.write().unwrap().maybe_commit(txn_id);
315 let mut lists = self.inner.lists.write().await;
316 lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
317 }
318
319 let update_summary = {
320 let updated_rooms = {
322 let mut rooms_map = self.inner.rooms.write().await;
323
324 let mut updated_rooms = Vec::with_capacity(sync_response.rooms.join.len());
325
326 for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
327 let timeline =
331 if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) {
332 joined_room.timeline.events
333 } else {
334 room_data.timeline.drain(..).map(TimelineEvent::new).collect()
335 };
336
337 match rooms_map.get_mut(&room_id) {
338 Some(room) => {
340 room.update(room_data, timeline);
341 }
342
343 None => {
345 rooms_map.insert(
346 room_id.clone(),
347 SlidingSyncRoom::new(
348 room_id.clone(),
349 room_data.prev_batch,
350 timeline,
351 ),
352 );
353 }
354 }
355
356 updated_rooms.push(room_id);
357 }
358
359 updated_rooms.extend(sync_response.rooms.join.keys().cloned());
367
368 updated_rooms
369 };
370
371 let updated_lists = {
373 debug!(
374 lists = ?sliding_sync_response.lists,
375 "Update lists"
376 );
377
378 let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
379 let mut lists = self.inner.lists.write().await;
380
381 for (name, list) in lists.iter_mut() {
384 if let Some(updates) = sliding_sync_response.lists.get(name) {
385 let maximum_number_of_rooms: u32 =
386 updates.count.try_into().expect("failed to convert `count` to `u32`");
387
388 if list.update(Some(maximum_number_of_rooms))? {
389 updated_lists.push(name.clone());
390 }
391 } else if list.update(None)? {
392 updated_lists.push(name.clone());
393 }
394 }
395
396 for name in sliding_sync_response.lists.keys() {
398 if !lists.contains_key(name) {
399 error!("Response for list `{name}` - unknown to us; skipping");
400 }
401 }
402
403 updated_lists
404 };
405
406 UpdateSummary { lists: updated_lists, rooms: updated_rooms }
407 };
408
409 position.pos = pos;
413
414 Ok(update_summary)
415 }
416
417 async fn generate_sync_request(
418 &self,
419 txn_id: &mut LazyTransactionId,
420 ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
421 let mut requests_lists = BTreeMap::new();
423
424 let require_timeout = {
425 let lists = self.inner.lists.read().await;
426
427 let mut require_timeout = true;
429
430 for (name, list) in lists.iter() {
431 requests_lists.insert(name.clone(), list.next_request(txn_id)?);
432 require_timeout = require_timeout && list.requires_timeout();
433 }
434
435 require_timeout
436 };
437
438 let mut position_guard = self.inner.position.clone().lock_owned().await;
446
447 let to_device_enabled =
448 self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
449
450 let restored_fields = if self.inner.share_pos || to_device_enabled {
451 let lists = self.inner.lists.read().await;
452 restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists).await?
453 } else {
454 None
455 };
456
457 let pos = if self.inner.share_pos {
460 if let Some(fields) = &restored_fields {
461 if fields.pos != position_guard.pos {
463 info!(
464 "Pos from previous request ('{:?}') was different from \
465 pos in database ('{:?}').",
466 position_guard.pos, fields.pos
467 );
468 position_guard.pos = fields.pos.clone();
469 }
470 fields.pos.clone()
471 } else {
472 position_guard.pos.clone()
473 }
474 } else {
475 position_guard.pos.clone()
476 };
477
478 Span::current().record("pos", &pos);
479
480 #[cfg(feature = "e2e-encryption")]
489 if pos.is_none() && self.is_e2ee_enabled() {
490 info!("Marking all tracked users as dirty");
491
492 let olm_machine = self.inner.client.olm_machine().await;
493 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
494 olm_machine.mark_all_tracked_users_as_dirty().await?;
495 }
496
497 let timeout = require_timeout.then(|| self.inner.poll_timeout);
502
503 let mut request = assign!(http::Request::new(), {
504 conn_id: Some(self.inner.id.clone()),
505 pos,
506 timeout,
507 lists: requests_lists,
508 });
509
510 self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
512
513 if to_device_enabled {
517 request.extensions.to_device.since =
518 restored_fields.and_then(|fields| fields.to_device_token);
519 }
520
521 if let Some(txn_id) = txn_id.get() {
523 request.txn_id = Some(txn_id.to_string());
524 }
525
526 Ok((
527 request,
529 RequestConfig::default()
532 .timeout(self.inner.poll_timeout + self.inner.network_timeout)
533 .retry_limit(3),
534 position_guard,
535 ))
536 }
537
538 async fn send_sync_request(
542 &self,
543 request: http::Request,
544 request_config: RequestConfig,
545 mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
546 ) -> Result<UpdateSummary> {
547 debug!("Sending request");
548
549 let requested_required_states = RequestedRequiredStates::from(&request);
551 let request = self.inner.client.send(request).with_request_config(request_config);
552
553 #[cfg(feature = "e2e-encryption")]
560 let response = {
561 if self.is_e2ee_enabled() {
562 let client = self.inner.client.clone();
579 let e2ee_uploads = spawn(
580 async move {
581 if let Err(error) = client.send_outgoing_requests().await {
582 error!(?error, "Error while sending outgoing E2EE requests");
583 }
584 }
585 .instrument(Span::current()),
586 )
587 .abort_on_drop();
590
591 let response = request.await?;
593
594 e2ee_uploads.await.map_err(|error| Error::JoinError {
599 task_description: "e2ee_uploads".to_owned(),
600 error,
601 })?;
602
603 response
604 } else {
605 request.await?
606 }
607 };
608
609 #[cfg(not(feature = "e2e-encryption"))]
611 let response = request.await?;
612
613 debug!("Received response");
614
615 let this = self.clone();
625
626 let future = async move {
629 debug!("Start handling response");
630
631 let updates = this
637 .handle_response(response, &mut position_guard, requested_required_states)
638 .await?;
639
640 this.cache_to_storage(&position_guard).await?;
641
642 drop(position_guard);
645
646 debug!("Done handling response");
647
648 Ok(updates)
649 };
650
651 spawn(future.instrument(Span::current())).await.unwrap()
652 }
653
654 #[cfg(feature = "e2e-encryption")]
656 fn is_e2ee_enabled(&self) -> bool {
657 self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
658 }
659
660 #[cfg(not(feature = "e2e-encryption"))]
661 fn is_e2ee_enabled(&self) -> bool {
662 false
663 }
664
665 async fn must_process_rooms_response(&self) -> bool {
667 !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
670 || !self.inner.lists.read().await.is_empty()
671 }
672
673 #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
674 async fn sync_once(&self) -> Result<UpdateSummary> {
675 let (request, request_config, position_guard) =
676 self.generate_sync_request(&mut LazyTransactionId::new()).await?;
677
678 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
680
681 self.inner.client.inner.sync_beat.notify(usize::MAX);
683
684 Ok(summaries)
685 }
686
687 #[allow(unknown_lints, clippy::let_with_type_underscore)] #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
697 pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
698 debug!("Starting sync stream");
699
700 let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
701
702 stream! {
703 loop {
704 debug!("Sync stream is running");
705
706 select! {
707 biased;
708
709 internal_message = internal_channel_receiver.recv() => {
710 use SlidingSyncInternalMessage::*;
711
712 debug!(?internal_message, "Sync stream has received an internal message");
713
714 match internal_message {
715 Err(_) | Ok(SyncLoopStop) => {
716 break;
717 }
718
719 Ok(SyncLoopSkipOverCurrentIteration) => {
720 continue;
721 }
722 }
723 }
724
725 update_summary = self.sync_once() => {
726 match update_summary {
727 Ok(updates) => {
728 yield Ok(updates);
729 }
730
731 Err(error) => {
733 if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
734 self.expire_session().await;
736 }
737
738 yield Err(error);
739
740 break;
742 }
743 }
744 }
745 }
746 }
747
748 debug!("Sync stream has exited.");
749 }
750 }
751
752 pub fn stop_sync(&self) -> Result<()> {
761 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
762 }
763
764 #[doc(hidden)]
776 pub async fn expire_session(&self) {
777 info!("Session expired; resetting `pos` and sticky parameters");
778
779 {
780 let mut position = self.inner.position.lock().await;
781 position.pos = None;
782
783 if let Err(err) = self.cache_to_storage(&position).await {
784 error!(
785 "couldn't invalidate sliding sync frozen state when expiring session: {err}"
786 );
787 }
788 }
789
790 {
791 let mut sticky = self.inner.sticky.write().unwrap();
792
793 sticky.data_mut().room_subscriptions.clear();
796 }
797
798 self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());
799 }
800}
801
802impl SlidingSyncInner {
803 #[instrument]
805 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
806 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
807 }
808
809 #[instrument]
812 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
813 let _ = self.internal_channel.send(message);
815 }
816}
817
818#[derive(Copy, Clone, Debug, PartialEq)]
819enum SlidingSyncInternalMessage {
820 SyncLoopStop,
822
823 SyncLoopSkipOverCurrentIteration,
826}
827
828#[cfg(any(test, feature = "testing"))]
829impl SlidingSync {
830 pub async fn set_pos(&self, new_pos: String) {
832 let mut position_lock = self.inner.position.lock().await;
833 position_lock.pos = Some(new_pos);
834 }
835
836 pub fn extensions_config(&self) -> http::request::Extensions {
842 let sticky = self.inner.sticky.read().unwrap();
843 sticky.data().extensions.clone()
844 }
845}
846
847#[derive(Clone, Debug)]
848pub(super) struct SlidingSyncPositionMarkers {
849 pos: Option<String>,
854}
855
856#[derive(Debug, Serialize, Deserialize)]
858struct FrozenSlidingSync {
859 #[serde(skip_serializing_if = "Option::is_none")]
861 to_device_since: Option<String>,
862 #[serde(default, skip_serializing_if = "Vec::is_empty")]
863 rooms: Vec<FrozenSlidingSyncRoom>,
864}
865
866impl FrozenSlidingSync {
867 fn new(rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
868 Self {
870 to_device_since: None,
871 rooms: rooms
872 .iter()
873 .map(|(_room_id, sliding_sync_room)| FrozenSlidingSyncRoom::from(sliding_sync_room))
874 .collect::<Vec<_>>(),
875 }
876 }
877}
878
879#[derive(Serialize, Deserialize)]
880struct FrozenSlidingSyncPos {
881 #[serde(skip_serializing_if = "Option::is_none")]
882 pos: Option<String>,
883}
884
885#[derive(Debug, Clone)]
888pub struct UpdateSummary {
889 pub lists: Vec<String>,
891 pub rooms: Vec<OwnedRoomId>,
893}
894
895#[derive(Debug, Default)]
899enum RoomSubscriptionState {
900 #[default]
904 Pending,
905
906 Applied,
909}
910
911#[derive(Debug)]
914pub(super) struct SlidingSyncStickyParameters {
915 room_subscriptions:
918 BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
919
920 extensions: http::request::Extensions,
923}
924
925impl SlidingSyncStickyParameters {
926 pub fn new(
928 room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
929 extensions: http::request::Extensions,
930 ) -> Self {
931 Self {
932 room_subscriptions: room_subscriptions
933 .into_iter()
934 .map(|(room_id, room_subscription)| {
935 (room_id, (RoomSubscriptionState::Pending, room_subscription))
936 })
937 .collect(),
938 extensions,
939 }
940 }
941}
942
943impl StickyData for SlidingSyncStickyParameters {
944 type Request = http::Request;
945
946 fn apply(&self, request: &mut Self::Request) {
947 request.room_subscriptions = self
948 .room_subscriptions
949 .iter()
950 .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
951 .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
952 .collect();
953 request.extensions = self.extensions.clone();
954 }
955
956 fn on_commit(&mut self) {
957 for (state, _room_subscription) in self.room_subscriptions.values_mut() {
959 if matches!(state, RoomSubscriptionState::Pending) {
960 *state = RoomSubscriptionState::Applied;
961 }
962 }
963 }
964}
965
966#[cfg(all(test, not(target_family = "wasm")))]
967#[allow(clippy::dbg_macro)]
968mod tests {
969 use std::{
970 collections::BTreeMap,
971 future::ready,
972 ops::Not,
973 sync::{Arc, Mutex},
974 time::Duration,
975 };
976
977 use assert_matches::assert_matches;
978 use event_listener::Listener;
979 use futures_util::{future::join_all, pin_mut, StreamExt};
980 use matrix_sdk_base::RequestedRequiredStates;
981 use matrix_sdk_test::async_test;
982 use ruma::{
983 api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint,
984 OwnedRoomId, TransactionId,
985 };
986 use serde::Deserialize;
987 use serde_json::json;
988 use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
989
990 use super::{
991 http,
992 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
993 FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
994 SlidingSyncStickyParameters,
995 };
996 use crate::{
997 sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result,
998 };
999
1000 #[derive(Copy, Clone)]
1001 struct SlidingSyncMatcher;
1002
1003 impl Match for SlidingSyncMatcher {
1004 fn matches(&self, request: &Request) -> bool {
1005 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
1006 && request.method == Method::POST
1007 }
1008 }
1009
1010 async fn new_sliding_sync(
1011 lists: Vec<SlidingSyncListBuilder>,
1012 ) -> Result<(MockServer, SlidingSync)> {
1013 let server = MockServer::start().await;
1014 let client = logged_in_client(Some(server.uri())).await;
1015
1016 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1017
1018 for list in lists {
1019 sliding_sync_builder = sliding_sync_builder.add_list(list);
1020 }
1021
1022 let sliding_sync = sliding_sync_builder.build().await?;
1023
1024 Ok((server, sliding_sync))
1025 }
1026
1027 #[async_test]
1028 async fn test_subscribe_to_rooms() -> Result<()> {
1029 let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1030 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1031 .await?;
1032
1033 let stream = sliding_sync.sync();
1034 pin_mut!(stream);
1035
1036 let room_id_0 = room_id!("!r0:bar.org");
1037 let room_id_1 = room_id!("!r1:bar.org");
1038 let room_id_2 = room_id!("!r2:bar.org");
1039
1040 {
1041 let _mock_guard = Mock::given(SlidingSyncMatcher)
1042 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1043 "pos": "1",
1044 "lists": {},
1045 "rooms": {
1046 room_id_0: {
1047 "name": "Room #0",
1048 "initial": true,
1049 },
1050 room_id_1: {
1051 "name": "Room #1",
1052 "initial": true,
1053 },
1054 room_id_2: {
1055 "name": "Room #2",
1056 "initial": true,
1057 },
1058 }
1059 })))
1060 .mount_as_scoped(&server)
1061 .await;
1062
1063 let _ = stream.next().await.unwrap()?;
1064 }
1065
1066 let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1067
1068 assert!(room0.are_members_synced().not());
1072
1073 {
1074 struct MemberMatcher(OwnedRoomId);
1075
1076 impl Match for MemberMatcher {
1077 fn matches(&self, request: &Request) -> bool {
1078 request.url.path()
1079 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1080 && request.method == Method::GET
1081 }
1082 }
1083
1084 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1085 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1086 "chunk": [],
1087 })))
1088 .mount_as_scoped(&server)
1089 .await;
1090
1091 assert_matches!(room0.request_members().await, Ok(()));
1092 }
1093
1094 assert!(room0.are_members_synced());
1096
1097 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1098
1099 assert!(room0.are_members_synced().not());
1102
1103 {
1104 let sticky = sliding_sync.inner.sticky.read().unwrap();
1105 let room_subscriptions = &sticky.data().room_subscriptions;
1106
1107 assert!(room_subscriptions.contains_key(room_id_0));
1108 assert!(room_subscriptions.contains_key(room_id_1));
1109 assert!(!room_subscriptions.contains_key(room_id_2));
1110 }
1111
1112 {
1115 struct MemberMatcher(OwnedRoomId);
1116
1117 impl Match for MemberMatcher {
1118 fn matches(&self, request: &Request) -> bool {
1119 request.url.path()
1120 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1121 && request.method == Method::GET
1122 }
1123 }
1124
1125 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1126 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1127 "chunk": [],
1128 })))
1129 .mount_as_scoped(&server)
1130 .await;
1131
1132 assert_matches!(room0.request_members().await, Ok(()));
1133 }
1134
1135 assert!(room0.are_members_synced());
1137
1138 sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1139
1140 assert!(room0.are_members_synced());
1143
1144 Ok(())
1145 }
1146
1147 #[async_test]
1148 async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1149 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1150 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1151 .await?;
1152
1153 let room_id_0 = room_id!("!r0:bar.org");
1154 let room_id_1 = room_id!("!r1:bar.org");
1155 let room_id_2 = room_id!("!r2:bar.org");
1156
1157 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1159
1160 {
1161 let sticky = sliding_sync.inner.sticky.read().unwrap();
1162 let room_subscriptions = &sticky.data().room_subscriptions;
1163
1164 assert!(room_subscriptions.contains_key(room_id_0));
1165 assert!(room_subscriptions.contains_key(room_id_1));
1166 assert!(room_subscriptions.contains_key(room_id_2).not());
1167 }
1168
1169 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1171
1172 {
1173 let sticky = sliding_sync.inner.sticky.read().unwrap();
1174 let room_subscriptions = &sticky.data().room_subscriptions;
1175
1176 assert!(room_subscriptions.contains_key(room_id_0));
1177 assert!(room_subscriptions.contains_key(room_id_1));
1178 assert!(room_subscriptions.contains_key(room_id_2));
1179 }
1180
1181 sliding_sync.expire_session().await;
1183
1184 {
1185 let sticky = sliding_sync.inner.sticky.read().unwrap();
1186 let room_subscriptions = &sticky.data().room_subscriptions;
1187
1188 assert!(room_subscriptions.is_empty());
1189 }
1190
1191 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1193
1194 {
1195 let sticky = sliding_sync.inner.sticky.read().unwrap();
1196 let room_subscriptions = &sticky.data().room_subscriptions;
1197
1198 assert!(room_subscriptions.contains_key(room_id_0).not());
1199 assert!(room_subscriptions.contains_key(room_id_1).not());
1200 assert!(room_subscriptions.contains_key(room_id_2));
1201 }
1202
1203 Ok(())
1204 }
1205
1206 #[async_test]
1207 async fn test_to_device_token_properly_cached() -> Result<()> {
1208 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1209 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1210 .await?;
1211
1212 let frozen = FrozenSlidingSync::new(&*sliding_sync.inner.rooms.read().await);
1215 assert!(frozen.to_device_since.is_none());
1216
1217 Ok(())
1218 }
1219
1220 #[async_test]
1221 async fn test_add_list() -> Result<()> {
1222 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1223 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1224 .await?;
1225
1226 let _stream = sliding_sync.sync();
1227 pin_mut!(_stream);
1228
1229 sliding_sync
1230 .add_list(
1231 SlidingSyncList::builder("bar")
1232 .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1233 )
1234 .await?;
1235
1236 let lists = sliding_sync.inner.lists.read().await;
1237
1238 assert!(lists.contains_key("foo"));
1239 assert!(lists.contains_key("bar"));
1240
1241 Ok(())
1244 }
1245
1246 #[test]
1247 fn test_sticky_parameters_api_invalidated_flow() {
1248 let r0 = room_id!("!r0.matrix.org");
1249 let r1 = room_id!("!r1:matrix.org");
1250
1251 let mut room_subscriptions = BTreeMap::new();
1252 room_subscriptions.insert(r0.to_owned(), Default::default());
1253
1254 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1256 room_subscriptions,
1257 Default::default(),
1258 ));
1259 assert!(sticky.is_invalidated());
1260
1261 let txn_id: &TransactionId = "tid123".into();
1263
1264 let mut request = http::Request::default();
1265 request.txn_id = Some(txn_id.to_string());
1266
1267 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1268
1269 assert!(request.txn_id.is_some());
1270 assert_eq!(request.room_subscriptions.len(), 1);
1271 assert!(request.room_subscriptions.contains_key(r0));
1272
1273 let tid = request.txn_id.unwrap();
1274
1275 sticky.maybe_commit(tid.as_str().into());
1276 assert!(!sticky.is_invalidated());
1277
1278 sticky
1280 .data_mut()
1281 .room_subscriptions
1282 .insert(r1.to_owned(), (Default::default(), Default::default()));
1283 assert!(sticky.is_invalidated());
1284
1285 sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1287 assert!(sticky.is_invalidated());
1288
1289 let txn_id1: &TransactionId = "tid456".into();
1291 let mut request1 = http::Request::default();
1292 request1.txn_id = Some(txn_id1.to_string());
1293 sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1294
1295 assert!(sticky.is_invalidated());
1296 assert_eq!(request1.room_subscriptions.len(), 1);
1300 assert!(request1.room_subscriptions.contains_key(r1));
1301
1302 let txn_id2: &TransactionId = "tid789".into();
1303 let mut request2 = http::Request::default();
1304 request2.txn_id = Some(txn_id2.to_string());
1305
1306 sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1307 assert!(sticky.is_invalidated());
1308 assert_eq!(request2.room_subscriptions.len(), 1);
1311 assert!(request2.room_subscriptions.contains_key(r1));
1312
1313 sticky.maybe_commit(txn_id1);
1316 assert!(sticky.is_invalidated());
1317
1318 sticky.maybe_commit(txn_id2);
1320 assert!(!sticky.is_invalidated());
1321 }
1322
1323 #[test]
1324 fn test_room_subscriptions_are_sticky() {
1325 let r0 = room_id!("!r0.matrix.org");
1326 let r1 = room_id!("!r1:matrix.org");
1327
1328 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1329 BTreeMap::new(),
1330 Default::default(),
1331 ));
1332
1333 {
1335 sticky
1337 .data_mut()
1338 .room_subscriptions
1339 .insert(r0.to_owned(), (Default::default(), Default::default()));
1340
1341 let txn_id: &TransactionId = "tid0".into();
1343 let mut request = http::Request::default();
1344 request.txn_id = Some(txn_id.to_string());
1345
1346 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1347
1348 assert!(request.txn_id.is_some());
1349 assert_eq!(request.room_subscriptions.len(), 1);
1350 assert!(request.room_subscriptions.contains_key(r0));
1351
1352 let tid = request.txn_id.unwrap();
1354
1355 sticky.maybe_commit(tid.as_str().into());
1356 }
1357
1358 {
1360 sticky
1362 .data_mut()
1363 .room_subscriptions
1364 .insert(r1.to_owned(), (Default::default(), Default::default()));
1365
1366 let txn_id: &TransactionId = "tid1".into();
1368 let mut request = http::Request::default();
1369 request.txn_id = Some(txn_id.to_string());
1370
1371 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1372
1373 assert!(request.txn_id.is_some());
1374 assert_eq!(request.room_subscriptions.len(), 1);
1375 assert!(request.room_subscriptions.contains_key(r1));
1377
1378 }
1382
1383 {
1385 let txn_id: &TransactionId = "tid2".into();
1387 let mut request = http::Request::default();
1388 request.txn_id = Some(txn_id.to_string());
1389
1390 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1391
1392 assert!(request.txn_id.is_some());
1393 assert_eq!(request.room_subscriptions.len(), 1);
1394 assert!(request.room_subscriptions.contains_key(r1));
1396
1397 let tid = request.txn_id.unwrap();
1399
1400 sticky.maybe_commit(tid.as_str().into());
1401 }
1402
1403 {
1405 let txn_id: &TransactionId = "tid3".into();
1407 let mut request = http::Request::default();
1408 request.txn_id = Some(txn_id.to_string());
1409
1410 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1411
1412 assert!(request.txn_id.is_some());
1413 assert!(request.room_subscriptions.is_empty());
1415 }
1416 }
1417
1418 #[test]
1419 fn test_extensions_are_sticky() {
1420 let mut extensions = http::request::Extensions::default();
1421 extensions.account_data.enabled = Some(true);
1422
1423 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1425 Default::default(),
1426 extensions,
1427 ));
1428
1429 assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1430
1431 let extensions = &sticky.data().extensions;
1434 assert_eq!(extensions.e2ee.enabled, None);
1435 assert_eq!(extensions.to_device.enabled, None);
1436 assert_eq!(extensions.to_device.since, None);
1437
1438 assert_eq!(extensions.account_data.enabled, Some(true));
1440
1441 let txn_id: &TransactionId = "tid123".into();
1442 let mut request = http::Request::default();
1443 request.txn_id = Some(txn_id.to_string());
1444 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1445 assert!(sticky.is_invalidated());
1446 assert_eq!(request.extensions.to_device.enabled, None);
1447 assert_eq!(request.extensions.to_device.since, None);
1448 assert_eq!(request.extensions.e2ee.enabled, None);
1449 assert_eq!(request.extensions.account_data.enabled, Some(true));
1450 }
1451
1452 #[async_test]
1453 async fn test_sticky_extensions_plus_since() -> Result<()> {
1454 let server = MockServer::start().await;
1455 let client = logged_in_client(Some(server.uri())).await;
1456
1457 let sync = client
1458 .sliding_sync("test-slidingsync")?
1459 .add_list(SlidingSyncList::builder("new_list"))
1460 .build()
1461 .await?;
1462
1463 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1465 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1466 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1467
1468 let sync = client
1470 .sliding_sync("test-slidingsync")?
1471 .add_list(SlidingSyncList::builder("new_list"))
1472 .with_to_device_extension(
1473 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1474 )
1475 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1476 .build()
1477 .await?;
1478
1479 let txn_id = TransactionId::new();
1482 let (request, _, _) = sync
1483 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1484 .await?;
1485
1486 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1487 assert_eq!(request.extensions.to_device.enabled, Some(true));
1488 assert!(request.extensions.to_device.since.is_none());
1489
1490 {
1491 let mut sticky = sync.inner.sticky.write().unwrap();
1493 assert!(sticky.is_invalidated());
1494 sticky.maybe_commit(
1495 "hopefully the rng won't generate this very specific transaction id".into(),
1496 );
1497 assert!(sticky.is_invalidated());
1498 }
1499
1500 let txn_id2 = TransactionId::new();
1502 let (request, _, _) = sync
1503 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1504 .await?;
1505
1506 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1507 assert_eq!(request.extensions.to_device.enabled, Some(true));
1508 assert!(request.extensions.to_device.since.is_none());
1509
1510 assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1511
1512 {
1513 let mut sticky = sync.inner.sticky.write().unwrap();
1515 assert!(sticky.is_invalidated());
1516 sticky.maybe_commit(txn_id2.as_str().into());
1517 assert!(!sticky.is_invalidated());
1518 }
1519
1520 let txn_id = TransactionId::new();
1522 let (request, _, _) = sync
1523 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1524 .await?;
1525 assert!(request.extensions.e2ee.enabled.is_none());
1526 assert!(request.extensions.to_device.enabled.is_none());
1527 assert!(request.extensions.to_device.since.is_none());
1528
1529 let _since_token = "since";
1533
1534 #[cfg(feature = "e2e-encryption")]
1535 {
1536 use matrix_sdk_base::crypto::store::Changes;
1537 if let Some(olm_machine) = &*client.olm_machine().await {
1538 olm_machine
1539 .store()
1540 .save_changes(Changes {
1541 next_batch_token: Some(_since_token.to_owned()),
1542 ..Default::default()
1543 })
1544 .await?;
1545 }
1546 }
1547
1548 let txn_id = TransactionId::new();
1549 let (request, _, _) = sync
1550 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1551 .await?;
1552
1553 assert!(request.extensions.e2ee.enabled.is_none());
1554 assert!(request.extensions.to_device.enabled.is_none());
1555
1556 #[cfg(feature = "e2e-encryption")]
1557 assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1558
1559 Ok(())
1560 }
1561
1562 #[async_test]
1568 #[cfg(feature = "e2e-encryption")]
1569 async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1570 use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1571 use matrix_sdk_test::ruma_response_from_json;
1572 use ruma::user_id;
1573
1574 let server = MockServer::start().await;
1575 let client = logged_in_client(Some(server.uri())).await;
1576
1577 let alice = user_id!("@alice:localhost");
1578 let bob = user_id!("@bob:localhost");
1579 let me = user_id!("@example:localhost");
1580
1581 {
1584 let olm_machine = client.olm_machine().await;
1585 let olm_machine = olm_machine.as_ref().unwrap();
1586
1587 olm_machine.update_tracked_users([alice, bob]).await?;
1588
1589 let outgoing_requests = olm_machine.outgoing_requests().await?;
1591
1592 assert_eq!(outgoing_requests.len(), 2);
1593 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1594 assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1595
1596 olm_machine
1598 .mark_request_as_sent(
1599 outgoing_requests[0].request_id(),
1600 AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1601 "one_time_key_counts": {}
1602 }))),
1603 )
1604 .await?;
1605
1606 olm_machine
1607 .mark_request_as_sent(
1608 outgoing_requests[1].request_id(),
1609 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1610 "device_keys": {
1611 alice: {},
1612 bob: {},
1613 }
1614 }))),
1615 )
1616 .await?;
1617
1618 let outgoing_requests = olm_machine.outgoing_requests().await?;
1620
1621 assert_eq!(outgoing_requests.len(), 1);
1622 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1623
1624 olm_machine
1625 .mark_request_as_sent(
1626 outgoing_requests[0].request_id(),
1627 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1628 "device_keys": {
1629 me: {},
1630 }
1631 }))),
1632 )
1633 .await?;
1634
1635 let outgoing_requests = olm_machine.outgoing_requests().await?;
1637
1638 assert!(outgoing_requests.is_empty());
1639 }
1640
1641 let sync = client
1642 .sliding_sync("test-slidingsync")?
1643 .add_list(SlidingSyncList::builder("new_list"))
1644 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1645 .build()
1646 .await?;
1647
1648 let txn_id = TransactionId::new();
1650 let (_request, _, _) = sync
1651 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1652 .await?;
1653
1654 {
1656 let olm_machine = client.olm_machine().await;
1657 let olm_machine = olm_machine.as_ref().unwrap();
1658
1659 let outgoing_requests = olm_machine.outgoing_requests().await?;
1661
1662 assert_eq!(outgoing_requests.len(), 1);
1663 assert_matches!(
1664 outgoing_requests[0].request(),
1665 AnyOutgoingRequest::KeysQuery(request) => {
1666 assert!(request.device_keys.contains_key(alice));
1667 assert!(request.device_keys.contains_key(bob));
1668 assert!(request.device_keys.contains_key(me));
1669 }
1670 );
1671
1672 olm_machine
1674 .mark_request_as_sent(
1675 outgoing_requests[0].request_id(),
1676 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1677 "device_keys": {
1678 alice: {},
1679 bob: {},
1680 me: {},
1681 }
1682 }))),
1683 )
1684 .await?;
1685 }
1686
1687 sync.set_pos("chocolat".to_owned()).await;
1689
1690 let txn_id = TransactionId::new();
1691 let (_request, _, _) = sync
1692 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1693 .await?;
1694
1695 {
1697 let olm_machine = client.olm_machine().await;
1698 let olm_machine = olm_machine.as_ref().unwrap();
1699
1700 let outgoing_requests = olm_machine.outgoing_requests().await?;
1702
1703 assert!(outgoing_requests.is_empty());
1704 }
1705
1706 Ok(())
1707 }
1708
1709 #[async_test]
1710 async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1711 let server = MockServer::start().await;
1712 let client = logged_in_client(Some(server.uri())).await;
1713
1714 let sliding_sync = client
1715 .sliding_sync("test-slidingsync")?
1716 .with_to_device_extension(
1717 assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1718 )
1719 .build()
1720 .await?;
1721
1722 let (request, _, _) =
1724 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1725 assert!(request.extensions.to_device.enabled.is_some());
1726
1727 let sync = sliding_sync.sync();
1728 pin_mut!(sync);
1729
1730 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1732
1733 #[derive(Deserialize)]
1734 struct PartialRequest {
1735 txn_id: Option<String>,
1736 }
1737
1738 {
1739 let _mock_guard = Mock::given(SlidingSyncMatcher)
1740 .respond_with(|request: &Request| {
1741 let request: PartialRequest = request.body_json().unwrap();
1743
1744 ResponseTemplate::new(200).set_body_json(json!({
1745 "txn_id": request.txn_id,
1746 "pos": "0",
1747 }))
1748 })
1749 .mount_as_scoped(&server)
1750 .await;
1751
1752 let next = sync.next().await;
1753 assert_matches!(next, Some(Ok(_update_summary)));
1754
1755 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1757 }
1758
1759 let (request, _, _) =
1761 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1762 assert!(request.extensions.to_device.enabled.is_none());
1763
1764 {
1766 let _mock_guard = Mock::given(SlidingSyncMatcher)
1767 .respond_with(|request: &Request| {
1768 let request: PartialRequest = request.body_json().unwrap();
1770
1771 ResponseTemplate::new(200).set_body_json(json!({
1772 "txn_id": request.txn_id,
1773 "pos": "1",
1774 }))
1775 })
1776 .mount_as_scoped(&server)
1777 .await;
1778
1779 let next = sync.next().await;
1780 assert_matches!(next, Some(Ok(_update_summary)));
1781
1782 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1784 }
1785
1786 {
1789 let _mock_guard = Mock::given(SlidingSyncMatcher)
1790 .respond_with(|request: &Request| {
1791 let request: PartialRequest = request.body_json().unwrap();
1793
1794 ResponseTemplate::new(200).set_body_json(json!({
1795 "txn_id": request.txn_id,
1796 "pos": "0", }))
1798 })
1799 .up_to_n_times(1) .mount_as_scoped(&server)
1801 .await;
1802
1803 let next = sync.next().await;
1804 assert_matches!(next, Some(Ok(_update_summary)));
1805
1806 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1808 }
1809
1810 {
1815 let _mock_guard = Mock::given(SlidingSyncMatcher)
1816 .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1817 "error": "foo",
1818 "errcode": "M_UNKNOWN_POS",
1819 })))
1820 .mount_as_scoped(&server)
1821 .await;
1822
1823 let next = sync.next().await;
1824
1825 assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1827
1828 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1830
1831 let (request, _, _) =
1833 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1834
1835 assert!(request.extensions.to_device.enabled.is_some());
1836
1837 assert!(sync.next().await.is_none());
1839 }
1840
1841 Ok(())
1842 }
1843
1844 #[cfg(feature = "e2e-encryption")]
1845 #[async_test]
1846 async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1847 let server = MockServer::start().await;
1848
1849 #[derive(Deserialize)]
1850 struct PartialRequest {
1851 txn_id: Option<String>,
1852 }
1853
1854 let server_pos = Arc::new(Mutex::new(0));
1855 let _mock_guard = Mock::given(SlidingSyncMatcher)
1856 .respond_with(move |request: &Request| {
1857 let request: PartialRequest = request.body_json().unwrap();
1859 let pos = {
1860 let mut pos = server_pos.lock().unwrap();
1861 let prev = *pos;
1862 *pos += 1;
1863 prev
1864 };
1865
1866 ResponseTemplate::new(200).set_body_json(json!({
1867 "txn_id": request.txn_id,
1868 "pos": pos.to_string(),
1869 }))
1870 })
1871 .mount_as_scoped(&server)
1872 .await;
1873
1874 let client = logged_in_client(Some(server.uri())).await;
1875
1876 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1877
1878 {
1880 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1881
1882 let (request, _, _) =
1883 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1884 assert!(request.pos.is_none());
1885 }
1886
1887 let sync = sliding_sync.sync();
1888 pin_mut!(sync);
1889
1890 let next = sync.next().await;
1893 assert_matches!(next, Some(Ok(_update_summary)));
1894
1895 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1896
1897 let restored_fields = restore_sliding_sync_state(
1898 &client,
1899 &sliding_sync.inner.storage_key,
1900 &*sliding_sync.inner.lists.read().await,
1901 )
1902 .await?
1903 .expect("must have restored fields");
1904
1905 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1908
1909 {
1913 let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1914
1915 let mut position_guard = other_sync.inner.position.lock().await;
1916 position_guard.pos = Some("yolo".to_owned());
1917
1918 other_sync.cache_to_storage(&position_guard).await?;
1919 }
1920
1921 {
1923 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1924 let (request, _, _) =
1925 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1926 assert_eq!(request.pos.as_deref(), Some("0"));
1927 }
1928
1929 {
1932 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1933 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1934 }
1935
1936 Ok(())
1937 }
1938
1939 #[cfg(feature = "e2e-encryption")]
1940 #[async_test]
1941 async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1942 let server = MockServer::start().await;
1943
1944 #[derive(Deserialize)]
1945 struct PartialRequest {
1946 txn_id: Option<String>,
1947 }
1948
1949 let server_pos = Arc::new(Mutex::new(0));
1950 let _mock_guard = Mock::given(SlidingSyncMatcher)
1951 .respond_with(move |request: &Request| {
1952 let request: PartialRequest = request.body_json().unwrap();
1954 let pos = {
1955 let mut pos = server_pos.lock().unwrap();
1956 let prev = *pos;
1957 *pos += 1;
1958 prev
1959 };
1960
1961 ResponseTemplate::new(200).set_body_json(json!({
1962 "txn_id": request.txn_id,
1963 "pos": pos.to_string(),
1964 }))
1965 })
1966 .mount_as_scoped(&server)
1967 .await;
1968
1969 let client = logged_in_client(Some(server.uri())).await;
1970
1971 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1972
1973 {
1975 let (request, _, _) =
1976 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1977
1978 assert!(request.pos.is_none());
1979 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1980 }
1981
1982 let sync = sliding_sync.sync();
1983 pin_mut!(sync);
1984
1985 let next = sync.next().await;
1988 assert_matches!(next, Some(Ok(_update_summary)));
1989
1990 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1991
1992 let restored_fields = restore_sliding_sync_state(
1993 &client,
1994 &sliding_sync.inner.storage_key,
1995 &*sliding_sync.inner.lists.read().await,
1996 )
1997 .await?
1998 .expect("must have restored fields");
1999
2000 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
2003
2004 {
2006 let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
2007
2008 let mut position_guard = other_sync.inner.position.lock().await;
2009 position_guard.pos = Some("42".to_owned());
2010
2011 other_sync.cache_to_storage(&position_guard).await?;
2012 }
2013
2014 {
2016 let (request, _, _) =
2017 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2018 assert_eq!(request.pos.as_deref(), Some("42"));
2019 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2020 }
2021
2022 {
2024 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2025 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2026
2027 let (request, _, _) =
2028 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2029 assert_eq!(request.pos.as_deref(), Some("42"));
2030 }
2031
2032 sliding_sync.expire_session().await;
2035
2036 {
2037 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2038
2039 let (request, _, _) =
2040 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2041 assert!(request.pos.is_none());
2042 }
2043
2044 {
2046 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2047 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2048
2049 let (request, _, _) =
2050 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2051 assert!(request.pos.is_none());
2052 }
2053
2054 Ok(())
2055 }
2056
2057 #[async_test]
2058 async fn test_stop_sync_loop() -> Result<()> {
2059 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2060 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2061 .await?;
2062
2063 let stream = sliding_sync.sync();
2065 pin_mut!(stream);
2066
2067 assert!(stream.next().await.is_some());
2069
2070 sliding_sync.stop_sync()?;
2072
2073 assert!(stream.next().await.is_none());
2075
2076 let stream = sliding_sync.sync();
2078 pin_mut!(stream);
2079
2080 assert!(stream.next().await.is_some());
2082
2083 Ok(())
2084 }
2085
2086 #[async_test]
2087 async fn test_process_read_receipts() -> Result<()> {
2088 let room = owned_room_id!("!pony:example.org");
2089
2090 let server = MockServer::start().await;
2091 let client = logged_in_client(Some(server.uri())).await;
2092
2093 let sliding_sync = client
2094 .sliding_sync("test")?
2095 .with_receipt_extension(
2096 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2097 )
2098 .add_list(
2099 SlidingSyncList::builder("all")
2100 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2101 )
2102 .build()
2103 .await?;
2104
2105 {
2107 let server_response = assign!(http::Response::new("0".to_owned()), {
2108 rooms: BTreeMap::from([(
2109 room.clone(),
2110 http::response::Room::default(),
2111 )])
2112 });
2113
2114 let _summary = {
2115 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2116 sliding_sync
2117 .handle_response(
2118 server_response.clone(),
2119 &mut pos_guard,
2120 RequestedRequiredStates::default(),
2121 )
2122 .await?
2123 };
2124 }
2125
2126 let server_response = assign!(http::Response::new("1".to_owned()), {
2127 extensions: assign!(http::response::Extensions::default(), {
2128 receipts: assign!(http::response::Receipts::default(), {
2129 rooms: BTreeMap::from([
2130 (
2131 room.clone(),
2132 Raw::from_json_string(
2133 json!({
2134 "room_id": room,
2135 "type": "m.receipt",
2136 "content": {
2137 "$event:bar.org": {
2138 "m.read": {
2139 client.user_id().unwrap(): {
2140 "ts": 1436451550,
2141 }
2142 }
2143 }
2144 }
2145 })
2146 .to_string(),
2147 ).unwrap()
2148 )
2149 ])
2150 })
2151 })
2152 });
2153
2154 let summary = {
2155 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2156 sliding_sync
2157 .handle_response(
2158 server_response.clone(),
2159 &mut pos_guard,
2160 RequestedRequiredStates::default(),
2161 )
2162 .await?
2163 };
2164
2165 assert!(summary.rooms.contains(&room));
2166
2167 Ok(())
2168 }
2169
2170 #[async_test]
2171 async fn test_process_marked_unread_room_account_data() -> Result<()> {
2172 let room_id = owned_room_id!("!unicorn:example.org");
2173
2174 let server = MockServer::start().await;
2175 let client = logged_in_client(Some(server.uri())).await;
2176
2177 let sliding_sync = client
2180 .sliding_sync("test")?
2181 .with_account_data_extension(
2182 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2183 )
2184 .add_list(
2185 SlidingSyncList::builder("all")
2186 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2187 )
2188 .build()
2189 .await?;
2190
2191 {
2193 let server_response = assign!(http::Response::new("0".to_owned()), {
2194 rooms: BTreeMap::from([(
2195 room_id.clone(),
2196 http::response::Room::default(),
2197 )])
2198 });
2199
2200 let _summary = {
2201 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2202 sliding_sync
2203 .handle_response(
2204 server_response.clone(),
2205 &mut pos_guard,
2206 RequestedRequiredStates::default(),
2207 )
2208 .await?
2209 };
2210 }
2211
2212 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2216
2217 let update_summary = {
2218 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2219 sliding_sync
2220 .handle_response(
2221 server_response.clone(),
2222 &mut pos_guard,
2223 RequestedRequiredStates::default(),
2224 )
2225 .await?
2226 };
2227
2228 assert!(update_summary.rooms.contains(&room_id));
2231
2232 let room = client.get_room(&room_id).unwrap();
2233
2234 assert!(room.is_marked_unread());
2237
2238 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2241
2242 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2243 sliding_sync
2244 .handle_response(
2245 server_response.clone(),
2246 &mut pos_guard,
2247 RequestedRequiredStates::default(),
2248 )
2249 .await?;
2250
2251 let room = client.get_room(&room_id).unwrap();
2252
2253 assert!(!room.is_marked_unread());
2254
2255 Ok(())
2256 }
2257
2258 fn make_mark_unread_response(
2259 response_number: &str,
2260 room_id: OwnedRoomId,
2261 unread: bool,
2262 add_rooms_section: bool,
2263 ) -> http::Response {
2264 let rooms = if add_rooms_section {
2265 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2266 } else {
2267 BTreeMap::new()
2268 };
2269
2270 let extensions = assign!(http::response::Extensions::default(), {
2271 account_data: assign!(http::response::AccountData::default(), {
2272 rooms: BTreeMap::from([
2273 (
2274 room_id,
2275 vec![
2276 Raw::from_json_string(
2277 json!({
2278 "content": {
2279 "unread": unread
2280 },
2281 "type": "com.famedly.marked_unread"
2282 })
2283 .to_string(),
2284 ).unwrap()
2285 ]
2286 )
2287 ])
2288 })
2289 });
2290
2291 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2292 }
2293
2294 #[async_test]
2295 async fn test_process_rooms_account_data() -> Result<()> {
2296 let room = owned_room_id!("!pony:example.org");
2297
2298 let server = MockServer::start().await;
2299 let client = logged_in_client(Some(server.uri())).await;
2300
2301 let sliding_sync = client
2302 .sliding_sync("test")?
2303 .with_account_data_extension(
2304 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2305 )
2306 .add_list(
2307 SlidingSyncList::builder("all")
2308 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2309 )
2310 .build()
2311 .await?;
2312
2313 {
2315 let server_response = assign!(http::Response::new("0".to_owned()), {
2316 rooms: BTreeMap::from([(
2317 room.clone(),
2318 http::response::Room::default(),
2319 )])
2320 });
2321
2322 let _summary = {
2323 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2324 sliding_sync
2325 .handle_response(
2326 server_response.clone(),
2327 &mut pos_guard,
2328 RequestedRequiredStates::default(),
2329 )
2330 .await?
2331 };
2332 }
2333
2334 let server_response = assign!(http::Response::new("1".to_owned()), {
2335 extensions: assign!(http::response::Extensions::default(), {
2336 account_data: assign!(http::response::AccountData::default(), {
2337 rooms: BTreeMap::from([
2338 (
2339 room.clone(),
2340 vec![
2341 Raw::from_json_string(
2342 json!({
2343 "content": {
2344 "tags": {
2345 "u.work": {
2346 "order": 0.9
2347 }
2348 }
2349 },
2350 "type": "m.tag"
2351 })
2352 .to_string(),
2353 ).unwrap()
2354 ]
2355 )
2356 ])
2357 })
2358 })
2359 });
2360 let summary = {
2361 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2362 sliding_sync
2363 .handle_response(
2364 server_response.clone(),
2365 &mut pos_guard,
2366 RequestedRequiredStates::default(),
2367 )
2368 .await?
2369 };
2370
2371 assert!(summary.rooms.contains(&room));
2372
2373 Ok(())
2374 }
2375
2376 #[async_test]
2377 #[cfg(feature = "e2e-encryption")]
2378 async fn test_process_only_encryption_events() -> Result<()> {
2379 use ruma::OneTimeKeyAlgorithm;
2380
2381 let room = owned_room_id!("!croissant:example.org");
2382
2383 let server = MockServer::start().await;
2384 let client = logged_in_client(Some(server.uri())).await;
2385
2386 let server_response = assign!(http::Response::new("0".to_owned()), {
2387 rooms: BTreeMap::from([(
2388 room.clone(),
2389 assign!(http::response::Room::default(), {
2390 name: Some("Croissants lovers".to_owned()),
2391 timeline: Vec::new(),
2392 }),
2393 )]),
2394
2395 extensions: assign!(http::response::Extensions::default(), {
2396 e2ee: assign!(http::response::E2EE::default(), {
2397 device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2398 }),
2399 to_device: Some(assign!(http::response::ToDevice::default(), {
2400 next_batch: "to-device-token".to_owned(),
2401 })),
2402 })
2403 });
2404
2405 let sliding_sync = client
2409 .sliding_sync("test")?
2410 .with_to_device_extension(
2411 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2412 )
2413 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2414 .build()
2415 .await?;
2416
2417 {
2418 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2419
2420 sliding_sync
2421 .handle_response(
2422 server_response.clone(),
2423 &mut position_guard,
2424 RequestedRequiredStates::default(),
2425 )
2426 .await?;
2427 }
2428
2429 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2431 assert_eq!(uploaded_key_count, 42);
2432
2433 {
2434 let olm_machine = &*client.olm_machine_for_testing().await;
2435 assert_eq!(
2436 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2437 Some("to-device-token")
2438 );
2439 }
2440
2441 assert!(client.get_room(&room).is_none());
2443
2444 let client = logged_in_client(Some(server.uri())).await;
2447
2448 let sliding_sync = client
2449 .sliding_sync("test")?
2450 .add_list(SlidingSyncList::builder("thelist"))
2451 .build()
2452 .await?;
2453
2454 {
2455 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2456
2457 sliding_sync
2458 .handle_response(
2459 server_response.clone(),
2460 &mut position_guard,
2461 RequestedRequiredStates::default(),
2462 )
2463 .await?;
2464 }
2465
2466 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2468 assert_eq!(uploaded_key_count, 0);
2469
2470 {
2471 let olm_machine = &*client.olm_machine_for_testing().await;
2472 assert_eq!(
2473 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2474 None
2475 );
2476 }
2477
2478 assert!(client.get_room(&room).is_some());
2480
2481 let client = logged_in_client(Some(server.uri())).await;
2483
2484 let sliding_sync = client
2485 .sliding_sync("test")?
2486 .add_list(SlidingSyncList::builder("thelist"))
2487 .with_to_device_extension(
2488 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2489 )
2490 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2491 .build()
2492 .await?;
2493
2494 {
2495 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2496
2497 sliding_sync
2498 .handle_response(
2499 server_response.clone(),
2500 &mut position_guard,
2501 RequestedRequiredStates::default(),
2502 )
2503 .await?;
2504 }
2505
2506 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2508 assert_eq!(uploaded_key_count, 42);
2509
2510 {
2511 let olm_machine = &*client.olm_machine_for_testing().await;
2512 assert_eq!(
2513 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2514 Some("to-device-token")
2515 );
2516 }
2517
2518 assert!(client.get_room(&room).is_some());
2520
2521 Ok(())
2522 }
2523
2524 #[async_test]
2525 async fn test_lock_multiple_requests() -> Result<()> {
2526 let server = MockServer::start().await;
2527 let client = logged_in_client(Some(server.uri())).await;
2528
2529 let pos = Arc::new(Mutex::new(0));
2530 let _mock_guard = Mock::given(SlidingSyncMatcher)
2531 .respond_with(move |_: &Request| {
2532 let mut pos = pos.lock().unwrap();
2533 *pos += 1;
2534 ResponseTemplate::new(200).set_body_json(json!({
2535 "pos": pos.to_string(),
2536 "lists": {},
2537 "rooms": {}
2538 }))
2539 })
2540 .mount_as_scoped(&server)
2541 .await;
2542
2543 let sliding_sync = client
2544 .sliding_sync("test")?
2545 .with_to_device_extension(
2546 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2547 )
2548 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2549 .build()
2550 .await?;
2551
2552 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2555
2556 for result in requests.await {
2557 result?;
2558 }
2559
2560 Ok(())
2561 }
2562
2563 #[async_test]
2564 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2565 let server = MockServer::start().await;
2566 let client = logged_in_client(Some(server.uri())).await;
2567
2568 let pos = Arc::new(Mutex::new(0));
2569 let _mock_guard = Mock::given(SlidingSyncMatcher)
2570 .respond_with(move |_: &Request| {
2571 let mut pos = pos.lock().unwrap();
2572 *pos += 1;
2573 ResponseTemplate::new(200)
2575 .set_body_json(json!({
2576 "pos": pos.to_string(),
2577 "lists": {},
2578 "rooms": {}
2579 }))
2580 .set_delay(Duration::from_secs(2))
2581 })
2582 .mount_as_scoped(&server)
2583 .await;
2584
2585 let sliding_sync =
2586 client
2587 .sliding_sync("test")?
2588 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2589 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2590 ))
2591 .add_list(
2592 SlidingSyncList::builder("another-list")
2593 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2594 )
2595 .build()
2596 .await?;
2597
2598 let stream = sliding_sync.sync();
2599 pin_mut!(stream);
2600
2601 let cloned_sync = sliding_sync.clone();
2602 tokio::spawn(async move {
2603 tokio::time::sleep(Duration::from_millis(100)).await;
2604
2605 cloned_sync
2606 .on_list("another-list", |list| {
2607 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2608 ready(())
2609 })
2610 .await;
2611 });
2612
2613 assert_matches!(stream.next().await, Some(Ok(_)));
2614
2615 sliding_sync.stop_sync().unwrap();
2616
2617 assert_matches!(stream.next().await, None);
2618
2619 let mut num_requests = 0;
2620
2621 for request in server.received_requests().await.unwrap() {
2622 if !SlidingSyncMatcher.matches(&request) {
2623 continue;
2624 }
2625
2626 let another_list_ranges = if num_requests == 0 {
2627 json!([[0, 10]])
2629 } else {
2630 json!([[10, 20]])
2632 };
2633
2634 num_requests += 1;
2635 assert!(num_requests <= 2, "more than one request hit the server");
2636
2637 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2638
2639 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2640 &json_value,
2641 &json!({
2642 "conn_id": "test",
2643 "lists": {
2644 "room-list": {
2645 "ranges": [[0, 9]],
2646 "required_state": [
2647 ["m.room.encryption", ""],
2648 ["m.room.tombstone", ""]
2649 ],
2650 },
2651 "another-list": {
2652 "ranges": another_list_ranges,
2653 "required_state": [
2654 ["m.room.encryption", ""],
2655 ["m.room.tombstone", ""]
2656 ],
2657 },
2658 }
2659 }),
2660 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2661 ) {
2662 dbg!(json_value);
2663 panic!("json differ: {err}");
2664 }
2665 }
2666
2667 assert_eq!(num_requests, 2);
2668
2669 Ok(())
2670 }
2671
2672 #[async_test]
2673 async fn test_timeout_zero_list() -> Result<()> {
2674 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2675
2676 let (request, _, _) =
2677 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2678
2679 assert!(request.timeout.is_some());
2682
2683 Ok(())
2684 }
2685
2686 #[async_test]
2687 async fn test_timeout_one_list() -> Result<()> {
2688 let (_server, sliding_sync) = new_sliding_sync(vec![
2689 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2690 ])
2691 .await?;
2692
2693 let (request, _, _) =
2694 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2695
2696 assert!(request.timeout.is_none());
2698
2699 {
2701 let server_response = assign!(http::Response::new("0".to_owned()), {
2702 lists: BTreeMap::from([(
2703 "foo".to_owned(),
2704 assign!(http::response::List::default(), {
2705 count: uint!(7),
2706 })
2707 )])
2708 });
2709
2710 let _summary = {
2711 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2712 sliding_sync
2713 .handle_response(
2714 server_response.clone(),
2715 &mut pos_guard,
2716 RequestedRequiredStates::default(),
2717 )
2718 .await?
2719 };
2720 }
2721
2722 let (request, _, _) =
2723 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2724
2725 assert!(request.timeout.is_some());
2727
2728 Ok(())
2729 }
2730
2731 #[async_test]
2732 async fn test_timeout_three_lists() -> Result<()> {
2733 let (_server, sliding_sync) = new_sliding_sync(vec![
2734 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2735 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2736 SlidingSyncList::builder("baz")
2737 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2738 ])
2739 .await?;
2740
2741 let (request, _, _) =
2742 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2743
2744 assert!(request.timeout.is_none());
2746
2747 {
2749 let server_response = assign!(http::Response::new("0".to_owned()), {
2750 lists: BTreeMap::from([(
2751 "foo".to_owned(),
2752 assign!(http::response::List::default(), {
2753 count: uint!(7),
2754 })
2755 )])
2756 });
2757
2758 let _summary = {
2759 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2760 sliding_sync
2761 .handle_response(
2762 server_response.clone(),
2763 &mut pos_guard,
2764 RequestedRequiredStates::default(),
2765 )
2766 .await?
2767 };
2768 }
2769
2770 let (request, _, _) =
2771 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2772
2773 assert!(request.timeout.is_none());
2775
2776 {
2778 let server_response = assign!(http::Response::new("1".to_owned()), {
2779 lists: BTreeMap::from([(
2780 "bar".to_owned(),
2781 assign!(http::response::List::default(), {
2782 count: uint!(7),
2783 })
2784 )])
2785 });
2786
2787 let _summary = {
2788 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2789 sliding_sync
2790 .handle_response(
2791 server_response.clone(),
2792 &mut pos_guard,
2793 RequestedRequiredStates::default(),
2794 )
2795 .await?
2796 };
2797 }
2798
2799 let (request, _, _) =
2800 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2801
2802 assert!(request.timeout.is_some());
2804
2805 Ok(())
2806 }
2807
2808 #[async_test]
2809 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2810 let server = MockServer::start().await;
2811 let client = logged_in_client(Some(server.uri())).await;
2812
2813 let _mock_guard = Mock::given(SlidingSyncMatcher)
2814 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2815 "pos": "0",
2816 "lists": {},
2817 "rooms": {}
2818 })))
2819 .mount_as_scoped(&server)
2820 .await;
2821
2822 let sliding_sync = client
2823 .sliding_sync("test")?
2824 .with_to_device_extension(
2825 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2826 )
2827 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2828 .build()
2829 .await?;
2830
2831 let sliding_sync = Arc::new(sliding_sync);
2832
2833 let sync_beat_listener = client.inner.sync_beat.listen();
2835 sliding_sync.sync_once().await?;
2836
2837 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2839 Ok(())
2840 }
2841
2842 #[async_test]
2843 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2844 let server = MockServer::start().await;
2845 let client = logged_in_client(Some(server.uri())).await;
2846
2847 let _mock_guard = Mock::given(SlidingSyncMatcher)
2848 .respond_with(ResponseTemplate::new(404))
2849 .mount_as_scoped(&server)
2850 .await;
2851
2852 let sliding_sync = client
2853 .sliding_sync("test")?
2854 .with_to_device_extension(
2855 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2856 )
2857 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2858 .build()
2859 .await?;
2860
2861 let sliding_sync = Arc::new(sliding_sync);
2862
2863 let sync_beat_listener = client.inner.sync_beat.listen();
2865 let sync_result = sliding_sync.sync_once().await;
2866 assert!(sync_result.is_err());
2867
2868 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2870
2871 Ok(())
2872 }
2873}