1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod sticky_parameters;
24
25use std::{
26 collections::{btree_map::Entry, BTreeMap},
27 fmt::Debug,
28 future::Future,
29 sync::{Arc, RwLock as StdRwLock},
30 time::Duration,
31};
32
33use async_stream::stream;
34pub use client::{Version, VersionBuilder};
35use futures_core::stream::Stream;
36use matrix_sdk_base::RequestedRequiredStates;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_common::executor::JoinHandleExt as _;
39use matrix_sdk_common::{executor::spawn, timer};
40use ruma::{
41 api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42 assign, OwnedRoomId, RoomId,
43};
44use serde::{Deserialize, Serialize};
45use tokio::{
46 select,
47 sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
48};
49use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
50
51pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
52use self::{
53 cache::restore_sliding_sync_state,
54 client::SlidingSyncResponseProcessor,
55 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
56};
57use crate::{config::RequestConfig, Client, Result};
58
59#[derive(Clone, Debug)]
63pub struct SlidingSync {
64 inner: Arc<SlidingSyncInner>,
66}
67
68#[derive(Debug)]
69pub(super) struct SlidingSyncInner {
70 id: String,
74
75 client: Client,
77
78 poll_timeout: Duration,
80
81 network_timeout: Duration,
84
85 storage_key: String,
87
88 share_pos: bool,
95
96 position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
109
110 lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
112
113 sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,
115
116 internal_channel: Sender<SlidingSyncInternalMessage>,
119}
120
121impl SlidingSync {
122 pub(super) fn new(inner: SlidingSyncInner) -> Self {
123 Self { inner: Arc::new(inner) }
124 }
125
126 pub async fn has_pos(&self) -> bool {
129 self.inner.position.lock().await.pos.is_some()
130 }
131
132 async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
133 cache::store_sliding_sync_state(self, position).await
134 }
135
136 pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
138 SlidingSyncBuilder::new(id, client)
139 }
140
141 pub fn subscribe_to_rooms(
148 &self,
149 room_ids: &[&RoomId],
150 settings: Option<http::request::RoomSubscription>,
151 cancel_in_flight_request: bool,
152 ) {
153 let settings = settings.unwrap_or_default();
154 let mut sticky = self.inner.sticky.write().unwrap();
155 let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
156
157 let mut skip_over_current_sync_loop_iteration = false;
158
159 for room_id in room_ids {
160 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
167 if let Some(room) = self.inner.client.get_room(room_id) {
168 room.mark_members_missing();
169 }
170
171 entry.insert((RoomSubscriptionState::default(), settings.clone()));
172
173 skip_over_current_sync_loop_iteration = true;
174 }
175 }
176
177 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
178 self.inner.internal_channel_send_if_possible(
179 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
180 );
181 }
182 }
183
184 pub async fn on_list<Function, FunctionOutput, R>(
186 &self,
187 list_name: &str,
188 function: Function,
189 ) -> Option<R>
190 where
191 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
192 FunctionOutput: Future<Output = R>,
193 {
194 let lists = self.inner.lists.read().await;
195
196 match lists.get(list_name) {
197 Some(list) => Some(function(list).await),
198 None => None,
199 }
200 }
201
202 pub async fn add_list(
208 &self,
209 list_builder: SlidingSyncListBuilder,
210 ) -> Result<Option<SlidingSyncList>> {
211 let list = list_builder.build(self.inner.internal_channel.clone());
212
213 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
214
215 self.inner.internal_channel_send_if_possible(
216 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
217 );
218
219 Ok(old_list)
220 }
221
222 pub async fn add_cached_list(
229 &self,
230 mut list_builder: SlidingSyncListBuilder,
231 ) -> Result<Option<SlidingSyncList>> {
232 let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
233
234 list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
235
236 self.add_list(list_builder).await
237 }
238
239 #[instrument(skip_all)]
241 async fn handle_response(
242 &self,
243 mut sliding_sync_response: http::Response,
244 position: &mut SlidingSyncPositionMarkers,
245 requested_required_states: RequestedRequiredStates,
246 ) -> Result<UpdateSummary, crate::Error> {
247 let pos = Some(sliding_sync_response.pos.clone());
248
249 let must_process_rooms_response = self.must_process_rooms_response().await;
250
251 trace!(yes = must_process_rooms_response, "Must process rooms response?");
252
253 let sync_response = {
261 let _timer = timer!("response processor");
262
263 let response_processor = {
264 let _sync_lock = {
267 let _timer = timer!("acquiring the `sync_lock`");
268
269 self.inner.client.base_client().sync_lock().lock().await
270 };
271
272 let mut response_processor =
273 SlidingSyncResponseProcessor::new(self.inner.client.clone());
274
275 if self.is_thread_subscriptions_enabled() {
281 response_processor
282 .handle_thread_subscriptions(
283 position.pos.as_deref(),
284 std::mem::take(
285 &mut sliding_sync_response.extensions.thread_subscriptions,
286 ),
287 )
288 .await?;
289 }
290
291 #[cfg(feature = "e2e-encryption")]
292 if self.is_e2ee_enabled() {
293 response_processor.handle_encryption(&sliding_sync_response.extensions).await?
294 }
295
296 if must_process_rooms_response {
299 response_processor
300 .handle_room_response(&sliding_sync_response, &requested_required_states)
301 .await?;
302 }
303
304 response_processor
305 };
306
307 response_processor.process_and_take_response().await?
309 };
310
311 debug!("Sliding Sync response has been handled by the client");
312 trace!(?sync_response);
313
314 if let Some(ref txn_id) = sliding_sync_response.txn_id {
316 let txn_id = txn_id.as_str().into();
317 self.inner.sticky.write().unwrap().maybe_commit(txn_id);
318 let mut lists = self.inner.lists.write().await;
319 lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
320 }
321
322 let update_summary = {
323 let updated_rooms = {
325 let mut updated_rooms = Vec::with_capacity(
326 sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
327 );
328
329 updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());
330
331 updated_rooms.extend(sync_response.rooms.joined.keys().cloned());
339
340 updated_rooms
341 };
342
343 let updated_lists = {
345 debug!(
346 lists = ?sliding_sync_response.lists,
347 "Update lists"
348 );
349
350 let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
351 let mut lists = self.inner.lists.write().await;
352
353 for (name, list) in lists.iter_mut() {
356 if let Some(updates) = sliding_sync_response.lists.get(name) {
357 let maximum_number_of_rooms: u32 =
358 updates.count.try_into().expect("failed to convert `count` to `u32`");
359
360 if list.update(Some(maximum_number_of_rooms))? {
361 updated_lists.push(name.clone());
362 }
363 } else if list.update(None)? {
364 updated_lists.push(name.clone());
365 }
366 }
367
368 for name in sliding_sync_response.lists.keys() {
370 if !lists.contains_key(name) {
371 error!("Response for list `{name}` - unknown to us; skipping");
372 }
373 }
374
375 updated_lists
376 };
377
378 UpdateSummary { lists: updated_lists, rooms: updated_rooms }
379 };
380
381 debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");
385
386 position.pos = pos;
387
388 Ok(update_summary)
389 }
390
391 #[instrument(skip_all)]
392 async fn generate_sync_request(
393 &self,
394 txn_id: &mut LazyTransactionId,
395 ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
396 let mut requests_lists = BTreeMap::new();
398
399 let require_timeout = {
400 let lists = self.inner.lists.read().await;
401
402 let mut require_timeout = true;
404
405 for (name, list) in lists.iter() {
406 requests_lists.insert(name.clone(), list.next_request(txn_id)?);
407 require_timeout = require_timeout && list.requires_timeout();
408 }
409
410 require_timeout
411 };
412
413 let mut position_guard = {
421 debug!("Waiting to acquire the `position` lock");
422
423 let _timer = timer!("acquiring the `position` lock");
424
425 self.inner.position.clone().lock_owned().await
426 };
427
428 debug!(pos = ?position_guard.pos, "Got a position");
429
430 let to_device_enabled =
431 self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
432
433 let restored_fields = if self.inner.share_pos || to_device_enabled {
434 restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
435 } else {
436 None
437 };
438
439 let pos = if self.inner.share_pos {
442 if let Some(fields) = &restored_fields {
443 if fields.pos != position_guard.pos {
445 info!(
446 "Pos from previous request ('{:?}') was different from \
447 pos in database ('{:?}').",
448 position_guard.pos, fields.pos
449 );
450 position_guard.pos = fields.pos.clone();
451 }
452 fields.pos.clone()
453 } else {
454 position_guard.pos.clone()
455 }
456 } else {
457 position_guard.pos.clone()
458 };
459
460 Span::current().record("pos", &pos);
461
462 #[cfg(feature = "e2e-encryption")]
471 if pos.is_none() && self.is_e2ee_enabled() {
472 info!("Marking all tracked users as dirty");
473
474 let olm_machine = self.inner.client.olm_machine().await;
475 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
476 olm_machine.mark_all_tracked_users_as_dirty().await?;
477 }
478
479 let timeout = require_timeout.then(|| self.inner.poll_timeout);
484
485 let mut request = assign!(http::Request::new(), {
486 conn_id: Some(self.inner.id.clone()),
487 pos,
488 timeout,
489 lists: requests_lists,
490 });
491
492 self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
494
495 if to_device_enabled {
499 request.extensions.to_device.since =
500 restored_fields.and_then(|fields| fields.to_device_token);
501 }
502
503 if let Some(txn_id) = txn_id.get() {
505 request.txn_id = Some(txn_id.to_string());
506 }
507
508 Ok((
509 request,
511 RequestConfig::default()
514 .timeout(self.inner.poll_timeout + self.inner.network_timeout)
515 .retry_limit(3),
516 position_guard,
517 ))
518 }
519
520 async fn send_sync_request(
524 &self,
525 request: http::Request,
526 request_config: RequestConfig,
527 mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
528 ) -> Result<UpdateSummary> {
529 debug!("Sending request");
530
531 let requested_required_states = RequestedRequiredStates::from(&request);
533 let request = self.inner.client.send(request).with_request_config(request_config);
534
535 #[cfg(feature = "e2e-encryption")]
542 let response = {
543 if self.is_e2ee_enabled() {
544 let client = self.inner.client.clone();
561 let e2ee_uploads = spawn(
562 async move {
563 if let Err(error) = client.send_outgoing_requests().await {
564 error!(?error, "Error while sending outgoing E2EE requests");
565 }
566 }
567 .instrument(Span::current()),
568 )
569 .abort_on_drop();
572
573 let response = request.await?;
575
576 e2ee_uploads.await.map_err(|error| Error::JoinError {
581 task_description: "e2ee_uploads".to_owned(),
582 error,
583 })?;
584
585 response
586 } else {
587 request.await?
588 }
589 };
590
591 #[cfg(not(feature = "e2e-encryption"))]
593 let response = request.await?;
594
595 debug!("Received response");
596
597 let this = self.clone();
607
608 let future = async move {
611 debug!("Start handling response");
612
613 let updates = this
619 .handle_response(response, &mut position_guard, requested_required_states)
620 .await?;
621
622 this.cache_to_storage(&position_guard).await?;
623
624 drop(position_guard);
627
628 debug!("Done handling response");
629
630 Ok(updates)
631 };
632
633 spawn(future.instrument(Span::current())).await.unwrap()
634 }
635
636 #[cfg(feature = "e2e-encryption")]
638 fn is_e2ee_enabled(&self) -> bool {
639 self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
640 }
641
642 fn is_thread_subscriptions_enabled(&self) -> bool {
645 self.inner.sticky.read().unwrap().data().extensions.thread_subscriptions.enabled
646 == Some(true)
647 }
648
649 #[cfg(not(feature = "e2e-encryption"))]
650 fn is_e2ee_enabled(&self) -> bool {
651 false
652 }
653
654 async fn must_process_rooms_response(&self) -> bool {
656 !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
659 || !self.inner.lists.read().await.is_empty()
660 }
661
662 #[doc(hidden)]
666 #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
667 pub async fn sync_once(&self) -> Result<UpdateSummary> {
668 let (request, request_config, position_guard) =
669 self.generate_sync_request(&mut LazyTransactionId::new()).await?;
670
671 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
673
674 self.inner.client.inner.sync_beat.notify(usize::MAX);
676
677 Ok(summaries)
678 }
679
680 #[allow(unknown_lints, clippy::let_with_type_underscore)] #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
690 pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
691 debug!("Starting sync stream");
692
693 let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
694
695 stream! {
696 loop {
697 debug!("Sync stream is running");
698
699 select! {
700 biased;
701
702 internal_message = internal_channel_receiver.recv() => {
703 use SlidingSyncInternalMessage::*;
704
705 debug!(?internal_message, "Sync stream has received an internal message");
706
707 match internal_message {
708 Err(_) | Ok(SyncLoopStop) => {
709 break;
710 }
711
712 Ok(SyncLoopSkipOverCurrentIteration) => {
713 continue;
714 }
715 }
716 }
717
718 update_summary = self.sync_once() => {
719 match update_summary {
720 Ok(updates) => {
721 yield Ok(updates);
722 }
723
724 Err(error) => {
726 if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
727 self.expire_session().await;
729 }
730
731 yield Err(error);
732
733 break;
735 }
736 }
737 }
738 }
739 }
740
741 debug!("Sync stream has exited.");
742 }
743 }
744
745 pub fn stop_sync(&self) -> Result<()> {
754 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
755 }
756
757 #[doc(hidden)]
769 pub async fn expire_session(&self) {
770 info!("Session expired; resetting `pos` and sticky parameters");
771
772 {
773 let lists = self.inner.lists.read().await;
774 for list in lists.values() {
775 list.set_maximum_number_of_rooms(None);
777
778 list.invalidate_sticky_data();
780 }
781 }
782
783 {
785 let mut position = self.inner.position.lock().await;
786
787 position.pos = None;
789
790 if let Err(err) = self.cache_to_storage(&position).await {
794 warn!("Failed to invalidate cached sliding sync state: {err}");
795 }
796 }
797
798 {
799 let mut sticky = self.inner.sticky.write().unwrap();
800
801 sticky.data_mut().room_subscriptions.clear();
804 }
805 }
806}
807
808impl SlidingSyncInner {
809 #[instrument]
811 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
812 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
813 }
814
815 #[instrument]
818 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
819 let _ = self.internal_channel.send(message);
821 }
822}
823
824#[derive(Copy, Clone, Debug, PartialEq)]
825enum SlidingSyncInternalMessage {
826 SyncLoopStop,
828
829 SyncLoopSkipOverCurrentIteration,
832}
833
834#[cfg(any(test, feature = "testing"))]
835impl SlidingSync {
836 pub async fn set_pos(&self, new_pos: String) {
838 let mut position_lock = self.inner.position.lock().await;
839 position_lock.pos = Some(new_pos);
840 }
841
842 pub fn extensions_config(&self) -> http::request::Extensions {
848 let sticky = self.inner.sticky.read().unwrap();
849 sticky.data().extensions.clone()
850 }
851}
852
853#[derive(Clone, Debug)]
854pub(super) struct SlidingSyncPositionMarkers {
855 pos: Option<String>,
858}
859
860#[derive(Serialize, Deserialize)]
861struct FrozenSlidingSyncPos {
862 #[serde(skip_serializing_if = "Option::is_none")]
863 pos: Option<String>,
864}
865
866#[derive(Debug, Clone)]
869pub struct UpdateSummary {
870 pub lists: Vec<String>,
872 pub rooms: Vec<OwnedRoomId>,
874}
875
876#[derive(Debug, Default)]
880enum RoomSubscriptionState {
881 #[default]
885 Pending,
886
887 Applied,
890}
891
892#[derive(Debug)]
895pub(super) struct SlidingSyncStickyParameters {
896 room_subscriptions:
899 BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
900
901 extensions: http::request::Extensions,
904}
905
906impl SlidingSyncStickyParameters {
907 pub fn new(
909 room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
910 extensions: http::request::Extensions,
911 ) -> Self {
912 Self {
913 room_subscriptions: room_subscriptions
914 .into_iter()
915 .map(|(room_id, room_subscription)| {
916 (room_id, (RoomSubscriptionState::Pending, room_subscription))
917 })
918 .collect(),
919 extensions,
920 }
921 }
922}
923
924impl StickyData for SlidingSyncStickyParameters {
925 type Request = http::Request;
926
927 fn apply(&self, request: &mut Self::Request) {
928 request.room_subscriptions = self
929 .room_subscriptions
930 .iter()
931 .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
932 .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
933 .collect();
934 request.extensions = self.extensions.clone();
935 }
936
937 fn on_commit(&mut self) {
938 for (state, _room_subscription) in self.room_subscriptions.values_mut() {
940 if matches!(state, RoomSubscriptionState::Pending) {
941 *state = RoomSubscriptionState::Applied;
942 }
943 }
944 }
945}
946
947#[cfg(all(test, not(target_family = "wasm")))]
948#[allow(clippy::dbg_macro)]
949mod tests {
950 use std::{
951 collections::BTreeMap,
952 future::ready,
953 ops::Not,
954 sync::{Arc, Mutex},
955 time::Duration,
956 };
957
958 use assert_matches::assert_matches;
959 use event_listener::Listener;
960 use futures_util::{future::join_all, pin_mut, StreamExt};
961 use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
962 use matrix_sdk_common::executor::spawn;
963 use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE};
964 use ruma::{
965 api::client::error::ErrorKind,
966 assign,
967 events::{direct::DirectEvent, room::member::MembershipState},
968 owned_room_id, room_id,
969 serde::Raw,
970 uint, OwnedRoomId, TransactionId,
971 };
972 use serde::Deserialize;
973 use serde_json::json;
974 use wiremock::{
975 http::Method, matchers::method, Match, Mock, MockServer, Request, ResponseTemplate,
976 };
977
978 use super::{
979 http,
980 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
981 SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
982 SlidingSyncStickyParameters,
983 };
984 use crate::{
985 sliding_sync::cache::restore_sliding_sync_state,
986 test_utils::{logged_in_client, mocks::MatrixMockServer},
987 Client, Result,
988 };
989
990 #[derive(Copy, Clone)]
991 struct SlidingSyncMatcher;
992
993 impl Match for SlidingSyncMatcher {
994 fn matches(&self, request: &Request) -> bool {
995 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
996 && request.method == Method::POST
997 }
998 }
999
1000 async fn new_sliding_sync(
1001 lists: Vec<SlidingSyncListBuilder>,
1002 ) -> Result<(MockServer, SlidingSync)> {
1003 let server = MockServer::start().await;
1004 let client = logged_in_client(Some(server.uri())).await;
1005
1006 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1007
1008 for list in lists {
1009 sliding_sync_builder = sliding_sync_builder.add_list(list);
1010 }
1011
1012 let sliding_sync = sliding_sync_builder.build().await?;
1013
1014 Ok((server, sliding_sync))
1015 }
1016
1017 #[async_test]
1018 async fn test_subscribe_to_rooms() -> Result<()> {
1019 let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1020 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1021 .await?;
1022
1023 let stream = sliding_sync.sync();
1024 pin_mut!(stream);
1025
1026 let room_id_0 = room_id!("!r0:bar.org");
1027 let room_id_1 = room_id!("!r1:bar.org");
1028 let room_id_2 = room_id!("!r2:bar.org");
1029
1030 {
1031 let _mock_guard = Mock::given(SlidingSyncMatcher)
1032 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1033 "pos": "1",
1034 "lists": {},
1035 "rooms": {
1036 room_id_0: {
1037 "name": "Room #0",
1038 "initial": true,
1039 },
1040 room_id_1: {
1041 "name": "Room #1",
1042 "initial": true,
1043 },
1044 room_id_2: {
1045 "name": "Room #2",
1046 "initial": true,
1047 },
1048 }
1049 })))
1050 .mount_as_scoped(&server)
1051 .await;
1052
1053 let _ = stream.next().await.unwrap()?;
1054 }
1055
1056 let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1057
1058 assert!(room0.are_members_synced().not());
1062
1063 {
1064 struct MemberMatcher(OwnedRoomId);
1065
1066 impl Match for MemberMatcher {
1067 fn matches(&self, request: &Request) -> bool {
1068 request.url.path()
1069 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1070 && request.method == Method::GET
1071 }
1072 }
1073
1074 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1075 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1076 "chunk": [],
1077 })))
1078 .mount_as_scoped(&server)
1079 .await;
1080
1081 assert_matches!(room0.request_members().await, Ok(()));
1082 }
1083
1084 assert!(room0.are_members_synced());
1086
1087 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1088
1089 assert!(room0.are_members_synced().not());
1092
1093 {
1094 let sticky = sliding_sync.inner.sticky.read().unwrap();
1095 let room_subscriptions = &sticky.data().room_subscriptions;
1096
1097 assert!(room_subscriptions.contains_key(room_id_0));
1098 assert!(room_subscriptions.contains_key(room_id_1));
1099 assert!(!room_subscriptions.contains_key(room_id_2));
1100 }
1101
1102 {
1105 struct MemberMatcher(OwnedRoomId);
1106
1107 impl Match for MemberMatcher {
1108 fn matches(&self, request: &Request) -> bool {
1109 request.url.path()
1110 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1111 && request.method == Method::GET
1112 }
1113 }
1114
1115 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1116 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1117 "chunk": [],
1118 })))
1119 .mount_as_scoped(&server)
1120 .await;
1121
1122 assert_matches!(room0.request_members().await, Ok(()));
1123 }
1124
1125 assert!(room0.are_members_synced());
1127
1128 sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1129
1130 assert!(room0.are_members_synced());
1133
1134 Ok(())
1135 }
1136
1137 #[async_test]
1138 async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1139 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1140 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1141 .await?;
1142
1143 let room_id_0 = room_id!("!r0:bar.org");
1144 let room_id_1 = room_id!("!r1:bar.org");
1145 let room_id_2 = room_id!("!r2:bar.org");
1146
1147 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1149
1150 {
1151 let sticky = sliding_sync.inner.sticky.read().unwrap();
1152 let room_subscriptions = &sticky.data().room_subscriptions;
1153
1154 assert!(room_subscriptions.contains_key(room_id_0));
1155 assert!(room_subscriptions.contains_key(room_id_1));
1156 assert!(room_subscriptions.contains_key(room_id_2).not());
1157 }
1158
1159 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1161
1162 {
1163 let sticky = sliding_sync.inner.sticky.read().unwrap();
1164 let room_subscriptions = &sticky.data().room_subscriptions;
1165
1166 assert!(room_subscriptions.contains_key(room_id_0));
1167 assert!(room_subscriptions.contains_key(room_id_1));
1168 assert!(room_subscriptions.contains_key(room_id_2));
1169 }
1170
1171 sliding_sync.expire_session().await;
1173
1174 {
1175 let sticky = sliding_sync.inner.sticky.read().unwrap();
1176 let room_subscriptions = &sticky.data().room_subscriptions;
1177
1178 assert!(room_subscriptions.is_empty());
1179 }
1180
1181 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1183
1184 {
1185 let sticky = sliding_sync.inner.sticky.read().unwrap();
1186 let room_subscriptions = &sticky.data().room_subscriptions;
1187
1188 assert!(room_subscriptions.contains_key(room_id_0).not());
1189 assert!(room_subscriptions.contains_key(room_id_1).not());
1190 assert!(room_subscriptions.contains_key(room_id_2));
1191 }
1192
1193 Ok(())
1194 }
1195
1196 #[async_test]
1197 async fn test_add_list() -> Result<()> {
1198 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1199 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1200 .await?;
1201
1202 let _stream = sliding_sync.sync();
1203 pin_mut!(_stream);
1204
1205 sliding_sync
1206 .add_list(
1207 SlidingSyncList::builder("bar")
1208 .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1209 )
1210 .await?;
1211
1212 let lists = sliding_sync.inner.lists.read().await;
1213
1214 assert!(lists.contains_key("foo"));
1215 assert!(lists.contains_key("bar"));
1216
1217 Ok(())
1220 }
1221
1222 #[test]
1223 fn test_sticky_parameters_api_invalidated_flow() {
1224 let r0 = room_id!("!r0.matrix.org");
1225 let r1 = room_id!("!r1:matrix.org");
1226
1227 let mut room_subscriptions = BTreeMap::new();
1228 room_subscriptions.insert(r0.to_owned(), Default::default());
1229
1230 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1232 room_subscriptions,
1233 Default::default(),
1234 ));
1235 assert!(sticky.is_invalidated());
1236
1237 let txn_id: &TransactionId = "tid123".into();
1239
1240 let mut request = http::Request::default();
1241 request.txn_id = Some(txn_id.to_string());
1242
1243 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1244
1245 assert!(request.txn_id.is_some());
1246 assert_eq!(request.room_subscriptions.len(), 1);
1247 assert!(request.room_subscriptions.contains_key(r0));
1248
1249 let tid = request.txn_id.unwrap();
1250
1251 sticky.maybe_commit(tid.as_str().into());
1252 assert!(!sticky.is_invalidated());
1253
1254 sticky
1256 .data_mut()
1257 .room_subscriptions
1258 .insert(r1.to_owned(), (Default::default(), Default::default()));
1259 assert!(sticky.is_invalidated());
1260
1261 sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1263 assert!(sticky.is_invalidated());
1264
1265 let txn_id1: &TransactionId = "tid456".into();
1267 let mut request1 = http::Request::default();
1268 request1.txn_id = Some(txn_id1.to_string());
1269 sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1270
1271 assert!(sticky.is_invalidated());
1272 assert_eq!(request1.room_subscriptions.len(), 1);
1276 assert!(request1.room_subscriptions.contains_key(r1));
1277
1278 let txn_id2: &TransactionId = "tid789".into();
1279 let mut request2 = http::Request::default();
1280 request2.txn_id = Some(txn_id2.to_string());
1281
1282 sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1283 assert!(sticky.is_invalidated());
1284 assert_eq!(request2.room_subscriptions.len(), 1);
1287 assert!(request2.room_subscriptions.contains_key(r1));
1288
1289 sticky.maybe_commit(txn_id1);
1292 assert!(sticky.is_invalidated());
1293
1294 sticky.maybe_commit(txn_id2);
1296 assert!(!sticky.is_invalidated());
1297 }
1298
1299 #[test]
1300 fn test_room_subscriptions_are_sticky() {
1301 let r0 = room_id!("!r0.matrix.org");
1302 let r1 = room_id!("!r1:matrix.org");
1303
1304 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1305 BTreeMap::new(),
1306 Default::default(),
1307 ));
1308
1309 {
1311 sticky
1313 .data_mut()
1314 .room_subscriptions
1315 .insert(r0.to_owned(), (Default::default(), Default::default()));
1316
1317 let txn_id: &TransactionId = "tid0".into();
1319 let mut request = http::Request::default();
1320 request.txn_id = Some(txn_id.to_string());
1321
1322 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1323
1324 assert!(request.txn_id.is_some());
1325 assert_eq!(request.room_subscriptions.len(), 1);
1326 assert!(request.room_subscriptions.contains_key(r0));
1327
1328 let tid = request.txn_id.unwrap();
1330
1331 sticky.maybe_commit(tid.as_str().into());
1332 }
1333
1334 {
1336 sticky
1338 .data_mut()
1339 .room_subscriptions
1340 .insert(r1.to_owned(), (Default::default(), Default::default()));
1341
1342 let txn_id: &TransactionId = "tid1".into();
1344 let mut request = http::Request::default();
1345 request.txn_id = Some(txn_id.to_string());
1346
1347 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1348
1349 assert!(request.txn_id.is_some());
1350 assert_eq!(request.room_subscriptions.len(), 1);
1351 assert!(request.room_subscriptions.contains_key(r1));
1353
1354 }
1358
1359 {
1361 let txn_id: &TransactionId = "tid2".into();
1363 let mut request = http::Request::default();
1364 request.txn_id = Some(txn_id.to_string());
1365
1366 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1367
1368 assert!(request.txn_id.is_some());
1369 assert_eq!(request.room_subscriptions.len(), 1);
1370 assert!(request.room_subscriptions.contains_key(r1));
1372
1373 let tid = request.txn_id.unwrap();
1375
1376 sticky.maybe_commit(tid.as_str().into());
1377 }
1378
1379 {
1381 let txn_id: &TransactionId = "tid3".into();
1383 let mut request = http::Request::default();
1384 request.txn_id = Some(txn_id.to_string());
1385
1386 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1387
1388 assert!(request.txn_id.is_some());
1389 assert!(request.room_subscriptions.is_empty());
1391 }
1392 }
1393
1394 #[test]
1395 fn test_extensions_are_sticky() {
1396 let mut extensions = http::request::Extensions::default();
1397 extensions.account_data.enabled = Some(true);
1398
1399 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1401 Default::default(),
1402 extensions,
1403 ));
1404
1405 assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1406
1407 let extensions = &sticky.data().extensions;
1410 assert_eq!(extensions.e2ee.enabled, None);
1411 assert_eq!(extensions.to_device.enabled, None);
1412 assert_eq!(extensions.to_device.since, None);
1413
1414 assert_eq!(extensions.account_data.enabled, Some(true));
1416
1417 let txn_id: &TransactionId = "tid123".into();
1418 let mut request = http::Request::default();
1419 request.txn_id = Some(txn_id.to_string());
1420 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1421 assert!(sticky.is_invalidated());
1422 assert_eq!(request.extensions.to_device.enabled, None);
1423 assert_eq!(request.extensions.to_device.since, None);
1424 assert_eq!(request.extensions.e2ee.enabled, None);
1425 assert_eq!(request.extensions.account_data.enabled, Some(true));
1426 }
1427
1428 #[async_test]
1429 async fn test_sticky_extensions_plus_since() -> Result<()> {
1430 let server = MockServer::start().await;
1431 let client = logged_in_client(Some(server.uri())).await;
1432
1433 let sync = client
1434 .sliding_sync("test-slidingsync")?
1435 .add_list(SlidingSyncList::builder("new_list"))
1436 .build()
1437 .await?;
1438
1439 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1441 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1442 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1443
1444 let sync = client
1446 .sliding_sync("test-slidingsync")?
1447 .add_list(SlidingSyncList::builder("new_list"))
1448 .with_to_device_extension(
1449 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1450 )
1451 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1452 .build()
1453 .await?;
1454
1455 let txn_id = TransactionId::new();
1458 let (request, _, _) = sync
1459 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1460 .await?;
1461
1462 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1463 assert_eq!(request.extensions.to_device.enabled, Some(true));
1464 assert!(request.extensions.to_device.since.is_none());
1465
1466 {
1467 let mut sticky = sync.inner.sticky.write().unwrap();
1469 assert!(sticky.is_invalidated());
1470 sticky.maybe_commit(
1471 "hopefully the rng won't generate this very specific transaction id".into(),
1472 );
1473 assert!(sticky.is_invalidated());
1474 }
1475
1476 let txn_id2 = TransactionId::new();
1478 let (request, _, _) = sync
1479 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1480 .await?;
1481
1482 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1483 assert_eq!(request.extensions.to_device.enabled, Some(true));
1484 assert!(request.extensions.to_device.since.is_none());
1485
1486 assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1487
1488 {
1489 let mut sticky = sync.inner.sticky.write().unwrap();
1491 assert!(sticky.is_invalidated());
1492 sticky.maybe_commit(txn_id2.as_str().into());
1493 assert!(!sticky.is_invalidated());
1494 }
1495
1496 let txn_id = TransactionId::new();
1498 let (request, _, _) = sync
1499 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1500 .await?;
1501 assert!(request.extensions.e2ee.enabled.is_none());
1502 assert!(request.extensions.to_device.enabled.is_none());
1503 assert!(request.extensions.to_device.since.is_none());
1504
1505 let _since_token = "since";
1509
1510 #[cfg(feature = "e2e-encryption")]
1511 {
1512 use matrix_sdk_base::crypto::store::types::Changes;
1513 if let Some(olm_machine) = &*client.olm_machine().await {
1514 olm_machine
1515 .store()
1516 .save_changes(Changes {
1517 next_batch_token: Some(_since_token.to_owned()),
1518 ..Default::default()
1519 })
1520 .await?;
1521 }
1522 }
1523
1524 let txn_id = TransactionId::new();
1525 let (request, _, _) = sync
1526 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1527 .await?;
1528
1529 assert!(request.extensions.e2ee.enabled.is_none());
1530 assert!(request.extensions.to_device.enabled.is_none());
1531
1532 #[cfg(feature = "e2e-encryption")]
1533 assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1534
1535 Ok(())
1536 }
1537
1538 #[async_test]
1544 #[cfg(feature = "e2e-encryption")]
1545 async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1546 use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1547 use matrix_sdk_test::ruma_response_from_json;
1548 use ruma::user_id;
1549
1550 let server = MockServer::start().await;
1551 let client = logged_in_client(Some(server.uri())).await;
1552
1553 let alice = user_id!("@alice:localhost");
1554 let bob = user_id!("@bob:localhost");
1555 let me = user_id!("@example:localhost");
1556
1557 {
1560 let olm_machine = client.olm_machine().await;
1561 let olm_machine = olm_machine.as_ref().unwrap();
1562
1563 olm_machine.update_tracked_users([alice, bob]).await?;
1564
1565 let outgoing_requests = olm_machine.outgoing_requests().await?;
1567
1568 assert_eq!(outgoing_requests.len(), 2);
1569 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1570 assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1571
1572 olm_machine
1574 .mark_request_as_sent(
1575 outgoing_requests[0].request_id(),
1576 AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1577 "one_time_key_counts": {}
1578 }))),
1579 )
1580 .await?;
1581
1582 olm_machine
1583 .mark_request_as_sent(
1584 outgoing_requests[1].request_id(),
1585 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1586 "device_keys": {
1587 alice: {},
1588 bob: {},
1589 }
1590 }))),
1591 )
1592 .await?;
1593
1594 let outgoing_requests = olm_machine.outgoing_requests().await?;
1596
1597 assert_eq!(outgoing_requests.len(), 1);
1598 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1599
1600 olm_machine
1601 .mark_request_as_sent(
1602 outgoing_requests[0].request_id(),
1603 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1604 "device_keys": {
1605 me: {},
1606 }
1607 }))),
1608 )
1609 .await?;
1610
1611 let outgoing_requests = olm_machine.outgoing_requests().await?;
1613
1614 assert!(outgoing_requests.is_empty());
1615 }
1616
1617 let sync = client
1618 .sliding_sync("test-slidingsync")?
1619 .add_list(SlidingSyncList::builder("new_list"))
1620 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1621 .build()
1622 .await?;
1623
1624 let txn_id = TransactionId::new();
1626 let (_request, _, _) = sync
1627 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1628 .await?;
1629
1630 {
1632 let olm_machine = client.olm_machine().await;
1633 let olm_machine = olm_machine.as_ref().unwrap();
1634
1635 let outgoing_requests = olm_machine.outgoing_requests().await?;
1637
1638 assert_eq!(outgoing_requests.len(), 1);
1639 assert_matches!(
1640 outgoing_requests[0].request(),
1641 AnyOutgoingRequest::KeysQuery(request) => {
1642 assert!(request.device_keys.contains_key(alice));
1643 assert!(request.device_keys.contains_key(bob));
1644 assert!(request.device_keys.contains_key(me));
1645 }
1646 );
1647
1648 olm_machine
1650 .mark_request_as_sent(
1651 outgoing_requests[0].request_id(),
1652 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1653 "device_keys": {
1654 alice: {},
1655 bob: {},
1656 me: {},
1657 }
1658 }))),
1659 )
1660 .await?;
1661 }
1662
1663 sync.set_pos("chocolat".to_owned()).await;
1665
1666 let txn_id = TransactionId::new();
1667 let (_request, _, _) = sync
1668 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1669 .await?;
1670
1671 {
1673 let olm_machine = client.olm_machine().await;
1674 let olm_machine = olm_machine.as_ref().unwrap();
1675
1676 let outgoing_requests = olm_machine.outgoing_requests().await?;
1678
1679 assert!(outgoing_requests.is_empty());
1680 }
1681
1682 Ok(())
1683 }
1684
1685 #[async_test]
1686 async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1687 let server = MockServer::start().await;
1688 let client = logged_in_client(Some(server.uri())).await;
1689
1690 let sliding_sync = client
1691 .sliding_sync("test-slidingsync")?
1692 .with_to_device_extension(
1693 assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1694 )
1695 .build()
1696 .await?;
1697
1698 let (request, _, _) =
1700 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1701 assert!(request.extensions.to_device.enabled.is_some());
1702
1703 let sync = sliding_sync.sync();
1704 pin_mut!(sync);
1705
1706 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1708
1709 #[derive(Deserialize)]
1710 struct PartialRequest {
1711 txn_id: Option<String>,
1712 }
1713
1714 {
1715 let _mock_guard = Mock::given(SlidingSyncMatcher)
1716 .respond_with(|request: &Request| {
1717 let request: PartialRequest = request.body_json().unwrap();
1719
1720 ResponseTemplate::new(200).set_body_json(json!({
1721 "txn_id": request.txn_id,
1722 "pos": "0",
1723 }))
1724 })
1725 .mount_as_scoped(&server)
1726 .await;
1727
1728 let next = sync.next().await;
1729 assert_matches!(next, Some(Ok(_update_summary)));
1730
1731 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1733 }
1734
1735 let (request, _, _) =
1737 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1738 assert!(request.extensions.to_device.enabled.is_none());
1739
1740 {
1742 let _mock_guard = Mock::given(SlidingSyncMatcher)
1743 .respond_with(|request: &Request| {
1744 let request: PartialRequest = request.body_json().unwrap();
1746
1747 ResponseTemplate::new(200).set_body_json(json!({
1748 "txn_id": request.txn_id,
1749 "pos": "1",
1750 }))
1751 })
1752 .mount_as_scoped(&server)
1753 .await;
1754
1755 let next = sync.next().await;
1756 assert_matches!(next, Some(Ok(_update_summary)));
1757
1758 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1760 }
1761
1762 {
1765 let _mock_guard = Mock::given(SlidingSyncMatcher)
1766 .respond_with(|request: &Request| {
1767 let request: PartialRequest = request.body_json().unwrap();
1769
1770 ResponseTemplate::new(200).set_body_json(json!({
1771 "txn_id": request.txn_id,
1772 "pos": "0", }))
1774 })
1775 .up_to_n_times(1) .mount_as_scoped(&server)
1777 .await;
1778
1779 let next = sync.next().await;
1780 assert_matches!(next, Some(Ok(_update_summary)));
1781
1782 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1784 }
1785
1786 {
1791 let _mock_guard = Mock::given(SlidingSyncMatcher)
1792 .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1793 "error": "foo",
1794 "errcode": "M_UNKNOWN_POS",
1795 })))
1796 .mount_as_scoped(&server)
1797 .await;
1798
1799 let next = sync.next().await;
1800
1801 assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1803
1804 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1806
1807 let (request, _, _) =
1809 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1810
1811 assert!(request.extensions.to_device.enabled.is_some());
1812
1813 assert!(sync.next().await.is_none());
1815 }
1816
1817 Ok(())
1818 }
1819
1820 #[cfg(feature = "e2e-encryption")]
1821 #[async_test]
1822 async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1823 let server = MockServer::start().await;
1824
1825 #[derive(Deserialize)]
1826 struct PartialRequest {
1827 txn_id: Option<String>,
1828 }
1829
1830 let server_pos = Arc::new(Mutex::new(0));
1831 let _mock_guard = Mock::given(SlidingSyncMatcher)
1832 .respond_with(move |request: &Request| {
1833 let request: PartialRequest = request.body_json().unwrap();
1835 let pos = {
1836 let mut pos = server_pos.lock().unwrap();
1837 let prev = *pos;
1838 *pos += 1;
1839 prev
1840 };
1841
1842 ResponseTemplate::new(200).set_body_json(json!({
1843 "txn_id": request.txn_id,
1844 "pos": pos.to_string(),
1845 }))
1846 })
1847 .mount_as_scoped(&server)
1848 .await;
1849
1850 let client = logged_in_client(Some(server.uri())).await;
1851
1852 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1853
1854 {
1856 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1857
1858 let (request, _, _) =
1859 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1860 assert!(request.pos.is_none());
1861 }
1862
1863 let sync = sliding_sync.sync();
1864 pin_mut!(sync);
1865
1866 let next = sync.next().await;
1869 assert_matches!(next, Some(Ok(_update_summary)));
1870
1871 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1872
1873 let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1874 .await?
1875 .expect("must have restored fields");
1876
1877 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1880
1881 {
1885 let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1886
1887 let mut position_guard = other_sync.inner.position.lock().await;
1888 position_guard.pos = Some("yolo".to_owned());
1889
1890 other_sync.cache_to_storage(&position_guard).await?;
1891 }
1892
1893 {
1895 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1896 let (request, _, _) =
1897 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1898 assert_eq!(request.pos.as_deref(), Some("0"));
1899 }
1900
1901 {
1904 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1905 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1906 }
1907
1908 Ok(())
1909 }
1910
1911 #[cfg(feature = "e2e-encryption")]
1912 #[async_test]
1913 async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1914 let server = MockServer::start().await;
1915
1916 #[derive(Deserialize)]
1917 struct PartialRequest {
1918 txn_id: Option<String>,
1919 }
1920
1921 let server_pos = Arc::new(Mutex::new(0));
1922 let _mock_guard = Mock::given(SlidingSyncMatcher)
1923 .respond_with(move |request: &Request| {
1924 let request: PartialRequest = request.body_json().unwrap();
1926 let pos = {
1927 let mut pos = server_pos.lock().unwrap();
1928 let prev = *pos;
1929 *pos += 1;
1930 prev
1931 };
1932
1933 ResponseTemplate::new(200).set_body_json(json!({
1934 "txn_id": request.txn_id,
1935 "pos": pos.to_string(),
1936 }))
1937 })
1938 .mount_as_scoped(&server)
1939 .await;
1940
1941 let client = logged_in_client(Some(server.uri())).await;
1942
1943 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1944
1945 {
1947 let (request, _, _) =
1948 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1949
1950 assert!(request.pos.is_none());
1951 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1952 }
1953
1954 let sync = sliding_sync.sync();
1955 pin_mut!(sync);
1956
1957 let next = sync.next().await;
1960 assert_matches!(next, Some(Ok(_update_summary)));
1961
1962 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1963
1964 let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1965 .await?
1966 .expect("must have restored fields");
1967
1968 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1971
1972 {
1974 let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
1975
1976 let mut position_guard = other_sync.inner.position.lock().await;
1977 position_guard.pos = Some("42".to_owned());
1978
1979 other_sync.cache_to_storage(&position_guard).await?;
1980 }
1981
1982 {
1984 let (request, _, _) =
1985 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1986 assert_eq!(request.pos.as_deref(), Some("42"));
1987 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1988 }
1989
1990 {
1992 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1993 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1994
1995 let (request, _, _) =
1996 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1997 assert_eq!(request.pos.as_deref(), Some("42"));
1998 }
1999
2000 sliding_sync.expire_session().await;
2003
2004 {
2005 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2006
2007 let (request, _, _) =
2008 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2009 assert!(request.pos.is_none());
2010 }
2011
2012 {
2014 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2015 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2016
2017 let (request, _, _) =
2018 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2019 assert!(request.pos.is_none());
2020 }
2021
2022 Ok(())
2023 }
2024
2025 #[async_test]
2026 async fn test_stop_sync_loop() -> Result<()> {
2027 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2028 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2029 .await?;
2030
2031 let stream = sliding_sync.sync();
2033 pin_mut!(stream);
2034
2035 assert!(stream.next().await.is_some());
2037
2038 sliding_sync.stop_sync()?;
2040
2041 assert!(stream.next().await.is_none());
2043
2044 let stream = sliding_sync.sync();
2046 pin_mut!(stream);
2047
2048 assert!(stream.next().await.is_some());
2050
2051 Ok(())
2052 }
2053
2054 #[async_test]
2055 async fn test_process_read_receipts() -> Result<()> {
2056 let room = owned_room_id!("!pony:example.org");
2057
2058 let server = MockServer::start().await;
2059 let client = logged_in_client(Some(server.uri())).await;
2060 client.event_cache().subscribe().unwrap();
2061
2062 let sliding_sync = client
2063 .sliding_sync("test")?
2064 .with_receipt_extension(
2065 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2066 )
2067 .add_list(
2068 SlidingSyncList::builder("all")
2069 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2070 )
2071 .build()
2072 .await?;
2073
2074 {
2076 let server_response = assign!(http::Response::new("0".to_owned()), {
2077 rooms: BTreeMap::from([(
2078 room.clone(),
2079 http::response::Room::default(),
2080 )])
2081 });
2082
2083 let _summary = {
2084 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2085 sliding_sync
2086 .handle_response(
2087 server_response.clone(),
2088 &mut pos_guard,
2089 RequestedRequiredStates::default(),
2090 )
2091 .await?
2092 };
2093 }
2094
2095 let server_response = assign!(http::Response::new("1".to_owned()), {
2096 extensions: assign!(http::response::Extensions::default(), {
2097 receipts: assign!(http::response::Receipts::default(), {
2098 rooms: BTreeMap::from([
2099 (
2100 room.clone(),
2101 Raw::from_json_string(
2102 json!({
2103 "room_id": room,
2104 "type": "m.receipt",
2105 "content": {
2106 "$event:bar.org": {
2107 "m.read": {
2108 client.user_id().unwrap(): {
2109 "ts": 1436451550,
2110 }
2111 }
2112 }
2113 }
2114 })
2115 .to_string(),
2116 ).unwrap()
2117 )
2118 ])
2119 })
2120 })
2121 });
2122
2123 let summary = {
2124 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2125 sliding_sync
2126 .handle_response(
2127 server_response.clone(),
2128 &mut pos_guard,
2129 RequestedRequiredStates::default(),
2130 )
2131 .await?
2132 };
2133
2134 assert!(summary.rooms.contains(&room));
2135
2136 Ok(())
2137 }
2138
2139 #[async_test]
2140 async fn test_process_marked_unread_room_account_data() -> Result<()> {
2141 let room_id = owned_room_id!("!unicorn:example.org");
2142
2143 let server = MockServer::start().await;
2144 let client = logged_in_client(Some(server.uri())).await;
2145
2146 let sliding_sync = client
2149 .sliding_sync("test")?
2150 .with_account_data_extension(
2151 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2152 )
2153 .add_list(
2154 SlidingSyncList::builder("all")
2155 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2156 )
2157 .build()
2158 .await?;
2159
2160 {
2162 let server_response = assign!(http::Response::new("0".to_owned()), {
2163 rooms: BTreeMap::from([(
2164 room_id.clone(),
2165 http::response::Room::default(),
2166 )])
2167 });
2168
2169 let _summary = {
2170 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2171 sliding_sync
2172 .handle_response(
2173 server_response.clone(),
2174 &mut pos_guard,
2175 RequestedRequiredStates::default(),
2176 )
2177 .await?
2178 };
2179 }
2180
2181 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2185
2186 let update_summary = {
2187 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2188 sliding_sync
2189 .handle_response(
2190 server_response.clone(),
2191 &mut pos_guard,
2192 RequestedRequiredStates::default(),
2193 )
2194 .await?
2195 };
2196
2197 assert!(update_summary.rooms.contains(&room_id));
2200
2201 let room = client.get_room(&room_id).unwrap();
2202
2203 assert!(room.is_marked_unread());
2206
2207 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2210
2211 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2212 sliding_sync
2213 .handle_response(
2214 server_response.clone(),
2215 &mut pos_guard,
2216 RequestedRequiredStates::default(),
2217 )
2218 .await?;
2219
2220 let room = client.get_room(&room_id).unwrap();
2221
2222 assert!(!room.is_marked_unread());
2223
2224 Ok(())
2225 }
2226
2227 fn make_mark_unread_response(
2228 response_number: &str,
2229 room_id: OwnedRoomId,
2230 unread: bool,
2231 add_rooms_section: bool,
2232 ) -> http::Response {
2233 let rooms = if add_rooms_section {
2234 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2235 } else {
2236 BTreeMap::new()
2237 };
2238
2239 let extensions = assign!(http::response::Extensions::default(), {
2240 account_data: assign!(http::response::AccountData::default(), {
2241 rooms: BTreeMap::from([
2242 (
2243 room_id,
2244 vec![
2245 Raw::from_json_string(
2246 json!({
2247 "content": {
2248 "unread": unread
2249 },
2250 "type": "m.marked_unread"
2251 })
2252 .to_string(),
2253 ).unwrap()
2254 ]
2255 )
2256 ])
2257 })
2258 });
2259
2260 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2261 }
2262
2263 #[async_test]
2264 async fn test_process_rooms_account_data() -> Result<()> {
2265 let room = owned_room_id!("!pony:example.org");
2266
2267 let server = MockServer::start().await;
2268 let client = logged_in_client(Some(server.uri())).await;
2269
2270 let sliding_sync = client
2271 .sliding_sync("test")?
2272 .with_account_data_extension(
2273 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2274 )
2275 .add_list(
2276 SlidingSyncList::builder("all")
2277 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2278 )
2279 .build()
2280 .await?;
2281
2282 {
2284 let server_response = assign!(http::Response::new("0".to_owned()), {
2285 rooms: BTreeMap::from([(
2286 room.clone(),
2287 http::response::Room::default(),
2288 )])
2289 });
2290
2291 let _summary = {
2292 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2293 sliding_sync
2294 .handle_response(
2295 server_response.clone(),
2296 &mut pos_guard,
2297 RequestedRequiredStates::default(),
2298 )
2299 .await?
2300 };
2301 }
2302
2303 let server_response = assign!(http::Response::new("1".to_owned()), {
2304 extensions: assign!(http::response::Extensions::default(), {
2305 account_data: assign!(http::response::AccountData::default(), {
2306 rooms: BTreeMap::from([
2307 (
2308 room.clone(),
2309 vec![
2310 Raw::from_json_string(
2311 json!({
2312 "content": {
2313 "tags": {
2314 "u.work": {
2315 "order": 0.9
2316 }
2317 }
2318 },
2319 "type": "m.tag"
2320 })
2321 .to_string(),
2322 ).unwrap()
2323 ]
2324 )
2325 ])
2326 })
2327 })
2328 });
2329 let summary = {
2330 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2331 sliding_sync
2332 .handle_response(
2333 server_response.clone(),
2334 &mut pos_guard,
2335 RequestedRequiredStates::default(),
2336 )
2337 .await?
2338 };
2339
2340 assert!(summary.rooms.contains(&room));
2341
2342 Ok(())
2343 }
2344
2345 #[async_test]
2346 #[cfg(feature = "e2e-encryption")]
2347 async fn test_process_only_encryption_events() -> Result<()> {
2348 use ruma::OneTimeKeyAlgorithm;
2349
2350 let room = owned_room_id!("!croissant:example.org");
2351
2352 let server = MockServer::start().await;
2353 let client = logged_in_client(Some(server.uri())).await;
2354
2355 let server_response = assign!(http::Response::new("0".to_owned()), {
2356 rooms: BTreeMap::from([(
2357 room.clone(),
2358 assign!(http::response::Room::default(), {
2359 name: Some("Croissants lovers".to_owned()),
2360 timeline: Vec::new(),
2361 }),
2362 )]),
2363
2364 extensions: assign!(http::response::Extensions::default(), {
2365 e2ee: assign!(http::response::E2EE::default(), {
2366 device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2367 }),
2368 to_device: Some(assign!(http::response::ToDevice::default(), {
2369 next_batch: "to-device-token".to_owned(),
2370 })),
2371 })
2372 });
2373
2374 let sliding_sync = client
2378 .sliding_sync("test")?
2379 .with_to_device_extension(
2380 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2381 )
2382 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2383 .build()
2384 .await?;
2385
2386 {
2387 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2388
2389 sliding_sync
2390 .handle_response(
2391 server_response.clone(),
2392 &mut position_guard,
2393 RequestedRequiredStates::default(),
2394 )
2395 .await?;
2396 }
2397
2398 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2400 assert_eq!(uploaded_key_count, 42);
2401
2402 {
2403 let olm_machine = &*client.olm_machine_for_testing().await;
2404 assert_eq!(
2405 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2406 Some("to-device-token")
2407 );
2408 }
2409
2410 assert!(client.get_room(&room).is_none());
2412
2413 let client = logged_in_client(Some(server.uri())).await;
2416
2417 let sliding_sync = client
2418 .sliding_sync("test")?
2419 .add_list(SlidingSyncList::builder("thelist"))
2420 .build()
2421 .await?;
2422
2423 {
2424 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2425
2426 sliding_sync
2427 .handle_response(
2428 server_response.clone(),
2429 &mut position_guard,
2430 RequestedRequiredStates::default(),
2431 )
2432 .await?;
2433 }
2434
2435 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2437 assert_eq!(uploaded_key_count, 0);
2438
2439 {
2440 let olm_machine = &*client.olm_machine_for_testing().await;
2441 assert_eq!(
2442 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2443 None
2444 );
2445 }
2446
2447 assert!(client.get_room(&room).is_some());
2449
2450 let client = logged_in_client(Some(server.uri())).await;
2452
2453 let sliding_sync = client
2454 .sliding_sync("test")?
2455 .add_list(SlidingSyncList::builder("thelist"))
2456 .with_to_device_extension(
2457 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2458 )
2459 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2460 .build()
2461 .await?;
2462
2463 {
2464 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2465
2466 sliding_sync
2467 .handle_response(
2468 server_response.clone(),
2469 &mut position_guard,
2470 RequestedRequiredStates::default(),
2471 )
2472 .await?;
2473 }
2474
2475 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2477 assert_eq!(uploaded_key_count, 42);
2478
2479 {
2480 let olm_machine = &*client.olm_machine_for_testing().await;
2481 assert_eq!(
2482 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2483 Some("to-device-token")
2484 );
2485 }
2486
2487 assert!(client.get_room(&room).is_some());
2489
2490 Ok(())
2491 }
2492
2493 #[async_test]
2494 async fn test_lock_multiple_requests() -> Result<()> {
2495 let server = MockServer::start().await;
2496 let client = logged_in_client(Some(server.uri())).await;
2497
2498 let pos = Arc::new(Mutex::new(0));
2499 let _mock_guard = Mock::given(SlidingSyncMatcher)
2500 .respond_with(move |_: &Request| {
2501 let mut pos = pos.lock().unwrap();
2502 *pos += 1;
2503 ResponseTemplate::new(200).set_body_json(json!({
2504 "pos": pos.to_string(),
2505 "lists": {},
2506 "rooms": {}
2507 }))
2508 })
2509 .mount_as_scoped(&server)
2510 .await;
2511
2512 let sliding_sync = client
2513 .sliding_sync("test")?
2514 .with_to_device_extension(
2515 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2516 )
2517 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2518 .build()
2519 .await?;
2520
2521 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2524
2525 for result in requests.await {
2526 result?;
2527 }
2528
2529 Ok(())
2530 }
2531
2532 #[async_test]
2533 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2534 let server = MockServer::start().await;
2535 let client = logged_in_client(Some(server.uri())).await;
2536
2537 let pos = Arc::new(Mutex::new(0));
2538 let _mock_guard = Mock::given(SlidingSyncMatcher)
2539 .respond_with(move |_: &Request| {
2540 let mut pos = pos.lock().unwrap();
2541 *pos += 1;
2542 ResponseTemplate::new(200)
2544 .set_body_json(json!({
2545 "pos": pos.to_string(),
2546 "lists": {},
2547 "rooms": {}
2548 }))
2549 .set_delay(Duration::from_secs(2))
2550 })
2551 .mount_as_scoped(&server)
2552 .await;
2553
2554 let sliding_sync =
2555 client
2556 .sliding_sync("test")?
2557 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2558 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2559 ))
2560 .add_list(
2561 SlidingSyncList::builder("another-list")
2562 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2563 )
2564 .build()
2565 .await?;
2566
2567 let stream = sliding_sync.sync();
2568 pin_mut!(stream);
2569
2570 let cloned_sync = sliding_sync.clone();
2571 spawn(async move {
2572 tokio::time::sleep(Duration::from_millis(100)).await;
2573
2574 cloned_sync
2575 .on_list("another-list", |list| {
2576 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2577 ready(())
2578 })
2579 .await;
2580 });
2581
2582 assert_matches!(stream.next().await, Some(Ok(_)));
2583
2584 sliding_sync.stop_sync().unwrap();
2585
2586 assert_matches!(stream.next().await, None);
2587
2588 let mut num_requests = 0;
2589
2590 for request in server.received_requests().await.unwrap() {
2591 if !SlidingSyncMatcher.matches(&request) {
2592 continue;
2593 }
2594
2595 let another_list_ranges = if num_requests == 0 {
2596 json!([[0, 10]])
2598 } else {
2599 json!([[10, 20]])
2601 };
2602
2603 num_requests += 1;
2604 assert!(num_requests <= 2, "more than one request hit the server");
2605
2606 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2607
2608 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2609 &json_value,
2610 &json!({
2611 "conn_id": "test",
2612 "lists": {
2613 "room-list": {
2614 "ranges": [[0, 9]],
2615 "required_state": [
2616 ["m.room.encryption", ""],
2617 ["m.room.tombstone", ""]
2618 ],
2619 },
2620 "another-list": {
2621 "ranges": another_list_ranges,
2622 "required_state": [
2623 ["m.room.encryption", ""],
2624 ["m.room.tombstone", ""]
2625 ],
2626 },
2627 }
2628 }),
2629 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2630 ) {
2631 dbg!(json_value);
2632 panic!("json differ: {err}");
2633 }
2634 }
2635
2636 assert_eq!(num_requests, 2);
2637
2638 Ok(())
2639 }
2640
2641 #[async_test]
2642 async fn test_timeout_zero_list() -> Result<()> {
2643 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2644
2645 let (request, _, _) =
2646 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2647
2648 assert!(request.timeout.is_some());
2651
2652 Ok(())
2653 }
2654
2655 #[async_test]
2656 async fn test_timeout_one_list() -> Result<()> {
2657 let (_server, sliding_sync) = new_sliding_sync(vec![
2658 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2659 ])
2660 .await?;
2661
2662 let (request, _, _) =
2663 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2664
2665 assert!(request.timeout.is_none());
2667
2668 {
2670 let server_response = assign!(http::Response::new("0".to_owned()), {
2671 lists: BTreeMap::from([(
2672 "foo".to_owned(),
2673 assign!(http::response::List::default(), {
2674 count: uint!(7),
2675 })
2676 )])
2677 });
2678
2679 let _summary = {
2680 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2681 sliding_sync
2682 .handle_response(
2683 server_response.clone(),
2684 &mut pos_guard,
2685 RequestedRequiredStates::default(),
2686 )
2687 .await?
2688 };
2689 }
2690
2691 let (request, _, _) =
2692 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2693
2694 assert!(request.timeout.is_some());
2696
2697 Ok(())
2698 }
2699
2700 #[async_test]
2701 async fn test_timeout_three_lists() -> Result<()> {
2702 let (_server, sliding_sync) = new_sliding_sync(vec![
2703 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2704 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2705 SlidingSyncList::builder("baz")
2706 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2707 ])
2708 .await?;
2709
2710 let (request, _, _) =
2711 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2712
2713 assert!(request.timeout.is_none());
2715
2716 {
2718 let server_response = assign!(http::Response::new("0".to_owned()), {
2719 lists: BTreeMap::from([(
2720 "foo".to_owned(),
2721 assign!(http::response::List::default(), {
2722 count: uint!(7),
2723 })
2724 )])
2725 });
2726
2727 let _summary = {
2728 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2729 sliding_sync
2730 .handle_response(
2731 server_response.clone(),
2732 &mut pos_guard,
2733 RequestedRequiredStates::default(),
2734 )
2735 .await?
2736 };
2737 }
2738
2739 let (request, _, _) =
2740 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2741
2742 assert!(request.timeout.is_none());
2744
2745 {
2747 let server_response = assign!(http::Response::new("1".to_owned()), {
2748 lists: BTreeMap::from([(
2749 "bar".to_owned(),
2750 assign!(http::response::List::default(), {
2751 count: uint!(7),
2752 })
2753 )])
2754 });
2755
2756 let _summary = {
2757 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2758 sliding_sync
2759 .handle_response(
2760 server_response.clone(),
2761 &mut pos_guard,
2762 RequestedRequiredStates::default(),
2763 )
2764 .await?
2765 };
2766 }
2767
2768 let (request, _, _) =
2769 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2770
2771 assert!(request.timeout.is_some());
2773
2774 Ok(())
2775 }
2776
2777 #[async_test]
2778 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2779 let server = MockServer::start().await;
2780 let client = logged_in_client(Some(server.uri())).await;
2781
2782 let _mock_guard = Mock::given(SlidingSyncMatcher)
2783 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2784 "pos": "0",
2785 "lists": {},
2786 "rooms": {}
2787 })))
2788 .mount_as_scoped(&server)
2789 .await;
2790
2791 let sliding_sync = client
2792 .sliding_sync("test")?
2793 .with_to_device_extension(
2794 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2795 )
2796 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2797 .build()
2798 .await?;
2799
2800 let sliding_sync = Arc::new(sliding_sync);
2801
2802 let sync_beat_listener = client.inner.sync_beat.listen();
2804 sliding_sync.sync_once().await?;
2805
2806 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2808 Ok(())
2809 }
2810
2811 #[async_test]
2812 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2813 let server = MockServer::start().await;
2814 let client = logged_in_client(Some(server.uri())).await;
2815
2816 let _mock_guard = Mock::given(SlidingSyncMatcher)
2817 .respond_with(ResponseTemplate::new(404))
2818 .mount_as_scoped(&server)
2819 .await;
2820
2821 let sliding_sync = client
2822 .sliding_sync("test")?
2823 .with_to_device_extension(
2824 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2825 )
2826 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2827 .build()
2828 .await?;
2829
2830 let sliding_sync = Arc::new(sliding_sync);
2831
2832 let sync_beat_listener = client.inner.sync_beat.listen();
2834 let sync_result = sliding_sync.sync_once().await;
2835 assert!(sync_result.is_err());
2836
2837 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2839
2840 Ok(())
2841 }
2842
2843 #[async_test]
2844 async fn test_sync_lock_is_released_before_calling_handlers() -> Result<()> {
2845 let server = MatrixMockServer::new().await;
2846 let client = server.client_builder().build().await;
2847 let room_id = room_id!("!mu5hr00m:example.org");
2848
2849 let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2850 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2851 "pos": "0",
2852 "lists": {},
2853 "extensions": {
2854 "account_data": {
2855 "global": [
2856 {
2857 "type": "m.direct",
2858 "content": {
2859 "@de4dlockh0lmes:example.org": [
2860 "!mu5hr00m:example.org"
2861 ]
2862 }
2863 }
2864 ]
2865 }
2866 },
2867 "rooms": {
2868 room_id: {
2869 "name": "Mario Bros Fanbase Room",
2870 "initial": true,
2871 },
2872 }
2873 })))
2874 .mount_as_scoped(server.server())
2875 .await;
2876
2877 let f = EventFactory::new().room(room_id);
2878
2879 Mock::given(method("GET"))
2880 .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2881 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2882 "chunk": [
2883 f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2884 ]
2885 })))
2886 .mount(server.server())
2887 .await;
2888
2889 let (tx, rx) = tokio::sync::oneshot::channel();
2890
2891 let tx = Arc::new(Mutex::new(Some(tx)));
2892 client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2893 let members =
2895 client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2896 assert_eq!(members.len(), 1);
2897 tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2898 });
2899
2900 let sliding_sync = client
2901 .sliding_sync("test")?
2902 .add_list(SlidingSyncList::builder("thelist"))
2903 .with_account_data_extension(
2904 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2905 )
2906 .build()
2907 .await?;
2908
2909 tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2910 .await
2911 .expect("Sync did not complete in time")
2912 .expect("Sync failed");
2913
2914 tokio::time::timeout(Duration::from_secs(5), rx)
2916 .await
2917 .expect("Event handler did not complete in time")
2918 .expect("Event handler failed");
2919
2920 Ok(())
2921 }
2922}