1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod room;
24mod sticky_parameters;
25mod utils;
26
27use std::{
28 collections::{btree_map::Entry, BTreeMap, HashSet},
29 fmt::Debug,
30 future::Future,
31 sync::{Arc, RwLock as StdRwLock},
32 time::Duration,
33};
34
35use async_stream::stream;
36pub use client::{Version, VersionBuilder};
37use futures_core::stream::Stream;
38pub use matrix_sdk_base::sliding_sync::http;
39use matrix_sdk_common::{deserialized_responses::TimelineEvent, executor::spawn, timer};
40use ruma::{
41 api::{client::error::ErrorKind, OutgoingRequest},
42 assign, OwnedEventId, OwnedRoomId, RoomId,
43};
44use serde::{Deserialize, Serialize};
45use tokio::{
46 select,
47 sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
48};
49use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
50
51#[cfg(feature = "e2e-encryption")]
52use self::utils::JoinHandleExt as _;
53pub use self::{builder::*, client::VersionBuilderError, error::*, list::*, room::*};
54use self::{
55 cache::restore_sliding_sync_state,
56 client::SlidingSyncResponseProcessor,
57 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
58};
59use crate::{config::RequestConfig, Client, HttpError, Result};
60
/// Handle to a sliding sync instance.
///
/// All the state lives in a [`SlidingSyncInner`] stored behind an [`Arc`], so
/// cloning this handle shares the same underlying state.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// Shared state backing every clone of this handle.
    inner: Arc<SlidingSyncInner>,
}
69
/// Shared state of a [`SlidingSync`] instance.
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// Identifier of this instance; sent as `conn_id` in every request and
    /// recorded in the tracing spans.
    id: String,

    /// Sliding sync version in use; queried through `is_native()` and
    /// `overriding_url()` when building and sending requests.
    version: Version,

    /// Client used to send requests and to process responses.
    client: Client,

    /// Value sent as the request's `timeout` when every list requires one.
    poll_timeout: Duration,

    /// Extra delay added to `poll_timeout` to compute the HTTP request
    /// timeout.
    network_timeout: Duration,

    /// Key handed to the cache helpers when storing or restoring state.
    storage_key: String,

    /// When `true`, `pos` is restored from storage before each request.
    share_pos: bool,

    /// Position markers; the owned guard taken when a request is generated is
    /// kept until the matching response has been handled and cached.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// Lists, indexed by name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// Rooms seen in sliding sync responses, indexed by room ID.
    rooms: AsyncRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,

    /// Sticky request parameters (room subscriptions and extensions).
    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,

    /// Sender used to deliver [`SlidingSyncInternalMessage`]s to the sync
    /// loop.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
129
130impl SlidingSync {
131 pub(super) fn new(inner: SlidingSyncInner) -> Self {
132 Self { inner: Arc::new(inner) }
133 }
134
135 async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
136 cache::store_sliding_sync_state(self, position).await
137 }
138
139 pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
141 SlidingSyncBuilder::new(id, client)
142 }
143
144 pub fn subscribe_to_rooms(
151 &self,
152 room_ids: &[&RoomId],
153 settings: Option<http::request::RoomSubscription>,
154 cancel_in_flight_request: bool,
155 ) {
156 let settings = settings.unwrap_or_default();
157 let mut sticky = self.inner.sticky.write().unwrap();
158 let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
159
160 let mut skip_over_current_sync_loop_iteration = false;
161
162 for room_id in room_ids {
163 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
170 if let Some(room) = self.inner.client.get_room(room_id) {
171 room.mark_members_missing();
172 }
173
174 entry.insert((RoomSubscriptionState::default(), settings.clone()));
175
176 skip_over_current_sync_loop_iteration = true;
177 }
178 }
179
180 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
181 self.inner.internal_channel_send_if_possible(
182 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
183 );
184 }
185 }
186
187 pub async fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
189 self.inner.rooms.read().await.get(room_id).cloned()
190 }
191
192 pub fn get_number_of_rooms(&self) -> usize {
194 self.inner.rooms.blocking_read().len()
195 }
196
197 pub async fn on_list<Function, FunctionOutput, R>(
199 &self,
200 list_name: &str,
201 function: Function,
202 ) -> Option<R>
203 where
204 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
205 FunctionOutput: Future<Output = R>,
206 {
207 let lists = self.inner.lists.read().await;
208
209 match lists.get(list_name) {
210 Some(list) => Some(function(list).await),
211 None => None,
212 }
213 }
214
215 pub async fn add_list(
221 &self,
222 list_builder: SlidingSyncListBuilder,
223 ) -> Result<Option<SlidingSyncList>> {
224 let list = list_builder.build(self.inner.internal_channel.clone());
225
226 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
227
228 self.inner.internal_channel_send_if_possible(
229 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
230 );
231
232 Ok(old_list)
233 }
234
235 pub async fn add_cached_list(
242 &self,
243 mut list_builder: SlidingSyncListBuilder,
244 ) -> Result<Option<SlidingSyncList>> {
245 let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
246
247 list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
248
249 self.add_list(list_builder).await
250 }
251
252 pub async fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
254 &self,
255 room_ids: I,
256 ) -> Vec<Option<SlidingSyncRoom>> {
257 let rooms = self.inner.rooms.read().await;
258
259 room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
260 }
261
262 pub async fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
264 self.inner.rooms.read().await.values().cloned().collect()
265 }
266
267 #[instrument(skip_all)]
269 async fn handle_response(
270 &self,
271 mut sliding_sync_response: http::Response,
272 position: &mut SlidingSyncPositionMarkers,
273 ) -> Result<UpdateSummary, crate::Error> {
274 let pos = Some(sliding_sync_response.pos.clone());
275
276 let must_process_rooms_response = self.must_process_rooms_response().await;
277
278 trace!(yes = must_process_rooms_response, "Must process rooms response?");
279
280 if !self.inner.version.is_native() && must_process_rooms_response {
283 let known_rooms = self.inner.rooms.read().await;
284 compute_limited(&known_rooms, &mut sliding_sync_response.rooms);
285 }
286
287 let mut sync_response = {
295 let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
298
299 let rooms = &*self.inner.rooms.read().await;
300 let mut response_processor =
301 SlidingSyncResponseProcessor::new(self.inner.client.clone(), rooms);
302
303 #[cfg(feature = "e2e-encryption")]
304 if self.is_e2ee_enabled() {
305 response_processor.handle_encryption(&sliding_sync_response.extensions).await?
306 }
307
308 if must_process_rooms_response {
315 response_processor
316 .handle_room_response(&sliding_sync_response, self.inner.version.is_native())
317 .await?;
318 }
319
320 response_processor.process_and_take_response().await?
321 };
322
323 debug!(?sync_response, "Sliding Sync response has been handled by the client");
324
325 if let Some(ref txn_id) = sliding_sync_response.txn_id {
327 let txn_id = txn_id.as_str().into();
328 self.inner.sticky.write().unwrap().maybe_commit(txn_id);
329 let mut lists = self.inner.lists.write().await;
330 lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
331 }
332
333 let update_summary = {
334 let updated_rooms = {
336 let mut rooms_map = self.inner.rooms.write().await;
337
338 let mut updated_rooms = Vec::with_capacity(sync_response.rooms.join.len());
339
340 for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
341 let timeline =
345 if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) {
346 joined_room.timeline.events
347 } else {
348 room_data.timeline.drain(..).map(TimelineEvent::new).collect()
349 };
350
351 match rooms_map.get_mut(&room_id) {
352 Some(room) => {
354 room.update(room_data, timeline);
355 }
356
357 None => {
359 rooms_map.insert(
360 room_id.clone(),
361 SlidingSyncRoom::new(
362 room_id.clone(),
363 room_data.prev_batch,
364 timeline,
365 ),
366 );
367 }
368 }
369
370 updated_rooms.push(room_id);
371 }
372
373 updated_rooms.extend(sync_response.rooms.join.keys().cloned());
381
382 updated_rooms
383 };
384
385 let updated_lists = {
387 debug!(
388 lists = ?sliding_sync_response.lists,
389 "Update lists"
390 );
391
392 let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
393 let mut lists = self.inner.lists.write().await;
394
395 for (name, list) in lists.iter_mut() {
398 if let Some(updates) = sliding_sync_response.lists.get(name) {
399 let maximum_number_of_rooms: u32 =
400 updates.count.try_into().expect("failed to convert `count` to `u32`");
401
402 if list.update(Some(maximum_number_of_rooms))? {
403 updated_lists.push(name.clone());
404 }
405 } else if list.update(None)? {
406 updated_lists.push(name.clone());
407 }
408 }
409
410 for name in sliding_sync_response.lists.keys() {
412 if !lists.contains_key(name) {
413 error!("Response for list `{name}` - unknown to us; skipping");
414 }
415 }
416
417 updated_lists
418 };
419
420 UpdateSummary { lists: updated_lists, rooms: updated_rooms }
421 };
422
423 position.pos = pos;
427
428 Ok(update_summary)
429 }
430
    /// Build the next sliding sync request.
    ///
    /// Returns the request, the [`RequestConfig`] to send it with, and the
    /// owned guard on the position markers. The guard is returned so the
    /// caller keeps the position locked while the request is in flight.
    ///
    /// # Errors
    ///
    /// Fails if a list cannot produce its next request, if restoring the
    /// cached state fails, or (with `e2e-encryption`) if there is no Olm
    /// machine or marking tracked users as dirty fails.
    ///
    /// # Panics
    ///
    /// Panics if the lock protecting the sticky parameters is poisoned.
    async fn generate_sync_request(
        &self,
        txn_id: &mut LazyTransactionId,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        // One request entry per list, keyed by list name.
        let mut requests_lists = BTreeMap::new();

        let require_timeout = {
            let lists = self.inner.lists.read().await;

            // Stays `true` when there is no list at all; otherwise it is
            // `true` only if every list requires a timeout.
            let mut require_timeout = true;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
                require_timeout = require_timeout && list.requires_timeout();
            }

            require_timeout
        };

        // Taken after the lists have been read, and handed back to the
        // caller at the end.
        let mut position_guard = self.inner.position.clone().lock_owned().await;

        let to_device_enabled =
            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);

        // The cached state is only read when one of its fields is needed:
        // `pos` (when shared) or the to-device token.
        let restored_fields = if self.inner.share_pos || to_device_enabled {
            let lists = self.inner.lists.read().await;
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists).await?
        } else {
            None
        };

        let pos = if self.inner.share_pos {
            if let Some(fields) = &restored_fields {
                // The stored `pos` wins over the in-memory one.
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        // Fills the `pos` field of the current tracing span.
        Span::current().record("pos", &pos);

        // Without a `pos`, on a native version with e2ee enabled, every
        // tracked user is marked as dirty before the request is sent.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.inner.version.is_native() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // `None` unless every list requires a timeout.
        let timeout = require_timeout.then(|| self.inner.poll_timeout);

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        // Let the sticky manager add its parameters to the request.
        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);

        // Set after `maybe_apply`, which may have touched the extensions.
        if to_device_enabled {
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        // Only set a transaction ID on the request if one was generated.
        if let Some(txn_id) = txn_id.get() {
            request.txn_id = Some(txn_id.to_string());
        }

        Ok((
            request,
            // The HTTP timeout is the poll timeout plus the network timeout.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            position_guard,
        ))
    }
558
    /// Send a sliding sync request and handle its response.
    ///
    /// `position_guard` is the guard obtained from
    /// [`Self::generate_sync_request`]; it is dropped once the response has
    /// been handled and the state has been cached.
    ///
    /// # Errors
    ///
    /// Fails if the request fails, if (with `e2e-encryption`) the E2EE upload
    /// task cannot be joined, or if handling or caching the response fails.
    ///
    /// # Panics
    ///
    /// Panics if the spawned response-handling task cannot be joined.
    async fn send_sync_request<Request>(
        &self,
        request: Request,
        request_config: RequestConfig,
        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
    ) -> Result<UpdateSummary>
    where
        Request: OutgoingRequest + Clone + Debug + Send + Sync + 'static,
        Request::IncomingResponse: Send + Sync + Into<http::Response>,
        HttpError: From<ruma::api::error::FromHttpResponseError<Request::EndpointError>>,
    {
        debug!("Sending request");

        // Not awaited yet: only configured here.
        let request = self
            .inner
            .client
            .send(request)
            .with_request_config(request_config)
            .with_homeserver_override(self.inner.version.overriding_url().map(ToString::to_string));

        #[cfg(feature = "e2e-encryption")]
        let response = {
            if self.is_e2ee_enabled() {
                // Outgoing E2EE requests are sent from a separate task while
                // the sync request is awaited. Errors are logged, not
                // propagated.
                // NOTE(review): `abort_on_drop` presumably aborts the task if
                // the handle is dropped before completion, e.g. when the sync
                // request below fails; confirm in `utils`.
                let client = self.inner.client.clone();
                let e2ee_uploads = spawn(async move {
                    if let Err(error) = client.send_outgoing_requests().await {
                        error!(?error, "Error while sending outgoing E2EE requests");
                    }
                })
                .abort_on_drop();

                let response = request.await?;

                // Wait for the uploads to finish before handling the
                // response.
                e2ee_uploads.await.map_err(|error| Error::JoinError {
                    task_description: "e2ee_uploads".to_owned(),
                    error,
                })?;

                response
            } else {
                request.await?
            }
        };

        #[cfg(not(feature = "e2e-encryption"))]
        let response = request.await?;

        // Whatever the request type, the response is handled in a single
        // format.
        let response = Into::<http::msc4186::Response>::into(response);

        debug!("Received response");

        let this = self.clone();

        // The response is handled in a spawned task that owns a clone of
        // `self` and the position guard.
        // NOTE(review): presumably so that handling is not interrupted if
        // this future is dropped; confirm against the executor's semantics.
        let future = async move {
            debug!("Start handling response");

            let updates = this.handle_response(response, &mut position_guard).await?;

            this.cache_to_storage(&position_guard).await?;

            // Release the position only after it has been cached.
            drop(position_guard);

            debug!("Done handling response");

            Ok(updates)
        };

        spawn(future.instrument(Span::current())).await.unwrap()
    }
689
690 #[cfg(feature = "e2e-encryption")]
692 fn is_e2ee_enabled(&self) -> bool {
693 self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
694 }
695
696 #[cfg(not(feature = "e2e-encryption"))]
697 fn is_e2ee_enabled(&self) -> bool {
698 false
699 }
700
701 async fn must_process_rooms_response(&self) -> bool {
703 !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
706 || !self.inner.lists.read().await.is_empty()
707 }
708
709 #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
710 async fn sync_once(&self) -> Result<UpdateSummary> {
711 let (request, request_config, position_guard) =
712 self.generate_sync_request(&mut LazyTransactionId::new()).await?;
713
714 let summaries = if !self.inner.version.is_native() {
718 self.send_sync_request(
719 Into::<http::msc3575::Request>::into(request),
720 request_config,
721 position_guard,
722 )
723 .await?
724 } else {
725 self.send_sync_request(request, request_config, position_guard).await?
726 };
727
728 self.inner.client.inner.sync_beat.notify(usize::MAX);
730
731 Ok(summaries)
732 }
733
    /// Create the sync stream.
    ///
    /// Each iteration of the loop races an internal message against one call
    /// to [`Self::sync_once`]; internal messages are polled first (`biased`).
    ///
    /// - `SyncLoopStop`, or an error from the internal channel, ends the
    ///   stream.
    /// - `SyncLoopSkipOverCurrentIteration` abandons the in-flight
    ///   `sync_once` and starts a new iteration.
    /// - A successful `sync_once` yields its [`UpdateSummary`].
    /// - A failed `sync_once` yields the error and ends the stream. If the
    ///   error kind is `UnknownPos`, the session is expired first.
    #[allow(unknown_lints, clippy::let_with_type_underscore)]
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        // Subscribed before the stream is polled for the first time.
        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    // Branches are polled in declaration order.
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            // A receive error is treated like a stop request.
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            Err(error) => {
                                // The server no longer knows our `pos`.
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // Any error ends the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }
798
799 pub fn stop_sync(&self) -> Result<()> {
808 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
809 }
810
811 #[doc(hidden)]
823 pub async fn expire_session(&self) {
824 info!("Session expired; resetting `pos` and sticky parameters");
825
826 {
827 let mut position = self.inner.position.lock().await;
828 position.pos = None;
829
830 if let Err(err) = self.cache_to_storage(&position).await {
831 error!(
832 "couldn't invalidate sliding sync frozen state when expiring session: {err}"
833 );
834 }
835 }
836
837 {
838 let mut sticky = self.inner.sticky.write().unwrap();
839
840 sticky.data_mut().room_subscriptions.clear();
843 }
844
845 self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());
846 }
847}
848
849impl SlidingSyncInner {
850 #[instrument]
852 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
853 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
854 }
855
856 #[instrument]
859 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
860 let _ = self.internal_channel.send(message);
862 }
863}
864
/// Messages sent over the internal channel to control the sync loop.
#[derive(Copy, Clone, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// Break out of the sync loop, which ends the sync stream.
    SyncLoopStop,

    /// Abandon the current iteration of the sync loop and start a new one.
    SyncLoopSkipOverCurrentIteration,
}
874
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrite the in-memory `pos` with `new_pos`.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// The sliding sync version used by this instance.
    pub fn version(&self) -> &Version {
        let inner = &self.inner;

        &inner.version
    }

    /// A clone of the extensions stored in the sticky parameters.
    ///
    /// # Panics
    ///
    /// Panics if the lock protecting the sticky parameters is poisoned.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
898
/// Position markers of a sliding sync instance.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// The `pos` taken from the last handled response and sent with the next
    /// request; `None` when there is none (e.g. after the session expired).
    pos: Option<String>,
}
907
/// Serializable snapshot of a sliding sync instance.
#[derive(Debug, Serialize, Deserialize)]
struct FrozenSlidingSync {
    /// To-device token; omitted from the serialized form when `None`.
    /// [`FrozenSlidingSync::new`] always sets it to `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    to_device_since: Option<String>,
    /// Frozen rooms; omitted when empty, and defaults to empty when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    rooms: Vec<FrozenSlidingSyncRoom>,
}
917
918impl FrozenSlidingSync {
919 fn new(rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
920 Self {
922 to_device_since: None,
923 rooms: rooms
924 .iter()
925 .map(|(_room_id, sliding_sync_room)| FrozenSlidingSyncRoom::from(sliding_sync_room))
926 .collect::<Vec<_>>(),
927 }
928 }
929}
930
/// Serializable form of the `pos` marker.
#[derive(Serialize, Deserialize)]
struct FrozenSlidingSyncPos {
    /// The `pos` value; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pos: Option<String>,
}
936
/// Summary of what a handled sliding sync response has updated.
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// Names of the lists that reported an update.
    pub lists: Vec<String>,
    /// IDs of the rooms that were updated.
    pub rooms: Vec<OwnedRoomId>,
}
946
/// State of a room subscription.
#[derive(Debug, Default)]
enum RoomSubscriptionState {
    /// The subscription still has to be included in requests. This is the
    /// default state.
    #[default]
    Pending,

    /// The subscription has been committed; it is no longer included in
    /// requests.
    Applied,
}
962
/// Sticky parameters of a sliding sync instance.
#[derive(Debug)]
pub(super) struct SlidingSyncStickyParameters {
    /// Room subscriptions, each with its state and its settings.
    room_subscriptions:
        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,

    /// Extensions copied into every request these parameters are applied to.
    extensions: http::request::Extensions,
}
976
977impl SlidingSyncStickyParameters {
978 pub fn new(
980 room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
981 extensions: http::request::Extensions,
982 ) -> Self {
983 Self {
984 room_subscriptions: room_subscriptions
985 .into_iter()
986 .map(|(room_id, room_subscription)| {
987 (room_id, (RoomSubscriptionState::Pending, room_subscription))
988 })
989 .collect(),
990 extensions,
991 }
992 }
993}
994
995impl StickyData for SlidingSyncStickyParameters {
996 type Request = http::Request;
997
998 fn apply(&self, request: &mut Self::Request) {
999 request.room_subscriptions = self
1000 .room_subscriptions
1001 .iter()
1002 .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
1003 .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
1004 .collect();
1005 request.extensions = self.extensions.clone();
1006 }
1007
1008 fn on_commit(&mut self) {
1009 for (state, _room_subscription) in self.room_subscriptions.values_mut() {
1011 if matches!(state, RoomSubscriptionState::Pending) {
1012 *state = RoomSubscriptionState::Applied;
1013 }
1014 }
1015 }
1016}
1017
1018fn compute_limited(
1024 local_rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>,
1025 remote_rooms: &mut BTreeMap<OwnedRoomId, http::response::Room>,
1026) {
1027 for (room_id, remote_room) in remote_rooms {
1028 let initial = remote_room.initial.unwrap_or(false);
1030 if !initial {
1031 continue;
1032 }
1033
1034 if remote_room.limited {
1035 continue;
1037 }
1038
1039 let remote_events = &remote_room.timeline;
1040 if remote_events.is_empty() {
1041 trace!(?room_id, "no timeline updates in the response => not limited");
1042 continue;
1043 }
1044
1045 let Some(local_room) = local_rooms.get(room_id) else {
1046 trace!(?room_id, "room isn't known locally => not limited");
1047 continue;
1048 };
1049
1050 let local_events = local_room.timeline_queue();
1051
1052 if local_events.is_empty() {
1053 trace!(?room_id, "local timeline had no events => not limited");
1054 continue;
1055 }
1056
1057 let num_local_events = local_events.len();
1063 let local_events_with_ids: HashSet<OwnedEventId> =
1064 HashSet::from_iter(local_events.into_iter().filter_map(|event| event.event_id()));
1065
1066 let mut num_remote_events_missing_ids = 0;
1069 let overlap = remote_events.iter().any(|remote_event| {
1070 if let Some(remote_event_id) =
1071 remote_event.get_field::<OwnedEventId>("event_id").ok().flatten()
1072 {
1073 local_events_with_ids.contains(&remote_event_id)
1074 } else {
1075 num_remote_events_missing_ids += 1;
1076 false
1077 }
1078 });
1079
1080 remote_room.limited = !overlap;
1081
1082 trace!(
1083 ?room_id,
1084 num_events_response = remote_events.len(),
1085 num_local_events,
1086 num_local_events_with_ids = local_events_with_ids.len(),
1087 num_remote_events_missing_ids,
1088 room_limited = remote_room.limited,
1089 "done"
1090 );
1091 }
1092}
1093
1094#[cfg(all(test, not(target_family = "wasm")))]
1095#[allow(clippy::dbg_macro)]
1096mod tests {
1097 use std::{
1098 collections::BTreeMap,
1099 future::ready,
1100 ops::Not,
1101 sync::{Arc, Mutex},
1102 time::Duration,
1103 };
1104
1105 use assert_matches::assert_matches;
1106 use event_listener::Listener;
1107 use futures_util::{future::join_all, pin_mut, StreamExt};
1108 use matrix_sdk_common::deserialized_responses::TimelineEvent;
1109 use matrix_sdk_test::async_test;
1110 use ruma::{
1111 api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint,
1112 OwnedRoomId, TransactionId,
1113 };
1114 use serde::Deserialize;
1115 use serde_json::json;
1116 use url::Url;
1117 use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
1118
1119 use super::{
1120 compute_limited, http,
1121 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
1122 FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
1123 SlidingSyncRoom, SlidingSyncStickyParameters, Version,
1124 };
1125 use crate::{
1126 sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result,
1127 };
1128
1129 #[derive(Copy, Clone)]
1130 struct SlidingSyncMatcher;
1131
1132 impl Match for SlidingSyncMatcher {
1133 fn matches(&self, request: &Request) -> bool {
1134 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
1135 && request.method == Method::POST
1136 }
1137 }
1138
1139 async fn new_sliding_sync(
1140 lists: Vec<SlidingSyncListBuilder>,
1141 ) -> Result<(MockServer, SlidingSync)> {
1142 let server = MockServer::start().await;
1143 let client = logged_in_client(Some(server.uri())).await;
1144
1145 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1146
1147 for list in lists {
1148 sliding_sync_builder = sliding_sync_builder.add_list(list);
1149 }
1150
1151 let sliding_sync = sliding_sync_builder.build().await?;
1152
1153 Ok((server, sliding_sync))
1154 }
1155
1156 #[async_test]
1157 async fn test_subscribe_to_rooms() -> Result<()> {
1158 let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1159 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1160 .await?;
1161
1162 let stream = sliding_sync.sync();
1163 pin_mut!(stream);
1164
1165 let room_id_0 = room_id!("!r0:bar.org");
1166 let room_id_1 = room_id!("!r1:bar.org");
1167 let room_id_2 = room_id!("!r2:bar.org");
1168
1169 {
1170 let _mock_guard = Mock::given(SlidingSyncMatcher)
1171 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1172 "pos": "1",
1173 "lists": {},
1174 "rooms": {
1175 room_id_0: {
1176 "name": "Room #0",
1177 "initial": true,
1178 },
1179 room_id_1: {
1180 "name": "Room #1",
1181 "initial": true,
1182 },
1183 room_id_2: {
1184 "name": "Room #2",
1185 "initial": true,
1186 },
1187 }
1188 })))
1189 .mount_as_scoped(&server)
1190 .await;
1191
1192 let _ = stream.next().await.unwrap()?;
1193 }
1194
1195 let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1196
1197 assert!(room0.are_members_synced().not());
1201
1202 {
1203 struct MemberMatcher(OwnedRoomId);
1204
1205 impl Match for MemberMatcher {
1206 fn matches(&self, request: &Request) -> bool {
1207 request.url.path()
1208 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1209 && request.method == Method::GET
1210 }
1211 }
1212
1213 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1214 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1215 "chunk": [],
1216 })))
1217 .mount_as_scoped(&server)
1218 .await;
1219
1220 assert_matches!(room0.request_members().await, Ok(()));
1221 }
1222
1223 assert!(room0.are_members_synced());
1225
1226 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1227
1228 assert!(room0.are_members_synced().not());
1231
1232 {
1233 let sticky = sliding_sync.inner.sticky.read().unwrap();
1234 let room_subscriptions = &sticky.data().room_subscriptions;
1235
1236 assert!(room_subscriptions.contains_key(room_id_0));
1237 assert!(room_subscriptions.contains_key(room_id_1));
1238 assert!(!room_subscriptions.contains_key(room_id_2));
1239 }
1240
1241 {
1244 struct MemberMatcher(OwnedRoomId);
1245
1246 impl Match for MemberMatcher {
1247 fn matches(&self, request: &Request) -> bool {
1248 request.url.path()
1249 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1250 && request.method == Method::GET
1251 }
1252 }
1253
1254 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1255 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1256 "chunk": [],
1257 })))
1258 .mount_as_scoped(&server)
1259 .await;
1260
1261 assert_matches!(room0.request_members().await, Ok(()));
1262 }
1263
1264 assert!(room0.are_members_synced());
1266
1267 sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1268
1269 assert!(room0.are_members_synced());
1272
1273 Ok(())
1274 }
1275
    /// Room subscriptions accumulate across calls to `subscribe_to_rooms`,
    /// and are all cleared by `expire_session`.
    #[async_test]
    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);

        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2).not());
        }

        // Subscribe to a third room: the first two are kept.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        // Expiring the session clears every subscription.
        sliding_sync.expire_session().await;

        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.is_empty());
        }

        // Subscribing again only records the newly requested room.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0).not());
            assert!(room_subscriptions.contains_key(room_id_1).not());
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        Ok(())
    }
1334
1335 #[async_test]
1336 async fn test_to_device_token_properly_cached() -> Result<()> {
1337 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1338 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1339 .await?;
1340
1341 let frozen = FrozenSlidingSync::new(&*sliding_sync.inner.rooms.read().await);
1344 assert!(frozen.to_device_since.is_none());
1345
1346 Ok(())
1347 }
1348
1349 #[async_test]
1350 async fn test_add_list() -> Result<()> {
1351 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1352 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1353 .await?;
1354
1355 let _stream = sliding_sync.sync();
1356 pin_mut!(_stream);
1357
1358 sliding_sync
1359 .add_list(
1360 SlidingSyncList::builder("bar")
1361 .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1362 )
1363 .await?;
1364
1365 let lists = sliding_sync.inner.lists.read().await;
1366
1367 assert!(lists.contains_key("foo"));
1368 assert!(lists.contains_key("bar"));
1369
1370 Ok(())
1373 }
1374
    /// Walks the sticky manager through its invalidated/committed states:
    /// new data invalidates it, and only a commit with the transaction ID of
    /// the last applied request validates it again.
    #[test]
    fn test_sticky_parameters_api_invalidated_flow() {
        // NOTE(review): `!r0.matrix.org` has no `:` separator, unlike `r1`;
        // possibly a typo, but the test only uses it as a map key.
        let r0 = room_id!("!r0.matrix.org");
        let r1 = room_id!("!r1:matrix.org");

        let mut room_subscriptions = BTreeMap::new();
        room_subscriptions.insert(r0.to_owned(), Default::default());

        // A new manager starts invalidated.
        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
            room_subscriptions,
            Default::default(),
        ));
        assert!(sticky.is_invalidated());

        let txn_id: &TransactionId = "tid123".into();

        let mut request = http::Request::default();
        request.txn_id = Some(txn_id.to_string());

        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

        // The pending subscription for `r0` has been applied to the request.
        assert!(request.txn_id.is_some());
        assert_eq!(request.room_subscriptions.len(), 1);
        assert!(request.room_subscriptions.contains_key(r0));

        let tid = request.txn_id.unwrap();

        // Committing with the matching transaction ID validates the data.
        sticky.maybe_commit(tid.as_str().into());
        assert!(!sticky.is_invalidated());

        // Accessing the data mutably invalidates it again.
        sticky
            .data_mut()
            .room_subscriptions
            .insert(r1.to_owned(), (Default::default(), Default::default()));
        assert!(sticky.is_invalidated());

        // Committing with an unknown transaction ID changes nothing.
        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
        assert!(sticky.is_invalidated());

        let txn_id1: &TransactionId = "tid456".into();
        let mut request1 = http::Request::default();
        request1.txn_id = Some(txn_id1.to_string());
        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));

        // Only `r1` is applied: the subscription for `r0` was committed
        // earlier and is no longer pending.
        assert!(sticky.is_invalidated());
        assert_eq!(request1.room_subscriptions.len(), 1);
        assert!(request1.room_subscriptions.contains_key(r1));

        let txn_id2: &TransactionId = "tid789".into();
        let mut request2 = http::Request::default();
        request2.txn_id = Some(txn_id2.to_string());

        // `r1` is still pending, so it is applied to this request too.
        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
        assert!(sticky.is_invalidated());
        assert_eq!(request2.room_subscriptions.len(), 1);
        assert!(request2.room_subscriptions.contains_key(r1));

        // Committing the older transaction ID is ignored.
        sticky.maybe_commit(txn_id1);
        assert!(sticky.is_invalidated());

        // Committing the most recent transaction ID validates the data.
        sticky.maybe_commit(txn_id2);
        assert!(!sticky.is_invalidated());
    }
1451
/// Checks that room subscriptions behave as sticky parameters: a subscription
/// is attached to outgoing requests until one of those requests is committed,
/// and is left out of every request after that.
#[test]
fn test_room_subscriptions_are_sticky() {
    // Both fixtures use the `!opaque_id:server_name` room ID form.
    let r0 = room_id!("!r0:matrix.org");
    let r1 = room_id!("!r1:matrix.org");

    let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
        BTreeMap::new(),
        Default::default(),
    ));

    // Request 0: subscribe to `r0`, apply it to a request and commit that
    // request's transaction id.
    {
        sticky
            .data_mut()
            .room_subscriptions
            .insert(r0.to_owned(), (Default::default(), Default::default()));

        let txn_id: &TransactionId = "tid0".into();
        let mut request = http::Request::default();
        request.txn_id = Some(txn_id.to_string());

        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

        assert!(request.txn_id.is_some());
        assert_eq!(request.room_subscriptions.len(), 1);
        assert!(request.room_subscriptions.contains_key(r0));

        let tid = request.txn_id.unwrap();

        sticky.maybe_commit(tid.as_str().into());
    }

    // Request 1: subscribe to `r1`. Only `r1` is expected in the request, and
    // this request is deliberately never committed.
    {
        sticky
            .data_mut()
            .room_subscriptions
            .insert(r1.to_owned(), (Default::default(), Default::default()));

        let txn_id: &TransactionId = "tid1".into();
        let mut request = http::Request::default();
        request.txn_id = Some(txn_id.to_string());

        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

        assert!(request.txn_id.is_some());
        assert_eq!(request.room_subscriptions.len(), 1);
        assert!(request.room_subscriptions.contains_key(r1));
    }

    // Request 2: nothing new was added, but since request 1 was not
    // committed, `r1` must be sent again. This one is committed.
    {
        let txn_id: &TransactionId = "tid2".into();
        let mut request = http::Request::default();
        request.txn_id = Some(txn_id.to_string());

        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

        assert!(request.txn_id.is_some());
        assert_eq!(request.room_subscriptions.len(), 1);
        assert!(request.room_subscriptions.contains_key(r1));

        let tid = request.txn_id.unwrap();

        sticky.maybe_commit(tid.as_str().into());
    }

    // Request 3: everything has been committed, so no subscription is sent.
    {
        let txn_id: &TransactionId = "tid3".into();
        let mut request = http::Request::default();
        request.txn_id = Some(txn_id.to_string());

        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

        assert!(request.txn_id.is_some());
        assert!(request.room_subscriptions.is_empty());
    }
}
1546
/// Extensions handed to the sticky parameters are stored untouched and are
/// copied into the next request; extensions that were never configured stay
/// unset in both places.
#[test]
fn test_extensions_are_sticky() {
    // Only the account data extension is explicitly enabled.
    let configured = {
        let mut ext = http::request::Extensions::default();
        ext.account_data.enabled = Some(true);
        ext
    };

    let mut manager = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
        Default::default(),
        configured,
    ));

    assert!(manager.is_invalidated(), "invalidated because of non default parameters");

    // What the manager stores.
    {
        let stored = &manager.data().extensions;

        assert!(stored.e2ee.enabled.is_none());
        assert!(stored.to_device.enabled.is_none());
        assert!(stored.to_device.since.is_none());

        assert_eq!(stored.account_data.enabled, Some(true));
    }

    // What the manager writes into a request.
    let transaction: &TransactionId = "tid123".into();
    let mut outgoing = http::Request::default();
    outgoing.txn_id = Some(transaction.to_string());

    manager.maybe_apply(&mut outgoing, &mut LazyTransactionId::from_owned(transaction.to_owned()));

    // Applying alone does not commit anything.
    assert!(manager.is_invalidated());

    let sent = &outgoing.extensions;
    assert!(sent.to_device.enabled.is_none());
    assert!(sent.to_device.since.is_none());
    assert!(sent.e2ee.enabled.is_none());
    assert_eq!(sent.account_data.enabled, Some(true));
}
1580
1581 #[async_test]
1582 async fn test_sticky_extensions_plus_since() -> Result<()> {
1583 let server = MockServer::start().await;
1584 let client = logged_in_client(Some(server.uri())).await;
1585
1586 let sync = client
1587 .sliding_sync("test-slidingsync")?
1588 .add_list(SlidingSyncList::builder("new_list"))
1589 .build()
1590 .await?;
1591
1592 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1594 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1595 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1596
1597 let sync = client
1599 .sliding_sync("test-slidingsync")?
1600 .add_list(SlidingSyncList::builder("new_list"))
1601 .with_to_device_extension(
1602 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1603 )
1604 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1605 .build()
1606 .await?;
1607
1608 let txn_id = TransactionId::new();
1611 let (request, _, _) = sync
1612 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1613 .await?;
1614
1615 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1616 assert_eq!(request.extensions.to_device.enabled, Some(true));
1617 assert!(request.extensions.to_device.since.is_none());
1618
1619 {
1620 let mut sticky = sync.inner.sticky.write().unwrap();
1622 assert!(sticky.is_invalidated());
1623 sticky.maybe_commit(
1624 "hopefully the rng won't generate this very specific transaction id".into(),
1625 );
1626 assert!(sticky.is_invalidated());
1627 }
1628
1629 let txn_id2 = TransactionId::new();
1631 let (request, _, _) = sync
1632 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1633 .await?;
1634
1635 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1636 assert_eq!(request.extensions.to_device.enabled, Some(true));
1637 assert!(request.extensions.to_device.since.is_none());
1638
1639 assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1640
1641 {
1642 let mut sticky = sync.inner.sticky.write().unwrap();
1644 assert!(sticky.is_invalidated());
1645 sticky.maybe_commit(txn_id2.as_str().into());
1646 assert!(!sticky.is_invalidated());
1647 }
1648
1649 let txn_id = TransactionId::new();
1651 let (request, _, _) = sync
1652 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1653 .await?;
1654 assert!(request.extensions.e2ee.enabled.is_none());
1655 assert!(request.extensions.to_device.enabled.is_none());
1656 assert!(request.extensions.to_device.since.is_none());
1657
1658 let _since_token = "since";
1662
1663 #[cfg(feature = "e2e-encryption")]
1664 {
1665 use matrix_sdk_base::crypto::store::Changes;
1666 if let Some(olm_machine) = &*client.olm_machine().await {
1667 olm_machine
1668 .store()
1669 .save_changes(Changes {
1670 next_batch_token: Some(_since_token.to_owned()),
1671 ..Default::default()
1672 })
1673 .await?;
1674 }
1675 }
1676
1677 let txn_id = TransactionId::new();
1678 let (request, _, _) = sync
1679 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1680 .await?;
1681
1682 assert!(request.extensions.e2ee.enabled.is_none());
1683 assert!(request.extensions.to_device.enabled.is_none());
1684
1685 #[cfg(feature = "e2e-encryption")]
1686 assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1687
1688 Ok(())
1689 }
1690
/// With the E2EE extension enabled, generating a sync request while no `pos`
/// is known must leave a keys query covering every tracked user pending;
/// generating one once a `pos` is set must leave nothing pending.
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
    use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
    use matrix_sdk_test::ruma_response_from_json;
    use ruma::user_id;

    let server = MockServer::start().await;
    let client = logged_in_client(Some(server.uri())).await;

    let alice = user_id!("@alice:localhost");
    let bob = user_id!("@bob:localhost");
    let me = user_id!("@example:localhost");

    // Step 1: track two users, then answer requests until none is pending.
    {
        let machine_guard = client.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();

        machine.update_tracked_users([alice, bob]).await?;

        let pending = machine.outgoing_requests().await?;

        assert_eq!(pending.len(), 2);
        let (keys_upload, keys_query) = (&pending[0], &pending[1]);
        assert_matches!(keys_upload.request(), AnyOutgoingRequest::KeysUpload(_));
        assert_matches!(keys_query.request(), AnyOutgoingRequest::KeysQuery(_));

        // Fake answers for both requests.
        let upload_response = ruma_response_from_json(&json!({
            "one_time_key_counts": {}
        }));
        machine
            .mark_request_as_sent(
                keys_upload.request_id(),
                AnyIncomingResponse::KeysUpload(&upload_response),
            )
            .await?;

        let query_response = ruma_response_from_json(&json!({
            "device_keys": {
                alice: {},
                bob: {},
            }
        }));
        machine
            .mark_request_as_sent(
                keys_query.request_id(),
                AnyIncomingResponse::KeysQuery(&query_response),
            )
            .await?;

        // One more keys query shows up; it is answered for our own user.
        let pending = machine.outgoing_requests().await?;

        assert_eq!(pending.len(), 1);
        let own_keys_query = &pending[0];
        assert_matches!(own_keys_query.request(), AnyOutgoingRequest::KeysQuery(_));

        let own_query_response = ruma_response_from_json(&json!({
            "device_keys": {
                me: {},
            }
        }));
        machine
            .mark_request_as_sent(
                own_keys_query.request_id(),
                AnyIncomingResponse::KeysQuery(&own_query_response),
            )
            .await?;

        // Nothing is left.
        let pending = machine.outgoing_requests().await?;

        assert!(pending.is_empty());
    }

    // Step 2: generate a sync request with E2EE enabled and no `pos`.
    let sliding_sync = client
        .sliding_sync("test-slidingsync")?
        .add_list(SlidingSyncList::builder("new_list"))
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    let first_txn_id = TransactionId::new();
    let (_request, _, _) = sliding_sync
        .generate_sync_request(&mut LazyTransactionId::from_owned(first_txn_id.clone()))
        .await?;

    // Step 3: a single keys query naming all three users is now pending.
    {
        let machine_guard = client.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();

        let pending = machine.outgoing_requests().await?;

        assert_eq!(pending.len(), 1);
        let keys_query = &pending[0];
        assert_matches!(
            keys_query.request(),
            AnyOutgoingRequest::KeysQuery(request) => {
                for user in [alice, bob, me] {
                    assert!(request.device_keys.contains_key(user));
                }
            }
        );

        let query_response = ruma_response_from_json(&json!({
            "device_keys": {
                alice: {},
                bob: {},
                me: {},
            }
        }));
        machine
            .mark_request_as_sent(
                keys_query.request_id(),
                AnyIncomingResponse::KeysQuery(&query_response),
            )
            .await?;
    }

    // Step 4: with a `pos` set, generating a request queues nothing.
    sliding_sync.set_pos("chocolat".to_owned()).await;

    let second_txn_id = TransactionId::new();
    let (_request, _, _) = sliding_sync
        .generate_sync_request(&mut LazyTransactionId::from_owned(second_txn_id.clone()))
        .await?;

    {
        let machine_guard = client.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();

        let pending = machine.outgoing_requests().await?;

        assert!(pending.is_empty());
    }

    Ok(())
}
1837
1838 #[async_test]
1839 async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1840 let server = MockServer::start().await;
1841 let client = logged_in_client(Some(server.uri())).await;
1842
1843 let sliding_sync = client
1844 .sliding_sync("test-slidingsync")?
1845 .with_to_device_extension(
1846 assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1847 )
1848 .build()
1849 .await?;
1850
1851 let (request, _, _) =
1853 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1854 assert!(request.extensions.to_device.enabled.is_some());
1855
1856 let sync = sliding_sync.sync();
1857 pin_mut!(sync);
1858
1859 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1861
1862 #[derive(Deserialize)]
1863 struct PartialRequest {
1864 txn_id: Option<String>,
1865 }
1866
1867 {
1868 let _mock_guard = Mock::given(SlidingSyncMatcher)
1869 .respond_with(|request: &Request| {
1870 let request: PartialRequest = request.body_json().unwrap();
1872
1873 ResponseTemplate::new(200).set_body_json(json!({
1874 "txn_id": request.txn_id,
1875 "pos": "0",
1876 }))
1877 })
1878 .mount_as_scoped(&server)
1879 .await;
1880
1881 let next = sync.next().await;
1882 assert_matches!(next, Some(Ok(_update_summary)));
1883
1884 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1886 }
1887
1888 let (request, _, _) =
1890 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1891 assert!(request.extensions.to_device.enabled.is_none());
1892
1893 {
1895 let _mock_guard = Mock::given(SlidingSyncMatcher)
1896 .respond_with(|request: &Request| {
1897 let request: PartialRequest = request.body_json().unwrap();
1899
1900 ResponseTemplate::new(200).set_body_json(json!({
1901 "txn_id": request.txn_id,
1902 "pos": "1",
1903 }))
1904 })
1905 .mount_as_scoped(&server)
1906 .await;
1907
1908 let next = sync.next().await;
1909 assert_matches!(next, Some(Ok(_update_summary)));
1910
1911 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1913 }
1914
1915 {
1918 let _mock_guard = Mock::given(SlidingSyncMatcher)
1919 .respond_with(|request: &Request| {
1920 let request: PartialRequest = request.body_json().unwrap();
1922
1923 ResponseTemplate::new(200).set_body_json(json!({
1924 "txn_id": request.txn_id,
1925 "pos": "0", }))
1927 })
1928 .up_to_n_times(1) .mount_as_scoped(&server)
1930 .await;
1931
1932 let next = sync.next().await;
1933 assert_matches!(next, Some(Ok(_update_summary)));
1934
1935 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1937 }
1938
1939 {
1944 let _mock_guard = Mock::given(SlidingSyncMatcher)
1945 .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1946 "error": "foo",
1947 "errcode": "M_UNKNOWN_POS",
1948 })))
1949 .mount_as_scoped(&server)
1950 .await;
1951
1952 let next = sync.next().await;
1953
1954 assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1956
1957 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1959
1960 let (request, _, _) =
1962 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1963
1964 assert!(request.extensions.to_device.enabled.is_some());
1965
1966 assert!(sync.next().await.is_none());
1968 }
1969
1970 Ok(())
1971 }
1972
/// A sliding sync built without `share_pos()` writes its position to storage
/// but never reads it back, neither while running nor when rebuilt.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
    /// The only part of the request body the mocked server looks at.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    let server = MockServer::start().await;

    // The mocked server returns "0", "1", "2", … as successive positions.
    let next_pos = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |request: &Request| {
            let body: PartialRequest = request.body_json().unwrap();

            // Hand out the current value and advance the counter.
            let pos = {
                let mut counter = next_pos.lock().unwrap();
                let current = *counter;
                *counter = current + 1;
                current
            };

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": body.txn_id,
                "pos": pos.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;

    // Nothing is known initially, in memory or in the generated request.
    {
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.pos.is_none());
    }

    let sync = sliding_sync.sync();
    pin_mut!(sync);

    // One successful sync stores "0" in memory…
    let first_update = sync.next().await;
    assert_matches!(first_update, Some(Ok(_update_summary)));

    assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

    // …and in storage.
    let stored = restore_sliding_sync_state(
        &client,
        &sliding_sync.inner.storage_key,
        &*sliding_sync.inner.lists.read().await,
    )
    .await?
    .expect("must have restored fields");

    assert_eq!(stored.pos.as_deref(), Some("0"));

    // Overwrite the stored position through another instance with the same id.
    {
        let intruder = client.sliding_sync("forgetful-sync")?.build().await?;

        let mut intruder_position = intruder.inner.position.lock().await;
        intruder_position.pos = Some("yolo".to_owned());

        intruder.cache_to_storage(&intruder_position).await?;
    }

    // The running instance is unaffected: it still uses "0".
    {
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert_eq!(request.pos.as_deref(), Some("0"));
    }

    // A freshly built instance with the same id starts without a position.
    {
        let rebuilt = client.sliding_sync("forgetful-sync")?.build().await?;
        assert!(rebuilt.inner.position.lock().await.pos.is_none());
    }

    Ok(())
}
2067
/// A sliding sync built with `share_pos()` reloads its position from storage,
/// both for the next request and when rebuilt, and forgets it everywhere once
/// the session is expired.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_does_remember_pos() -> Result<()> {
    /// The only part of the request body the mocked server looks at.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    let server = MockServer::start().await;

    // The mocked server returns "0", "1", "2", … as successive positions.
    let next_pos = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |request: &Request| {
            let body: PartialRequest = request.body_json().unwrap();

            // Hand out the current value and advance the counter.
            let pos = {
                let mut counter = next_pos.lock().unwrap();
                let current = *counter;
                *counter = current + 1;
                current
            };

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": body.txn_id,
                "pos": pos.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;

    // Nothing is known initially, in the generated request or in memory.
    {
        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;

        assert!(request.pos.is_none());
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
    }

    let sync = sliding_sync.sync();
    pin_mut!(sync);

    // One successful sync stores "0" in memory…
    let first_update = sync.next().await;
    assert_matches!(first_update, Some(Ok(_update_summary)));

    assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

    // …and in storage.
    let stored = restore_sliding_sync_state(
        &client,
        &sliding_sync.inner.storage_key,
        &*sliding_sync.inner.lists.read().await,
    )
    .await?
    .expect("must have restored fields");

    assert_eq!(stored.pos.as_deref(), Some("0"));

    // Overwrite the stored position through another instance with the same id.
    {
        let intruder = client.sliding_sync("elephant-sync")?.build().await?;

        let mut intruder_position = intruder.inner.position.lock().await;
        intruder_position.pos = Some("42".to_owned());

        intruder.cache_to_storage(&intruder_position).await?;
    }

    // The running instance picks the stored value up for its next request.
    {
        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert_eq!(request.pos.as_deref(), Some("42"));
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
    }

    // A rebuilt sharing instance starts from the stored value as well.
    {
        let rebuilt = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert_eq!(rebuilt.inner.position.lock().await.pos.as_deref(), Some("42"));

        let (request, _, _) = rebuilt.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert_eq!(request.pos.as_deref(), Some("42"));
    }

    // Expiring the session wipes the position.
    sliding_sync.expire_session().await;

    // It is gone from the running instance…
    {
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.pos.is_none());
    }

    // …and a rebuilt sharing instance does not find it either.
    {
        let rebuilt = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert!(rebuilt.inner.position.lock().await.pos.is_none());

        let (request, _, _) = rebuilt.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.pos.is_none());
    }

    Ok(())
}
2185
2186 #[async_test]
2187 async fn test_stop_sync_loop() -> Result<()> {
2188 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2189 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2190 .await?;
2191
2192 let stream = sliding_sync.sync();
2194 pin_mut!(stream);
2195
2196 assert!(stream.next().await.is_some());
2198
2199 sliding_sync.stop_sync()?;
2201
2202 assert!(stream.next().await.is_none());
2204
2205 let stream = sliding_sync.sync();
2207 pin_mut!(stream);
2208
2209 assert!(stream.next().await.is_some());
2211
2212 Ok(())
2213 }
2214
2215 #[async_test]
2216 async fn test_sliding_sync_version() -> Result<()> {
2217 let server = MockServer::start().await;
2218 let client = logged_in_client(Some(server.uri())).await;
2219
2220 {
2223 let sync = client.sliding_sync("default")?.build().await?;
2224
2225 assert_matches!(sync.version(), Version::Native);
2226 }
2227
2228 {
2230 let url = Url::parse("https://bar.matrix/").unwrap();
2231 let sync = client
2232 .sliding_sync("own-proxy")?
2233 .version(Version::Proxy { url: url.clone() })
2234 .build()
2235 .await?;
2236
2237 assert_matches!(
2238 sync.version(),
2239 Version::Proxy { url: given_url } => {
2240 assert_eq!(&url, given_url);
2241 }
2242 );
2243 }
2244
2245 let url = Url::parse("https://foo.matrix/").unwrap();
2247 client.set_sliding_sync_version(Version::Proxy { url: url.clone() });
2248
2249 {
2250 let sync = client.sliding_sync("client-proxy")?.build().await?;
2252
2253 assert_matches!(
2254 sync.version(),
2255 Version::Proxy { url: given_url } => {
2256 assert_eq!(&url, given_url);
2257 }
2258 );
2259 }
2260
2261 {
2262 let sync = client.sliding_sync("own-proxy")?.version(Version::Native).build().await?;
2264
2265 assert_matches!(sync.version(), Version::Native);
2266 }
2267
2268 Ok(())
2269 }
2270
2271 #[async_test]
2272 async fn test_limited_flag_computation() {
2273 let make_event = |event_id: &str| -> TimelineEvent {
2274 TimelineEvent::new(
2275 Raw::from_json_string(
2276 json!({
2277 "event_id": event_id,
2278 "sender": "@johnmastodon:example.org",
2279 "origin_server_ts": 1337424242,
2280 "type": "m.room.message",
2281 "room_id": "!meaningless:example.org",
2282 "content": {
2283 "body": "Hello, world!",
2284 "msgtype": "m.text"
2285 },
2286 })
2287 .to_string(),
2288 )
2289 .unwrap(),
2290 )
2291 };
2292
2293 let event_a = make_event("$a");
2294 let event_b = make_event("$b");
2295 let event_c = make_event("$c");
2296 let event_d = make_event("$d");
2297
2298 let not_initial = room_id!("!croissant:example.org");
2299 let no_overlap = room_id!("!omelette:example.org");
2300 let partial_overlap = room_id!("!fromage:example.org");
2301 let complete_overlap = room_id!("!baguette:example.org");
2302 let no_remote_events = room_id!("!pain:example.org");
2303 let no_local_events = room_id!("!crepe:example.org");
2304 let already_limited = room_id!("!paris:example.org");
2305
2306 let response_timeline = vec![event_c.raw().clone(), event_d.raw().clone()];
2307
2308 let local_rooms = BTreeMap::from_iter([
2309 (
2310 not_initial.to_owned(),
2313 SlidingSyncRoom::new(
2314 no_overlap.to_owned(),
2315 None,
2316 vec![event_a.clone(), event_b.clone()],
2317 ),
2318 ),
2319 (
2320 no_overlap.to_owned(),
2322 SlidingSyncRoom::new(
2323 no_overlap.to_owned(),
2324 None,
2325 vec![event_a.clone(), event_b.clone()],
2326 ),
2327 ),
2328 (
2329 partial_overlap.to_owned(),
2331 SlidingSyncRoom::new(
2332 partial_overlap.to_owned(),
2333 None,
2334 vec![event_a.clone(), event_b.clone(), event_c.clone()],
2335 ),
2336 ),
2337 (
2338 complete_overlap.to_owned(),
2340 SlidingSyncRoom::new(
2341 partial_overlap.to_owned(),
2342 None,
2343 vec![event_c.clone(), event_d.clone()],
2344 ),
2345 ),
2346 (
2347 no_remote_events.to_owned(),
2350 SlidingSyncRoom::new(
2351 no_remote_events.to_owned(),
2352 None,
2353 vec![event_c.clone(), event_d.clone()],
2354 ),
2355 ),
2356 (
2357 no_local_events.to_owned(),
2360 SlidingSyncRoom::new(no_local_events.to_owned(), None, vec![]),
2361 ),
2362 (
2363 already_limited.to_owned(),
2366 SlidingSyncRoom::new(
2367 already_limited.to_owned(),
2368 None,
2369 vec![event_a, event_b, event_c.clone()],
2370 ),
2371 ),
2372 ]);
2373
2374 let mut remote_rooms = BTreeMap::from_iter([
2375 (
2376 not_initial.to_owned(),
2377 assign!(http::response::Room::default(), { timeline: response_timeline }),
2378 ),
2379 (
2380 no_overlap.to_owned(),
2381 assign!(http::response::Room::default(), {
2382 initial: Some(true),
2383 timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
2384 }),
2385 ),
2386 (
2387 partial_overlap.to_owned(),
2388 assign!(http::response::Room::default(), {
2389 initial: Some(true),
2390 timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
2391 }),
2392 ),
2393 (
2394 complete_overlap.to_owned(),
2395 assign!(http::response::Room::default(), {
2396 initial: Some(true),
2397 timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
2398 }),
2399 ),
2400 (
2401 no_remote_events.to_owned(),
2402 assign!(http::response::Room::default(), {
2403 initial: Some(true),
2404 timeline: vec![],
2405 }),
2406 ),
2407 (
2408 no_local_events.to_owned(),
2409 assign!(http::response::Room::default(), {
2410 initial: Some(true),
2411 timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
2412 }),
2413 ),
2414 (
2415 already_limited.to_owned(),
2416 assign!(http::response::Room::default(), {
2417 initial: Some(true),
2418 limited: true,
2419 timeline: vec![event_c.into_raw(), event_d.into_raw()],
2420 }),
2421 ),
2422 ]);
2423
2424 compute_limited(&local_rooms, &mut remote_rooms);
2425
2426 assert!(!remote_rooms.get(not_initial).unwrap().limited);
2427 assert!(remote_rooms.get(no_overlap).unwrap().limited);
2428 assert!(!remote_rooms.get(partial_overlap).unwrap().limited);
2429 assert!(!remote_rooms.get(complete_overlap).unwrap().limited);
2430 assert!(!remote_rooms.get(no_remote_events).unwrap().limited);
2431 assert!(!remote_rooms.get(no_local_events).unwrap().limited);
2432 assert!(remote_rooms.get(already_limited).unwrap().limited);
2433 }
2434
2435 #[async_test]
2436 async fn test_process_read_receipts() -> Result<()> {
2437 let room = owned_room_id!("!pony:example.org");
2438
2439 let server = MockServer::start().await;
2440 let client = logged_in_client(Some(server.uri())).await;
2441
2442 let sliding_sync = client
2443 .sliding_sync("test")?
2444 .with_receipt_extension(
2445 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2446 )
2447 .add_list(
2448 SlidingSyncList::builder("all")
2449 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2450 )
2451 .build()
2452 .await?;
2453
2454 {
2456 let server_response = assign!(http::Response::new("0".to_owned()), {
2457 rooms: BTreeMap::from([(
2458 room.clone(),
2459 http::response::Room::default(),
2460 )])
2461 });
2462
2463 let _summary = {
2464 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2465 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2466 };
2467 }
2468
2469 let server_response = assign!(http::Response::new("1".to_owned()), {
2470 extensions: assign!(http::response::Extensions::default(), {
2471 receipts: assign!(http::response::Receipts::default(), {
2472 rooms: BTreeMap::from([
2473 (
2474 room.clone(),
2475 Raw::from_json_string(
2476 json!({
2477 "room_id": room,
2478 "type": "m.receipt",
2479 "content": {
2480 "$event:bar.org": {
2481 "m.read": {
2482 client.user_id().unwrap(): {
2483 "ts": 1436451550,
2484 }
2485 }
2486 }
2487 }
2488 })
2489 .to_string(),
2490 ).unwrap()
2491 )
2492 ])
2493 })
2494 })
2495 });
2496
2497 let summary = {
2498 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2499 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2500 };
2501
2502 assert!(summary.rooms.contains(&room));
2503
2504 Ok(())
2505 }
2506
2507 #[async_test]
2508 async fn test_process_marked_unread_room_account_data() -> Result<()> {
2509 let room_id = owned_room_id!("!unicorn:example.org");
2510
2511 let server = MockServer::start().await;
2512 let client = logged_in_client(Some(server.uri())).await;
2513
2514 let sliding_sync = client
2517 .sliding_sync("test")?
2518 .with_account_data_extension(
2519 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2520 )
2521 .add_list(
2522 SlidingSyncList::builder("all")
2523 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2524 )
2525 .build()
2526 .await?;
2527
2528 {
2530 let server_response = assign!(http::Response::new("0".to_owned()), {
2531 rooms: BTreeMap::from([(
2532 room_id.clone(),
2533 http::response::Room::default(),
2534 )])
2535 });
2536
2537 let _summary = {
2538 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2539 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2540 };
2541 }
2542
2543 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2547
2548 let update_summary = {
2549 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2550 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2551 };
2552
2553 assert!(update_summary.rooms.contains(&room_id));
2556
2557 let room = client.get_room(&room_id).unwrap();
2558
2559 assert!(room.is_marked_unread());
2562
2563 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2566
2567 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2568 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?;
2569
2570 let room = client.get_room(&room_id).unwrap();
2571
2572 assert!(!room.is_marked_unread());
2573
2574 Ok(())
2575 }
2576
2577 fn make_mark_unread_response(
2578 response_number: &str,
2579 room_id: OwnedRoomId,
2580 unread: bool,
2581 add_rooms_section: bool,
2582 ) -> http::Response {
2583 let rooms = if add_rooms_section {
2584 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2585 } else {
2586 BTreeMap::new()
2587 };
2588
2589 let extensions = assign!(http::response::Extensions::default(), {
2590 account_data: assign!(http::response::AccountData::default(), {
2591 rooms: BTreeMap::from([
2592 (
2593 room_id,
2594 vec![
2595 Raw::from_json_string(
2596 json!({
2597 "content": {
2598 "unread": unread
2599 },
2600 "type": "com.famedly.marked_unread"
2601 })
2602 .to_string(),
2603 ).unwrap()
2604 ]
2605 )
2606 ])
2607 })
2608 });
2609
2610 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2611 }
2612
2613 #[async_test]
2614 async fn test_process_rooms_account_data() -> Result<()> {
2615 let room = owned_room_id!("!pony:example.org");
2616
2617 let server = MockServer::start().await;
2618 let client = logged_in_client(Some(server.uri())).await;
2619
2620 let sliding_sync = client
2621 .sliding_sync("test")?
2622 .with_account_data_extension(
2623 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2624 )
2625 .add_list(
2626 SlidingSyncList::builder("all")
2627 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2628 )
2629 .build()
2630 .await?;
2631
2632 {
2634 let server_response = assign!(http::Response::new("0".to_owned()), {
2635 rooms: BTreeMap::from([(
2636 room.clone(),
2637 http::response::Room::default(),
2638 )])
2639 });
2640
2641 let _summary = {
2642 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2643 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2644 };
2645 }
2646
2647 let server_response = assign!(http::Response::new("1".to_owned()), {
2648 extensions: assign!(http::response::Extensions::default(), {
2649 account_data: assign!(http::response::AccountData::default(), {
2650 rooms: BTreeMap::from([
2651 (
2652 room.clone(),
2653 vec![
2654 Raw::from_json_string(
2655 json!({
2656 "content": {
2657 "tags": {
2658 "u.work": {
2659 "order": 0.9
2660 }
2661 }
2662 },
2663 "type": "m.tag"
2664 })
2665 .to_string(),
2666 ).unwrap()
2667 ]
2668 )
2669 ])
2670 })
2671 })
2672 });
2673 let summary = {
2674 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2675 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2676 };
2677
2678 assert!(summary.rooms.contains(&room));
2679
2680 Ok(())
2681 }
2682
2683 #[async_test]
2684 #[cfg(feature = "e2e-encryption")]
2685 async fn test_process_only_encryption_events() -> Result<()> {
2686 use ruma::OneTimeKeyAlgorithm;
2687
2688 let room = owned_room_id!("!croissant:example.org");
2689
2690 let server = MockServer::start().await;
2691 let client = logged_in_client(Some(server.uri())).await;
2692
2693 let server_response = assign!(http::Response::new("0".to_owned()), {
2694 rooms: BTreeMap::from([(
2695 room.clone(),
2696 assign!(http::response::Room::default(), {
2697 name: Some("Croissants lovers".to_owned()),
2698 timeline: Vec::new(),
2699 }),
2700 )]),
2701
2702 extensions: assign!(http::response::Extensions::default(), {
2703 e2ee: assign!(http::response::E2EE::default(), {
2704 device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2705 }),
2706 to_device: Some(assign!(http::response::ToDevice::default(), {
2707 next_batch: "to-device-token".to_owned(),
2708 })),
2709 })
2710 });
2711
2712 let sliding_sync = client
2716 .sliding_sync("test")?
2717 .with_to_device_extension(
2718 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2719 )
2720 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2721 .build()
2722 .await?;
2723
2724 {
2725 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2726
2727 sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
2728 }
2729
2730 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2732 assert_eq!(uploaded_key_count, 42);
2733
2734 {
2735 let olm_machine = &*client.olm_machine_for_testing().await;
2736 assert_eq!(
2737 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2738 Some("to-device-token")
2739 );
2740 }
2741
2742 assert!(client.get_room(&room).is_none());
2744
2745 let client = logged_in_client(Some(server.uri())).await;
2748
2749 let sliding_sync = client
2750 .sliding_sync("test")?
2751 .add_list(SlidingSyncList::builder("thelist"))
2752 .build()
2753 .await?;
2754
2755 {
2756 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2757
2758 sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
2759 }
2760
2761 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2763 assert_eq!(uploaded_key_count, 0);
2764
2765 {
2766 let olm_machine = &*client.olm_machine_for_testing().await;
2767 assert_eq!(
2768 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2769 None
2770 );
2771 }
2772
2773 assert!(client.get_room(&room).is_some());
2775
2776 let client = logged_in_client(Some(server.uri())).await;
2778
2779 let sliding_sync = client
2780 .sliding_sync("test")?
2781 .add_list(SlidingSyncList::builder("thelist"))
2782 .with_to_device_extension(
2783 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2784 )
2785 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2786 .build()
2787 .await?;
2788
2789 {
2790 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2791
2792 sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
2793 }
2794
2795 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2797 assert_eq!(uploaded_key_count, 42);
2798
2799 {
2800 let olm_machine = &*client.olm_machine_for_testing().await;
2801 assert_eq!(
2802 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2803 Some("to-device-token")
2804 );
2805 }
2806
2807 assert!(client.get_room(&room).is_some());
2809
2810 Ok(())
2811 }
2812
2813 #[async_test]
2814 async fn test_lock_multiple_requests() -> Result<()> {
2815 let server = MockServer::start().await;
2816 let client = logged_in_client(Some(server.uri())).await;
2817
2818 let pos = Arc::new(Mutex::new(0));
2819 let _mock_guard = Mock::given(SlidingSyncMatcher)
2820 .respond_with(move |_: &Request| {
2821 let mut pos = pos.lock().unwrap();
2822 *pos += 1;
2823 ResponseTemplate::new(200).set_body_json(json!({
2824 "pos": pos.to_string(),
2825 "lists": {},
2826 "rooms": {}
2827 }))
2828 })
2829 .mount_as_scoped(&server)
2830 .await;
2831
2832 let sliding_sync = client
2833 .sliding_sync("test")?
2834 .with_to_device_extension(
2835 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2836 )
2837 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2838 .build()
2839 .await?;
2840
2841 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2844
2845 for result in requests.await {
2846 result?;
2847 }
2848
2849 Ok(())
2850 }
2851
2852 #[async_test]
2853 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2854 let server = MockServer::start().await;
2855 let client = logged_in_client(Some(server.uri())).await;
2856
2857 let pos = Arc::new(Mutex::new(0));
2858 let _mock_guard = Mock::given(SlidingSyncMatcher)
2859 .respond_with(move |_: &Request| {
2860 let mut pos = pos.lock().unwrap();
2861 *pos += 1;
2862 ResponseTemplate::new(200)
2864 .set_body_json(json!({
2865 "pos": pos.to_string(),
2866 "lists": {},
2867 "rooms": {}
2868 }))
2869 .set_delay(Duration::from_secs(2))
2870 })
2871 .mount_as_scoped(&server)
2872 .await;
2873
2874 let sliding_sync =
2875 client
2876 .sliding_sync("test")?
2877 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2878 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2879 ))
2880 .add_list(
2881 SlidingSyncList::builder("another-list")
2882 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2883 )
2884 .build()
2885 .await?;
2886
2887 let stream = sliding_sync.sync();
2888 pin_mut!(stream);
2889
2890 let cloned_sync = sliding_sync.clone();
2891 tokio::spawn(async move {
2892 tokio::time::sleep(Duration::from_millis(100)).await;
2893
2894 cloned_sync
2895 .on_list("another-list", |list| {
2896 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2897 ready(())
2898 })
2899 .await;
2900 });
2901
2902 assert_matches!(stream.next().await, Some(Ok(_)));
2903
2904 sliding_sync.stop_sync().unwrap();
2905
2906 assert_matches!(stream.next().await, None);
2907
2908 let mut num_requests = 0;
2909
2910 for request in server.received_requests().await.unwrap() {
2911 if !SlidingSyncMatcher.matches(&request) {
2912 continue;
2913 }
2914
2915 let another_list_ranges = if num_requests == 0 {
2916 json!([[0, 10]])
2918 } else {
2919 json!([[10, 20]])
2921 };
2922
2923 num_requests += 1;
2924 assert!(num_requests <= 2, "more than one request hit the server");
2925
2926 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2927
2928 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2929 &json_value,
2930 &json!({
2931 "conn_id": "test",
2932 "lists": {
2933 "room-list": {
2934 "ranges": [[0, 9]],
2935 "required_state": [
2936 ["m.room.encryption", ""],
2937 ["m.room.tombstone", ""]
2938 ],
2939 },
2940 "another-list": {
2941 "ranges": another_list_ranges,
2942 "required_state": [
2943 ["m.room.encryption", ""],
2944 ["m.room.tombstone", ""]
2945 ],
2946 },
2947 }
2948 }),
2949 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2950 ) {
2951 dbg!(json_value);
2952 panic!("json differ: {err}");
2953 }
2954 }
2955
2956 assert_eq!(num_requests, 2);
2957
2958 Ok(())
2959 }
2960
2961 #[async_test]
2962 async fn test_timeout_zero_list() -> Result<()> {
2963 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2964
2965 let (request, _, _) =
2966 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2967
2968 assert!(request.timeout.is_some());
2971
2972 Ok(())
2973 }
2974
2975 #[async_test]
2976 async fn test_timeout_one_list() -> Result<()> {
2977 let (_server, sliding_sync) = new_sliding_sync(vec![
2978 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2979 ])
2980 .await?;
2981
2982 let (request, _, _) =
2983 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2984
2985 assert!(request.timeout.is_none());
2987
2988 {
2990 let server_response = assign!(http::Response::new("0".to_owned()), {
2991 lists: BTreeMap::from([(
2992 "foo".to_owned(),
2993 assign!(http::response::List::default(), {
2994 count: uint!(7),
2995 })
2996 )])
2997 });
2998
2999 let _summary = {
3000 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
3001 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
3002 };
3003 }
3004
3005 let (request, _, _) =
3006 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
3007
3008 assert!(request.timeout.is_some());
3010
3011 Ok(())
3012 }
3013
3014 #[async_test]
3015 async fn test_timeout_three_lists() -> Result<()> {
3016 let (_server, sliding_sync) = new_sliding_sync(vec![
3017 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
3018 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
3019 SlidingSyncList::builder("baz")
3020 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
3021 ])
3022 .await?;
3023
3024 let (request, _, _) =
3025 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
3026
3027 assert!(request.timeout.is_none());
3029
3030 {
3032 let server_response = assign!(http::Response::new("0".to_owned()), {
3033 lists: BTreeMap::from([(
3034 "foo".to_owned(),
3035 assign!(http::response::List::default(), {
3036 count: uint!(7),
3037 })
3038 )])
3039 });
3040
3041 let _summary = {
3042 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
3043 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
3044 };
3045 }
3046
3047 let (request, _, _) =
3048 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
3049
3050 assert!(request.timeout.is_none());
3052
3053 {
3055 let server_response = assign!(http::Response::new("1".to_owned()), {
3056 lists: BTreeMap::from([(
3057 "bar".to_owned(),
3058 assign!(http::response::List::default(), {
3059 count: uint!(7),
3060 })
3061 )])
3062 });
3063
3064 let _summary = {
3065 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
3066 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
3067 };
3068 }
3069
3070 let (request, _, _) =
3071 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
3072
3073 assert!(request.timeout.is_some());
3075
3076 Ok(())
3077 }
3078
3079 #[async_test]
3080 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
3081 let server = MockServer::start().await;
3082 let client = logged_in_client(Some(server.uri())).await;
3083
3084 let _mock_guard = Mock::given(SlidingSyncMatcher)
3085 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3086 "pos": "0",
3087 "lists": {},
3088 "rooms": {}
3089 })))
3090 .mount_as_scoped(&server)
3091 .await;
3092
3093 let sliding_sync = client
3094 .sliding_sync("test")?
3095 .with_to_device_extension(
3096 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
3097 )
3098 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
3099 .build()
3100 .await?;
3101
3102 let sliding_sync = Arc::new(sliding_sync);
3103
3104 let sync_beat_listener = client.inner.sync_beat.listen();
3106 sliding_sync.sync_once().await?;
3107
3108 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
3110 Ok(())
3111 }
3112
3113 #[async_test]
3114 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
3115 let server = MockServer::start().await;
3116 let client = logged_in_client(Some(server.uri())).await;
3117
3118 let _mock_guard = Mock::given(SlidingSyncMatcher)
3119 .respond_with(ResponseTemplate::new(404))
3120 .mount_as_scoped(&server)
3121 .await;
3122
3123 let sliding_sync = client
3124 .sliding_sync("test")?
3125 .with_to_device_extension(
3126 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
3127 )
3128 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
3129 .build()
3130 .await?;
3131
3132 let sliding_sync = Arc::new(sliding_sync);
3133
3134 let sync_beat_listener = client.inner.sync_beat.listen();
3136 let sync_result = sliding_sync.sync_once().await;
3137 assert!(sync_result.is_err());
3138
3139 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
3141
3142 Ok(())
3143 }
3144}