matrix_sdk/sliding_sync/
mod.rs

1// Copyright 2022-2023 Benjamin Kampmann
2// Copyright 2022 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for that specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod room;
24mod sticky_parameters;
25mod utils;
26
27use std::{
28    collections::{btree_map::Entry, BTreeMap},
29    fmt::Debug,
30    future::Future,
31    sync::{Arc, RwLock as StdRwLock},
32    time::Duration,
33};
34
35use async_stream::stream;
36pub use client::{Version, VersionBuilder};
37use futures_core::stream::Stream;
38use matrix_sdk_base::RequestedRequiredStates;
39use matrix_sdk_common::{deserialized_responses::TimelineEvent, executor::spawn, timer};
40use ruma::{
41    api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42    assign, OwnedRoomId, RoomId,
43};
44use serde::{Deserialize, Serialize};
45use tokio::{
46    select,
47    sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
48};
49use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
50
51#[cfg(feature = "e2e-encryption")]
52use self::utils::JoinHandleExt as _;
53pub use self::{builder::*, client::VersionBuilderError, error::*, list::*, room::*};
54use self::{
55    cache::restore_sliding_sync_state,
56    client::SlidingSyncResponseProcessor,
57    sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
58};
59use crate::{config::RequestConfig, Client, Result};
60
61/// The Sliding Sync instance.
62///
63/// It is OK to clone this type as much as you need: cloning it is cheap.
64#[derive(Clone, Debug)]
65pub struct SlidingSync {
66    /// The Sliding Sync data.
67    inner: Arc<SlidingSyncInner>,
68}
69
70#[derive(Debug)]
71pub(super) struct SlidingSyncInner {
72    /// A unique identifier for this instance of sliding sync.
73    ///
74    /// Used to distinguish different connections to sliding sync.
75    id: String,
76
77    /// The HTTP Matrix client.
78    client: Client,
79
80    /// Long-polling timeout that appears in sliding sync request.
81    poll_timeout: Duration,
82
83    /// Extra duration for the sliding sync request to timeout. This is added to
84    /// the [`Self::proxy_timeout`].
85    network_timeout: Duration,
86
87    /// The storage key to keep this cache at and load it from.
88    storage_key: String,
89
90    /// Should this sliding sync instance try to restore its sync position
91    /// from the database?
92    ///
93    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
94    /// keep it even so, to avoid sparkling cfg statements everywhere
95    /// throughout this file.
96    share_pos: bool,
97
98    /// Position markers.
99    ///
100    /// The `pos` marker represents a progression when exchanging requests and
101    /// responses with the server: the server acknowledges the request by
102    /// responding with a new `pos`. If the client sends two non-necessarily
103    /// consecutive requests with the same `pos`, the server has to reply with
104    /// the same identical response.
105    ///
106    /// `position` is behind a mutex so that a new request starts after the
107    /// previous request trip has fully ended (successfully or not). This
108    /// mechanism exists to wait for the response to be handled and to see the
109    /// `position` being updated, before sending a new request.
110    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
111
112    /// The lists of this Sliding Sync instance.
113    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
114
115    /// All the rooms synced with Sliding Sync.
116    rooms: AsyncRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,
117
118    /// Request parameters that are sticky.
119    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,
120
121    /// Internal channel used to pass messages between Sliding Sync and other
122    /// types.
123    internal_channel: Sender<SlidingSyncInternalMessage>,
124}
125
126impl SlidingSync {
127    pub(super) fn new(inner: SlidingSyncInner) -> Self {
128        Self { inner: Arc::new(inner) }
129    }
130
131    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
132        cache::store_sliding_sync_state(self, position).await
133    }
134
135    /// Create a new [`SlidingSyncBuilder`].
136    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
137        SlidingSyncBuilder::new(id, client)
138    }
139
140    /// Subscribe to many rooms.
141    ///
142    /// If the associated `Room`s exist, it will be marked as
143    /// members are missing, so that it ensures to re-fetch all members.
144    ///
145    /// A subscription to an already subscribed room is ignored.
146    pub fn subscribe_to_rooms(
147        &self,
148        room_ids: &[&RoomId],
149        settings: Option<http::request::RoomSubscription>,
150        cancel_in_flight_request: bool,
151    ) {
152        let settings = settings.unwrap_or_default();
153        let mut sticky = self.inner.sticky.write().unwrap();
154        let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
155
156        let mut skip_over_current_sync_loop_iteration = false;
157
158        for room_id in room_ids {
159            // If the room subscription already exists, let's not
160            // override it with a new one. First, it would reset its
161            // state (`RoomSubscriptionState`), and second it would try to
162            // re-subscribe with the next request. We don't want that. A room
163            // subscription should happen once, and next subscriptions should
164            // be ignored.
165            if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
166                if let Some(room) = self.inner.client.get_room(room_id) {
167                    room.mark_members_missing();
168                }
169
170                entry.insert((RoomSubscriptionState::default(), settings.clone()));
171
172                skip_over_current_sync_loop_iteration = true;
173            }
174        }
175
176        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
177            self.inner.internal_channel_send_if_possible(
178                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
179            );
180        }
181    }
182
183    /// Lookup a specific room
184    pub async fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
185        self.inner.rooms.read().await.get(room_id).cloned()
186    }
187
188    /// Check the number of rooms.
189    pub fn get_number_of_rooms(&self) -> usize {
190        self.inner.rooms.blocking_read().len()
191    }
192
193    /// Find a list by its name, and do something on it if it exists.
194    pub async fn on_list<Function, FunctionOutput, R>(
195        &self,
196        list_name: &str,
197        function: Function,
198    ) -> Option<R>
199    where
200        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
201        FunctionOutput: Future<Output = R>,
202    {
203        let lists = self.inner.lists.read().await;
204
205        match lists.get(list_name) {
206            Some(list) => Some(function(list).await),
207            None => None,
208        }
209    }
210
211    /// Add the list to the list of lists.
212    ///
213    /// As lists need to have a unique `.name`, if a list with the same name
214    /// is found the new list will replace the old one and the return it or
215    /// `None`.
216    pub async fn add_list(
217        &self,
218        list_builder: SlidingSyncListBuilder,
219    ) -> Result<Option<SlidingSyncList>> {
220        let list = list_builder.build(self.inner.internal_channel.clone());
221
222        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
223
224        self.inner.internal_channel_send_if_possible(
225            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
226        );
227
228        Ok(old_list)
229    }
230
231    /// Add a list that will be cached and reloaded from the cache.
232    ///
233    /// This will raise an error if a storage key was not set, or if there
234    /// was a I/O error reading from the cache.
235    ///
236    /// The rest of the semantics is the same as [`Self::add_list`].
237    pub async fn add_cached_list(
238        &self,
239        mut list_builder: SlidingSyncListBuilder,
240    ) -> Result<Option<SlidingSyncList>> {
241        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
242
243        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
244
245        self.add_list(list_builder).await
246    }
247
248    /// Lookup a set of rooms
249    pub async fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
250        &self,
251        room_ids: I,
252    ) -> Vec<Option<SlidingSyncRoom>> {
253        let rooms = self.inner.rooms.read().await;
254
255        room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
256    }
257
258    /// Get all rooms.
259    pub async fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
260        self.inner.rooms.read().await.values().cloned().collect()
261    }
262
263    /// Handle the HTTP response.
264    #[instrument(skip_all)]
265    async fn handle_response(
266        &self,
267        sliding_sync_response: http::Response,
268        position: &mut SlidingSyncPositionMarkers,
269        requested_required_states: RequestedRequiredStates,
270    ) -> Result<UpdateSummary, crate::Error> {
271        let pos = Some(sliding_sync_response.pos.clone());
272
273        let must_process_rooms_response = self.must_process_rooms_response().await;
274
275        trace!(yes = must_process_rooms_response, "Must process rooms response?");
276
277        // Transform a Sliding Sync Response to a `SyncResponse`.
278        //
279        // We may not need the `sync_response` in the future (once `SyncResponse` will
280        // move to Sliding Sync, i.e. to `http::Response`), but processing the
281        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
282        // happens here.
283
284        let mut sync_response = {
285            // Take the lock to avoid concurrent sliding syncs overwriting each other's room
286            // infos.
287            let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
288
289            let rooms = &*self.inner.rooms.read().await;
290            let mut response_processor =
291                SlidingSyncResponseProcessor::new(self.inner.client.clone(), rooms);
292
293            #[cfg(feature = "e2e-encryption")]
294            if self.is_e2ee_enabled() {
295                response_processor.handle_encryption(&sliding_sync_response.extensions).await?
296            }
297
298            // Only handle the room's subsection of the response, if this sliding sync was
299            // configured to do so.
300            if must_process_rooms_response {
301                response_processor
302                    .handle_room_response(&sliding_sync_response, &requested_required_states)
303                    .await?;
304            }
305
306            response_processor.process_and_take_response().await?
307        };
308
309        debug!(?sync_response, "Sliding Sync response has been handled by the client");
310
311        // Commit sticky parameters, if needed.
312        if let Some(ref txn_id) = sliding_sync_response.txn_id {
313            let txn_id = txn_id.as_str().into();
314            self.inner.sticky.write().unwrap().maybe_commit(txn_id);
315            let mut lists = self.inner.lists.write().await;
316            lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
317        }
318
319        let update_summary = {
320            // Update the rooms.
321            let updated_rooms = {
322                let mut rooms_map = self.inner.rooms.write().await;
323
324                let mut updated_rooms = Vec::with_capacity(sync_response.rooms.join.len());
325
326                for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
327                    // `sync_response` contains the rooms with decrypted events if any, so look at
328                    // the timeline events here first if the room exists.
329                    // Otherwise, let's look at the timeline inside the `sliding_sync_response`.
330                    let timeline =
331                        if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) {
332                            joined_room.timeline.events
333                        } else {
334                            room_data.timeline.drain(..).map(TimelineEvent::new).collect()
335                        };
336
337                    match rooms_map.get_mut(&room_id) {
338                        // The room existed before, let's update it.
339                        Some(room) => {
340                            room.update(room_data, timeline);
341                        }
342
343                        // First time we need this room, let's create it.
344                        None => {
345                            rooms_map.insert(
346                                room_id.clone(),
347                                SlidingSyncRoom::new(
348                                    room_id.clone(),
349                                    room_data.prev_batch,
350                                    timeline,
351                                ),
352                            );
353                        }
354                    }
355
356                    updated_rooms.push(room_id);
357                }
358
359                // There might be other rooms that were only mentioned in the sliding sync
360                // extensions part of the response, and thus would result in rooms present in
361                // the `sync_response.join`. Mark them as updated too.
362                //
363                // Since we've removed rooms that were in the room subsection from
364                // `sync_response.rooms.join`, the remaining ones aren't already present in
365                // `updated_rooms` and wouldn't cause any duplicates.
366                updated_rooms.extend(sync_response.rooms.join.keys().cloned());
367
368                updated_rooms
369            };
370
371            // Update the lists.
372            let updated_lists = {
373                debug!(
374                    lists = ?sliding_sync_response.lists,
375                    "Update lists"
376                );
377
378                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
379                let mut lists = self.inner.lists.write().await;
380
381                // Iterate on known lists, not on lists in the response. Rooms may have been
382                // updated that were not involved in any list update.
383                for (name, list) in lists.iter_mut() {
384                    if let Some(updates) = sliding_sync_response.lists.get(name) {
385                        let maximum_number_of_rooms: u32 =
386                            updates.count.try_into().expect("failed to convert `count` to `u32`");
387
388                        if list.update(Some(maximum_number_of_rooms))? {
389                            updated_lists.push(name.clone());
390                        }
391                    } else if list.update(None)? {
392                        updated_lists.push(name.clone());
393                    }
394                }
395
396                // Report about unknown lists.
397                for name in sliding_sync_response.lists.keys() {
398                    if !lists.contains_key(name) {
399                        error!("Response for list `{name}` - unknown to us; skipping");
400                    }
401                }
402
403                updated_lists
404            };
405
406            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
407        };
408
409        // Everything went well, we can update the position markers.
410        //
411        // Save the new position markers.
412        position.pos = pos;
413
414        Ok(update_summary)
415    }
416
417    async fn generate_sync_request(
418        &self,
419        txn_id: &mut LazyTransactionId,
420    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
421        // Collect requests for lists.
422        let mut requests_lists = BTreeMap::new();
423
424        let require_timeout = {
425            let lists = self.inner.lists.read().await;
426
427            // Start at `true` in case there is zero list.
428            let mut require_timeout = true;
429
430            for (name, list) in lists.iter() {
431                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
432                require_timeout = require_timeout && list.requires_timeout();
433            }
434
435            require_timeout
436        };
437
438        // Collect the `pos`.
439        //
440        // Wait on the `position` mutex to be available. It means no request nor
441        // response is running. The `position` mutex is released whether the response
442        // has been fully handled successfully, in this case the `pos` is updated, or
443        // the response handling has failed, in this case the `pos` hasn't been updated
444        // and the same `pos` will be used for this new request.
445        let mut position_guard = self.inner.position.clone().lock_owned().await;
446
447        let to_device_enabled =
448            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
449
450        let restored_fields = if self.inner.share_pos || to_device_enabled {
451            let lists = self.inner.lists.read().await;
452            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists).await?
453        } else {
454            None
455        };
456
457        // Update pos: either the one restored from the database, if any and the sliding
458        // sync was configured so, or read it from the memory cache.
459        let pos = if self.inner.share_pos {
460            if let Some(fields) = &restored_fields {
461                // Override the memory one with the database one, for consistency.
462                if fields.pos != position_guard.pos {
463                    info!(
464                        "Pos from previous request ('{:?}') was different from \
465                         pos in database ('{:?}').",
466                        position_guard.pos, fields.pos
467                    );
468                    position_guard.pos = fields.pos.clone();
469                }
470                fields.pos.clone()
471            } else {
472                position_guard.pos.clone()
473            }
474        } else {
475            position_guard.pos.clone()
476        };
477
478        Span::current().record("pos", &pos);
479
480        // When the client sends a request with no `pos`, MSC4186 returns no device
481        // lists updates, as it only returns changes since the provided `pos`
482        // (which is `null` in this case); this is in line with sync v2.
483        //
484        // Therefore, with MSC4186, the device list cache must be marked as to be
485        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
486        // device lists updates that happened between the previous request and the new
487        // “initial” request.
488        #[cfg(feature = "e2e-encryption")]
489        if pos.is_none() && self.is_e2ee_enabled() {
490            info!("Marking all tracked users as dirty");
491
492            let olm_machine = self.inner.client.olm_machine().await;
493            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
494            olm_machine.mark_all_tracked_users_as_dirty().await?;
495        }
496
497        // Configure the timeout.
498        //
499        // The `timeout` query is necessary when all lists require it. Please see
500        // [`SlidingSyncList::requires_timeout`].
501        let timeout = require_timeout.then(|| self.inner.poll_timeout);
502
503        let mut request = assign!(http::Request::new(), {
504            conn_id: Some(self.inner.id.clone()),
505            pos,
506            timeout,
507            lists: requests_lists,
508        });
509
510        // Apply sticky parameters, if needs be.
511        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
512
513        // Extensions are now applied (via sticky parameters).
514        //
515        // Override the to-device token if the extension is enabled.
516        if to_device_enabled {
517            request.extensions.to_device.since =
518                restored_fields.and_then(|fields| fields.to_device_token);
519        }
520
521        // Apply the transaction id if one was generated.
522        if let Some(txn_id) = txn_id.get() {
523            request.txn_id = Some(txn_id.to_string());
524        }
525
526        Ok((
527            // The request itself.
528            request,
529            // Configure long-polling. We need some time for the long-poll itself,
530            // and extra time for the network delays.
531            RequestConfig::default()
532                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
533                .retry_limit(3),
534            position_guard,
535        ))
536    }
537
538    /// Send a sliding sync request.
539    ///
540    /// This method contains the sending logic.
541    async fn send_sync_request(
542        &self,
543        request: http::Request,
544        request_config: RequestConfig,
545        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
546    ) -> Result<UpdateSummary> {
547        debug!("Sending request");
548
549        // Prepare the request.
550        let requested_required_states = RequestedRequiredStates::from(&request);
551        let request = self.inner.client.send(request).with_request_config(request_config);
552
553        // Send the request and get a response with end-to-end encryption support.
554        //
555        // Sending the `/sync` request out when end-to-end encryption is enabled means
556        // that we need to also send out any outgoing e2ee related request out
557        // coming from the `OlmMachine::outgoing_requests()` method.
558
559        #[cfg(feature = "e2e-encryption")]
560        let response = {
561            if self.is_e2ee_enabled() {
562                // Here, we need to run 2 things:
563                //
564                // 1. Send the sliding sync request and get a response,
565                // 2. Send the E2EE requests.
566                //
567                // We don't want to use a `join` or `try_join` because we want to fail if and
568                // only if sending the sliding sync request fails. Failing to send the E2EE
569                // requests should just result in a log.
570                //
571                // We also want to give the priority to sliding sync request. E2EE requests are
572                // sent concurrently to the sliding sync request, but the priority is on waiting
573                // a sliding sync response.
574                //
575                // If sending sliding sync request fails, the sending of E2EE requests must be
576                // aborted as soon as possible.
577
578                let client = self.inner.client.clone();
579                let e2ee_uploads = spawn(
580                    async move {
581                        if let Err(error) = client.send_outgoing_requests().await {
582                            error!(?error, "Error while sending outgoing E2EE requests");
583                        }
584                    }
585                    .instrument(Span::current()),
586                )
587                // Ensure that the task is not running in detached mode. It is aborted when it's
588                // dropped.
589                .abort_on_drop();
590
591                // Wait on the sliding sync request success or failure early.
592                let response = request.await?;
593
594                // At this point, if `request` has been resolved successfully, we wait on
595                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
596                // long. Otherwise —if `request` has failed— `e2ee_uploads` has
597                // been dropped, so aborted.
598                e2ee_uploads.await.map_err(|error| Error::JoinError {
599                    task_description: "e2ee_uploads".to_owned(),
600                    error,
601                })?;
602
603                response
604            } else {
605                request.await?
606            }
607        };
608
609        // Send the request and get a response _without_ end-to-end encryption support.
610        #[cfg(not(feature = "e2e-encryption"))]
611        let response = request.await?;
612
613        debug!("Received response");
614
615        // At this point, the request has been sent, and a response has been received.
616        //
617        // We must ensure the handling of the response cannot be stopped/
618        // cancelled. It must be done entirely, otherwise we can have
619        // corrupted/incomplete states for Sliding Sync and other parts of
620        // the code.
621        //
622        // That's why we are running the handling of the response in a spawned
623        // future that cannot be cancelled by anything.
624        let this = self.clone();
625
626        // Spawn a new future to ensure that the code inside this future cannot be
627        // cancelled if this method is cancelled.
628        let future = async move {
629            debug!("Start handling response");
630
631            // In case the task running this future is detached, we must
632            // ensure responses are handled one at a time. At this point we still own
633            // `position_guard`, so we're fine.
634
635            // Handle the response.
636            let updates = this
637                .handle_response(response, &mut position_guard, requested_required_states)
638                .await?;
639
640            this.cache_to_storage(&position_guard).await?;
641
642            // Release the position guard lock.
643            // It means that other responses can be generated and then handled later.
644            drop(position_guard);
645
646            debug!("Done handling response");
647
648            Ok(updates)
649        };
650
651        spawn(future.instrument(Span::current())).await.unwrap()
652    }
653
654    /// Is the e2ee extension enabled for this sliding sync instance?
655    #[cfg(feature = "e2e-encryption")]
656    fn is_e2ee_enabled(&self) -> bool {
657        self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
658    }
659
660    #[cfg(not(feature = "e2e-encryption"))]
661    fn is_e2ee_enabled(&self) -> bool {
662        false
663    }
664
665    /// Should we process the room's subpart of a response?
666    async fn must_process_rooms_response(&self) -> bool {
667        // We consider that we must, if there's any room subscription or there's any
668        // list.
669        !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
670            || !self.inner.lists.read().await.is_empty()
671    }
672
673    #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
674    async fn sync_once(&self) -> Result<UpdateSummary> {
675        let (request, request_config, position_guard) =
676            self.generate_sync_request(&mut LazyTransactionId::new()).await?;
677
678        // Send the request, kaboom.
679        let summaries = self.send_sync_request(request, request_config, position_guard).await?;
680
681        // Notify a new sync was received
682        self.inner.client.inner.sync_beat.notify(usize::MAX);
683
684        Ok(summaries)
685    }
686
687    /// Create a _new_ Sliding Sync sync loop.
688    ///
689    /// This method returns a `Stream`, which will send requests and will handle
690    /// responses automatically. Lists and rooms are updated automatically.
691    ///
692    /// This function returns `Ok(…)` if everything went well, otherwise it will
693    /// return `Err(…)`. An `Err` will _always_ lead to the `Stream`
694    /// termination.
695    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
696    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
697    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
698        debug!("Starting sync stream");
699
700        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
701
702        stream! {
703            loop {
704                debug!("Sync stream is running");
705
706                select! {
707                    biased;
708
709                    internal_message = internal_channel_receiver.recv() => {
710                        use SlidingSyncInternalMessage::*;
711
712                        debug!(?internal_message, "Sync stream has received an internal message");
713
714                        match internal_message {
715                            Err(_) | Ok(SyncLoopStop) => {
716                                break;
717                            }
718
719                            Ok(SyncLoopSkipOverCurrentIteration) => {
720                                continue;
721                            }
722                        }
723                    }
724
725                    update_summary = self.sync_once() => {
726                        match update_summary {
727                            Ok(updates) => {
728                                yield Ok(updates);
729                            }
730
731                            // Here, errors we **cannot** ignore, and that must stop the sync loop.
732                            Err(error) => {
733                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
734                                    // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
735                                    self.expire_session().await;
736                                }
737
738                                yield Err(error);
739
740                                // Terminates the loop, and terminates the stream.
741                                break;
742                            }
743                        }
744                    }
745                }
746            }
747
748            debug!("Sync stream has exited.");
749        }
750    }
751
752    /// Force to stop the sync loop ([`Self::sync`]) if it's running.
753    ///
754    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
755    /// enough to “stop” it, but depending of how this `Stream` is used, it
756    /// might not be obvious to drop it immediately (thinking of using this API
757    /// over FFI; the foreign-language might not be able to drop a value
758    /// immediately). Thus, calling this method will ensure that the sync loop
759    /// stops gracefully and as soon as it returns.
760    pub fn stop_sync(&self) -> Result<()> {
761        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
762    }
763
764    /// Expire the current Sliding Sync session on the client-side.
765    ///
766    /// Expiring a Sliding Sync session means: resetting `pos`. It also resets
767    /// sticky parameters.
768    ///
769    /// This should only be used when it's clear that this session was about to
770    /// expire anyways, and should be used only in very specific cases (e.g.
771    /// multiple sliding syncs being run in parallel, and one of them has
772    /// expired).
773    ///
774    /// This method **MUST** be called when the sync loop is stopped.
775    #[doc(hidden)]
776    pub async fn expire_session(&self) {
777        info!("Session expired; resetting `pos` and sticky parameters");
778
779        {
780            let mut position = self.inner.position.lock().await;
781            position.pos = None;
782
783            if let Err(err) = self.cache_to_storage(&position).await {
784                error!(
785                    "couldn't invalidate sliding sync frozen state when expiring session: {err}"
786                );
787            }
788        }
789
790        {
791            let mut sticky = self.inner.sticky.write().unwrap();
792
793            // Clear all room subscriptions: we don't want to resend all room subscriptions
794            // when the session will restart.
795            sticky.data_mut().room_subscriptions.clear();
796        }
797
798        self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());
799    }
800}
801
802impl SlidingSyncInner {
803    /// Send a message over the internal channel.
804    #[instrument]
805    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
806        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
807    }
808
809    /// Send a message over the internal channel if there is a receiver, i.e. if
810    /// the sync loop is running.
811    #[instrument]
812    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
813        // If there is no receiver, the send will fail, but that's OK here.
814        let _ = self.internal_channel.send(message);
815    }
816}
817
/// Messages sent over the internal channel to drive the sync loop.
///
/// The enum has no payload, so equality is total: `Eq` is derived alongside
/// `PartialEq`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum SlidingSyncInternalMessage {
    /// Instruct the sync loop to stop.
    SyncLoopStop,

    /// Instruct the sync loop to skip over any remaining work in its iteration,
    /// and to jump to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}
827
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrite `pos` with `new_pos`.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// Get a copy of the static extension configuration of this Sliding Sync.
    ///
    /// Note: this is not the next content of the sticky parameters, but
    /// rightly the static configuration that was set when this Sliding Sync
    /// was created.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
846
847#[derive(Clone, Debug)]
848pub(super) struct SlidingSyncPositionMarkers {
849    /// An ephemeral position in the current stream, as received from the
850    /// previous `/sync` response, or `None` for the first request.
851    ///
852    /// Should not be persisted.
853    pos: Option<String>,
854}
855
856/// Frozen bits of a Sliding Sync that are stored in the *state* store.
857#[derive(Debug, Serialize, Deserialize)]
858struct FrozenSlidingSync {
859    /// Deprecated: prefer storing in the crypto store.
860    #[serde(skip_serializing_if = "Option::is_none")]
861    to_device_since: Option<String>,
862    #[serde(default, skip_serializing_if = "Vec::is_empty")]
863    rooms: Vec<FrozenSlidingSyncRoom>,
864}
865
866impl FrozenSlidingSync {
867    fn new(rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
868        // The to-device token must be saved in the `FrozenCryptoSlidingSync` now.
869        Self {
870            to_device_since: None,
871            rooms: rooms
872                .iter()
873                .map(|(_room_id, sliding_sync_room)| FrozenSlidingSyncRoom::from(sliding_sync_room))
874                .collect::<Vec<_>>(),
875        }
876    }
877}
878
879#[derive(Serialize, Deserialize)]
880struct FrozenSlidingSyncPos {
881    #[serde(skip_serializing_if = "Option::is_none")]
882    pos: Option<String>,
883}
884
885/// A summary of the updates received after a sync (like in
886/// [`SlidingSync::sync`]).
887#[derive(Debug, Clone)]
888pub struct UpdateSummary {
889    /// The names of the lists that have seen an update.
890    pub lists: Vec<String>,
891    /// The rooms that have seen updates
892    pub rooms: Vec<OwnedRoomId>,
893}
894
/// A very basic, bool-ish, enum representing the state of a
/// [`http::request::RoomSubscription`]. Once a `RoomSubscription` has been
/// sent, it should ideally not be sent again, mostly to save bandwidth.
#[derive(Default, Debug)]
enum RoomSubscriptionState {
    /// The `RoomSubscription` has not been sent to, or not been received
    /// correctly by, the server, i.e. the `RoomSubscription` —which is part of
    /// the sticky parameters— has not been committed.
    ///
    /// This is the default state.
    #[default]
    Pending,

    /// The `RoomSubscription` has been sent and received correctly by the
    /// server.
    Applied,
}
910
911/// The set of sticky parameters owned by the `SlidingSyncInner` instance, and
912/// sent in the request.
913#[derive(Debug)]
914pub(super) struct SlidingSyncStickyParameters {
915    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
916    /// but one wants to receive updates.
917    room_subscriptions:
918        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
919
920    /// The intended state of the extensions being supplied to sliding /sync
921    /// calls.
922    extensions: http::request::Extensions,
923}
924
925impl SlidingSyncStickyParameters {
926    /// Create a new set of sticky parameters.
927    pub fn new(
928        room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
929        extensions: http::request::Extensions,
930    ) -> Self {
931        Self {
932            room_subscriptions: room_subscriptions
933                .into_iter()
934                .map(|(room_id, room_subscription)| {
935                    (room_id, (RoomSubscriptionState::Pending, room_subscription))
936                })
937                .collect(),
938            extensions,
939        }
940    }
941}
942
943impl StickyData for SlidingSyncStickyParameters {
944    type Request = http::Request;
945
946    fn apply(&self, request: &mut Self::Request) {
947        request.room_subscriptions = self
948            .room_subscriptions
949            .iter()
950            .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
951            .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
952            .collect();
953        request.extensions = self.extensions.clone();
954    }
955
956    fn on_commit(&mut self) {
957        // All room subscriptions are marked as `Applied`.
958        for (state, _room_subscription) in self.room_subscriptions.values_mut() {
959            if matches!(state, RoomSubscriptionState::Pending) {
960                *state = RoomSubscriptionState::Applied;
961            }
962        }
963    }
964}
965
966#[cfg(all(test, not(target_family = "wasm")))]
967#[allow(clippy::dbg_macro)]
968mod tests {
969    use std::{
970        collections::BTreeMap,
971        future::ready,
972        ops::Not,
973        sync::{Arc, Mutex},
974        time::Duration,
975    };
976
977    use assert_matches::assert_matches;
978    use event_listener::Listener;
979    use futures_util::{future::join_all, pin_mut, StreamExt};
980    use matrix_sdk_base::RequestedRequiredStates;
981    use matrix_sdk_test::async_test;
982    use ruma::{
983        api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint,
984        OwnedRoomId, TransactionId,
985    };
986    use serde::Deserialize;
987    use serde_json::json;
988    use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
989
990    use super::{
991        http,
992        sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
993        FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
994        SlidingSyncStickyParameters,
995    };
996    use crate::{
997        sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result,
998    };
999
1000    #[derive(Copy, Clone)]
1001    struct SlidingSyncMatcher;
1002
1003    impl Match for SlidingSyncMatcher {
1004        fn matches(&self, request: &Request) -> bool {
1005            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
1006                && request.method == Method::POST
1007        }
1008    }
1009
1010    async fn new_sliding_sync(
1011        lists: Vec<SlidingSyncListBuilder>,
1012    ) -> Result<(MockServer, SlidingSync)> {
1013        let server = MockServer::start().await;
1014        let client = logged_in_client(Some(server.uri())).await;
1015
1016        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1017
1018        for list in lists {
1019            sliding_sync_builder = sliding_sync_builder.add_list(list);
1020        }
1021
1022        let sliding_sync = sliding_sync_builder.build().await?;
1023
1024        Ok((server, sliding_sync))
1025    }
1026
1027    #[async_test]
1028    async fn test_subscribe_to_rooms() -> Result<()> {
1029        let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1030            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1031        .await?;
1032
1033        let stream = sliding_sync.sync();
1034        pin_mut!(stream);
1035
1036        let room_id_0 = room_id!("!r0:bar.org");
1037        let room_id_1 = room_id!("!r1:bar.org");
1038        let room_id_2 = room_id!("!r2:bar.org");
1039
1040        {
1041            let _mock_guard = Mock::given(SlidingSyncMatcher)
1042                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1043                    "pos": "1",
1044                    "lists": {},
1045                    "rooms": {
1046                        room_id_0: {
1047                            "name": "Room #0",
1048                            "initial": true,
1049                        },
1050                        room_id_1: {
1051                            "name": "Room #1",
1052                            "initial": true,
1053                        },
1054                        room_id_2: {
1055                            "name": "Room #2",
1056                            "initial": true,
1057                        },
1058                    }
1059                })))
1060                .mount_as_scoped(&server)
1061                .await;
1062
1063            let _ = stream.next().await.unwrap()?;
1064        }
1065
1066        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1067
1068        // Members aren't synced.
1069        // We need to make them synced, so that we can test that subscribing to a room
1070        // make members not synced. That's a desired feature.
1071        assert!(room0.are_members_synced().not());
1072
1073        {
1074            struct MemberMatcher(OwnedRoomId);
1075
1076            impl Match for MemberMatcher {
1077                fn matches(&self, request: &Request) -> bool {
1078                    request.url.path()
1079                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1080                        && request.method == Method::GET
1081                }
1082            }
1083
1084            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1085                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1086                    "chunk": [],
1087                })))
1088                .mount_as_scoped(&server)
1089                .await;
1090
1091            assert_matches!(room0.request_members().await, Ok(()));
1092        }
1093
1094        // Members are now synced! We can start subscribing and see how it goes.
1095        assert!(room0.are_members_synced());
1096
1097        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1098
1099        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
1100        // now marked as not synced.
1101        assert!(room0.are_members_synced().not());
1102
1103        {
1104            let sticky = sliding_sync.inner.sticky.read().unwrap();
1105            let room_subscriptions = &sticky.data().room_subscriptions;
1106
1107            assert!(room_subscriptions.contains_key(room_id_0));
1108            assert!(room_subscriptions.contains_key(room_id_1));
1109            assert!(!room_subscriptions.contains_key(room_id_2));
1110        }
1111
1112        // Subscribing to the same room doesn't reset the member sync state.
1113
1114        {
1115            struct MemberMatcher(OwnedRoomId);
1116
1117            impl Match for MemberMatcher {
1118                fn matches(&self, request: &Request) -> bool {
1119                    request.url.path()
1120                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1121                        && request.method == Method::GET
1122                }
1123            }
1124
1125            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1126                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1127                    "chunk": [],
1128                })))
1129                .mount_as_scoped(&server)
1130                .await;
1131
1132            assert_matches!(room0.request_members().await, Ok(()));
1133        }
1134
1135        // Members are synced, good, good.
1136        assert!(room0.are_members_synced());
1137
1138        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1139
1140        // Members are still synced: because we have already subscribed to the
1141        // room, the members aren't marked as unsynced.
1142        assert!(room0.are_members_synced());
1143
1144        Ok(())
1145    }
1146
1147    #[async_test]
1148    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1149        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1150            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1151        .await?;
1152
1153        let room_id_0 = room_id!("!r0:bar.org");
1154        let room_id_1 = room_id!("!r1:bar.org");
1155        let room_id_2 = room_id!("!r2:bar.org");
1156
1157        // Subscribe to two rooms.
1158        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1159
1160        {
1161            let sticky = sliding_sync.inner.sticky.read().unwrap();
1162            let room_subscriptions = &sticky.data().room_subscriptions;
1163
1164            assert!(room_subscriptions.contains_key(room_id_0));
1165            assert!(room_subscriptions.contains_key(room_id_1));
1166            assert!(room_subscriptions.contains_key(room_id_2).not());
1167        }
1168
1169        // Subscribe to one more room.
1170        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1171
1172        {
1173            let sticky = sliding_sync.inner.sticky.read().unwrap();
1174            let room_subscriptions = &sticky.data().room_subscriptions;
1175
1176            assert!(room_subscriptions.contains_key(room_id_0));
1177            assert!(room_subscriptions.contains_key(room_id_1));
1178            assert!(room_subscriptions.contains_key(room_id_2));
1179        }
1180
1181        // Suddenly, the session expires!
1182        sliding_sync.expire_session().await;
1183
1184        {
1185            let sticky = sliding_sync.inner.sticky.read().unwrap();
1186            let room_subscriptions = &sticky.data().room_subscriptions;
1187
1188            assert!(room_subscriptions.is_empty());
1189        }
1190
1191        // Subscribe to one room again.
1192        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1193
1194        {
1195            let sticky = sliding_sync.inner.sticky.read().unwrap();
1196            let room_subscriptions = &sticky.data().room_subscriptions;
1197
1198            assert!(room_subscriptions.contains_key(room_id_0).not());
1199            assert!(room_subscriptions.contains_key(room_id_1).not());
1200            assert!(room_subscriptions.contains_key(room_id_2));
1201        }
1202
1203        Ok(())
1204    }
1205
1206    #[async_test]
1207    async fn test_to_device_token_properly_cached() -> Result<()> {
1208        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1209            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1210        .await?;
1211
1212        // FrozenSlidingSync doesn't contain the to_device_token anymore, as it's saved
1213        // in the crypto store since PR #2323.
1214        let frozen = FrozenSlidingSync::new(&*sliding_sync.inner.rooms.read().await);
1215        assert!(frozen.to_device_since.is_none());
1216
1217        Ok(())
1218    }
1219
1220    #[async_test]
1221    async fn test_add_list() -> Result<()> {
1222        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1223            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1224        .await?;
1225
1226        let _stream = sliding_sync.sync();
1227        pin_mut!(_stream);
1228
1229        sliding_sync
1230            .add_list(
1231                SlidingSyncList::builder("bar")
1232                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1233            )
1234            .await?;
1235
1236        let lists = sliding_sync.inner.lists.read().await;
1237
1238        assert!(lists.contains_key("foo"));
1239        assert!(lists.contains_key("bar"));
1240
1241        // this test also ensures that Tokio is not panicking when calling `add_list`.
1242
1243        Ok(())
1244    }
1245
1246    #[test]
1247    fn test_sticky_parameters_api_invalidated_flow() {
1248        let r0 = room_id!("!r0.matrix.org");
1249        let r1 = room_id!("!r1:matrix.org");
1250
1251        let mut room_subscriptions = BTreeMap::new();
1252        room_subscriptions.insert(r0.to_owned(), Default::default());
1253
1254        // At first it's invalidated.
1255        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1256            room_subscriptions,
1257            Default::default(),
1258        ));
1259        assert!(sticky.is_invalidated());
1260
1261        // Then when we create a request, the sticky parameters are applied.
1262        let txn_id: &TransactionId = "tid123".into();
1263
1264        let mut request = http::Request::default();
1265        request.txn_id = Some(txn_id.to_string());
1266
1267        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1268
1269        assert!(request.txn_id.is_some());
1270        assert_eq!(request.room_subscriptions.len(), 1);
1271        assert!(request.room_subscriptions.contains_key(r0));
1272
1273        let tid = request.txn_id.unwrap();
1274
1275        sticky.maybe_commit(tid.as_str().into());
1276        assert!(!sticky.is_invalidated());
1277
1278        // Applying new parameters will invalidate again.
1279        sticky
1280            .data_mut()
1281            .room_subscriptions
1282            .insert(r1.to_owned(), (Default::default(), Default::default()));
1283        assert!(sticky.is_invalidated());
1284
1285        // Committing with the wrong transaction id will keep it invalidated.
1286        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1287        assert!(sticky.is_invalidated());
1288
1289        // Restarting a request will only remember the last generated transaction id.
1290        let txn_id1: &TransactionId = "tid456".into();
1291        let mut request1 = http::Request::default();
1292        request1.txn_id = Some(txn_id1.to_string());
1293        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1294
1295        assert!(sticky.is_invalidated());
1296        // The first room subscription has been applied to `request`, so it's not
1297        // reapplied here. It's a particular logic of `room_subscriptions`, it's not
1298        // related to the sticky design.
1299        assert_eq!(request1.room_subscriptions.len(), 1);
1300        assert!(request1.room_subscriptions.contains_key(r1));
1301
1302        let txn_id2: &TransactionId = "tid789".into();
1303        let mut request2 = http::Request::default();
1304        request2.txn_id = Some(txn_id2.to_string());
1305
1306        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1307        assert!(sticky.is_invalidated());
1308        // `request2` contains `r1` because the sticky parameters have not been
1309        // committed, so it's still marked as pending.
1310        assert_eq!(request2.room_subscriptions.len(), 1);
1311        assert!(request2.room_subscriptions.contains_key(r1));
1312
1313        // Here we commit with the not most-recent TID, so it keeps the invalidated
1314        // status.
1315        sticky.maybe_commit(txn_id1);
1316        assert!(sticky.is_invalidated());
1317
1318        // But here we use the latest TID, so the commit is effective.
1319        sticky.maybe_commit(txn_id2);
1320        assert!(!sticky.is_invalidated());
1321    }
1322
1323    #[test]
1324    fn test_room_subscriptions_are_sticky() {
1325        let r0 = room_id!("!r0.matrix.org");
1326        let r1 = room_id!("!r1:matrix.org");
1327
1328        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1329            BTreeMap::new(),
1330            Default::default(),
1331        ));
1332
1333        // A room subscription is added, applied, and committed.
1334        {
1335            // Insert `r0`.
1336            sticky
1337                .data_mut()
1338                .room_subscriptions
1339                .insert(r0.to_owned(), (Default::default(), Default::default()));
1340
1341            // Then the sticky parameters are applied.
1342            let txn_id: &TransactionId = "tid0".into();
1343            let mut request = http::Request::default();
1344            request.txn_id = Some(txn_id.to_string());
1345
1346            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1347
1348            assert!(request.txn_id.is_some());
1349            assert_eq!(request.room_subscriptions.len(), 1);
1350            assert!(request.room_subscriptions.contains_key(r0));
1351
1352            // Then the sticky parameters are committed.
1353            let tid = request.txn_id.unwrap();
1354
1355            sticky.maybe_commit(tid.as_str().into());
1356        }
1357
1358        // A room subscription is added, applied, but NOT committed.
1359        {
1360            // Insert `r1`.
1361            sticky
1362                .data_mut()
1363                .room_subscriptions
1364                .insert(r1.to_owned(), (Default::default(), Default::default()));
1365
1366            // Then the sticky parameters are applied.
1367            let txn_id: &TransactionId = "tid1".into();
1368            let mut request = http::Request::default();
1369            request.txn_id = Some(txn_id.to_string());
1370
1371            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1372
1373            assert!(request.txn_id.is_some());
1374            assert_eq!(request.room_subscriptions.len(), 1);
1375            // `r0` is not present, it's only `r1`.
1376            assert!(request.room_subscriptions.contains_key(r1));
1377
1378            // Then the sticky parameters are NOT committed.
1379            // It can happen if the request has failed to be sent for example,
1380            // or if the response didn't match.
1381        }
1382
1383        // A previously added room subscription is re-added, applied, and committed.
1384        {
1385            // Then the sticky parameters are applied.
1386            let txn_id: &TransactionId = "tid2".into();
1387            let mut request = http::Request::default();
1388            request.txn_id = Some(txn_id.to_string());
1389
1390            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1391
1392            assert!(request.txn_id.is_some());
1393            assert_eq!(request.room_subscriptions.len(), 1);
1394            // `r0` is not present, it's only `r1`.
1395            assert!(request.room_subscriptions.contains_key(r1));
1396
1397            // Then the sticky parameters are committed.
1398            let tid = request.txn_id.unwrap();
1399
1400            sticky.maybe_commit(tid.as_str().into());
1401        }
1402
1403        // All room subscriptions have been committed.
1404        {
1405            // Then the sticky parameters are applied.
1406            let txn_id: &TransactionId = "tid3".into();
1407            let mut request = http::Request::default();
1408            request.txn_id = Some(txn_id.to_string());
1409
1410            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1411
1412            assert!(request.txn_id.is_some());
1413            // All room subscriptions have been sent.
1414            assert!(request.room_subscriptions.is_empty());
1415        }
1416    }
1417
1418    #[test]
1419    fn test_extensions_are_sticky() {
1420        let mut extensions = http::request::Extensions::default();
1421        extensions.account_data.enabled = Some(true);
1422
1423        // At first it's invalidated.
1424        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1425            Default::default(),
1426            extensions,
1427        ));
1428
1429        assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1430
1431        // `StickyParameters::new` follows its caller's intent when it comes to e2ee and
1432        // to-device.
1433        let extensions = &sticky.data().extensions;
1434        assert_eq!(extensions.e2ee.enabled, None);
1435        assert_eq!(extensions.to_device.enabled, None);
1436        assert_eq!(extensions.to_device.since, None);
1437
1438        // What the user explicitly enabled is… enabled.
1439        assert_eq!(extensions.account_data.enabled, Some(true));
1440
1441        let txn_id: &TransactionId = "tid123".into();
1442        let mut request = http::Request::default();
1443        request.txn_id = Some(txn_id.to_string());
1444        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1445        assert!(sticky.is_invalidated());
1446        assert_eq!(request.extensions.to_device.enabled, None);
1447        assert_eq!(request.extensions.to_device.since, None);
1448        assert_eq!(request.extensions.e2ee.enabled, None);
1449        assert_eq!(request.extensions.account_data.enabled, Some(true));
1450    }
1451
1452    #[async_test]
1453    async fn test_sticky_extensions_plus_since() -> Result<()> {
1454        let server = MockServer::start().await;
1455        let client = logged_in_client(Some(server.uri())).await;
1456
1457        let sync = client
1458            .sliding_sync("test-slidingsync")?
1459            .add_list(SlidingSyncList::builder("new_list"))
1460            .build()
1461            .await?;
1462
1463        // No extensions have been explicitly enabled here.
1464        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1465        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1466        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1467
1468        // Now enable e2ee and to-device.
1469        let sync = client
1470            .sliding_sync("test-slidingsync")?
1471            .add_list(SlidingSyncList::builder("new_list"))
1472            .with_to_device_extension(
1473                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1474            )
1475            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1476            .build()
1477            .await?;
1478
1479        // Even without a since token, the first request will contain the extensions
1480        // configuration, at least.
1481        let txn_id = TransactionId::new();
1482        let (request, _, _) = sync
1483            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1484            .await?;
1485
1486        assert_eq!(request.extensions.e2ee.enabled, Some(true));
1487        assert_eq!(request.extensions.to_device.enabled, Some(true));
1488        assert!(request.extensions.to_device.since.is_none());
1489
1490        {
1491            // Committing with another transaction id doesn't validate anything.
1492            let mut sticky = sync.inner.sticky.write().unwrap();
1493            assert!(sticky.is_invalidated());
1494            sticky.maybe_commit(
1495                "hopefully the rng won't generate this very specific transaction id".into(),
1496            );
1497            assert!(sticky.is_invalidated());
1498        }
1499
1500        // Regenerating a request will yield the same one.
1501        let txn_id2 = TransactionId::new();
1502        let (request, _, _) = sync
1503            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1504            .await?;
1505
1506        assert_eq!(request.extensions.e2ee.enabled, Some(true));
1507        assert_eq!(request.extensions.to_device.enabled, Some(true));
1508        assert!(request.extensions.to_device.since.is_none());
1509
1510        assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1511
1512        {
1513            // Committing with the expected transaction id will validate it.
1514            let mut sticky = sync.inner.sticky.write().unwrap();
1515            assert!(sticky.is_invalidated());
1516            sticky.maybe_commit(txn_id2.as_str().into());
1517            assert!(!sticky.is_invalidated());
1518        }
1519
1520        // The next request should contain no sticky parameters.
1521        let txn_id = TransactionId::new();
1522        let (request, _, _) = sync
1523            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1524            .await?;
1525        assert!(request.extensions.e2ee.enabled.is_none());
1526        assert!(request.extensions.to_device.enabled.is_none());
1527        assert!(request.extensions.to_device.since.is_none());
1528
1529        // If there's a to-device `since` token, we make sure we put the token
1530        // into the extension config. The rest doesn't need to be re-enabled due to
1531        // stickiness.
1532        let _since_token = "since";
1533
1534        #[cfg(feature = "e2e-encryption")]
1535        {
1536            use matrix_sdk_base::crypto::store::Changes;
1537            if let Some(olm_machine) = &*client.olm_machine().await {
1538                olm_machine
1539                    .store()
1540                    .save_changes(Changes {
1541                        next_batch_token: Some(_since_token.to_owned()),
1542                        ..Default::default()
1543                    })
1544                    .await?;
1545            }
1546        }
1547
1548        let txn_id = TransactionId::new();
1549        let (request, _, _) = sync
1550            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1551            .await?;
1552
1553        assert!(request.extensions.e2ee.enabled.is_none());
1554        assert!(request.extensions.to_device.enabled.is_none());
1555
1556        #[cfg(feature = "e2e-encryption")]
1557        assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1558
1559        Ok(())
1560    }
1561
1562    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
1563    // all the tracked users by the `OlmMachine` must be marked as dirty, i.e.
1564    // `/key/query` requests must be sent. See the code to see the details.
1565    //
1566    // This test is asserting that.
1567    #[async_test]
1568    #[cfg(feature = "e2e-encryption")]
1569    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1570        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1571        use matrix_sdk_test::ruma_response_from_json;
1572        use ruma::user_id;
1573
1574        let server = MockServer::start().await;
1575        let client = logged_in_client(Some(server.uri())).await;
1576
1577        let alice = user_id!("@alice:localhost");
1578        let bob = user_id!("@bob:localhost");
1579        let me = user_id!("@example:localhost");
1580
1581        // Track and mark users are not dirty, so that we can check they are “dirty”
1582        // after that. Dirty here means that a `/key/query` must be sent.
1583        {
1584            let olm_machine = client.olm_machine().await;
1585            let olm_machine = olm_machine.as_ref().unwrap();
1586
1587            olm_machine.update_tracked_users([alice, bob]).await?;
1588
1589            // Assert requests.
1590            let outgoing_requests = olm_machine.outgoing_requests().await?;
1591
1592            assert_eq!(outgoing_requests.len(), 2);
1593            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1594            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1595
1596            // Fake responses.
1597            olm_machine
1598                .mark_request_as_sent(
1599                    outgoing_requests[0].request_id(),
1600                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1601                        "one_time_key_counts": {}
1602                    }))),
1603                )
1604                .await?;
1605
1606            olm_machine
1607                .mark_request_as_sent(
1608                    outgoing_requests[1].request_id(),
1609                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1610                        "device_keys": {
1611                            alice: {},
1612                            bob: {},
1613                        }
1614                    }))),
1615                )
1616                .await?;
1617
1618            // Once more.
1619            let outgoing_requests = olm_machine.outgoing_requests().await?;
1620
1621            assert_eq!(outgoing_requests.len(), 1);
1622            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1623
1624            olm_machine
1625                .mark_request_as_sent(
1626                    outgoing_requests[0].request_id(),
1627                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1628                        "device_keys": {
1629                            me: {},
1630                        }
1631                    }))),
1632                )
1633                .await?;
1634
1635            // No more.
1636            let outgoing_requests = olm_machine.outgoing_requests().await?;
1637
1638            assert!(outgoing_requests.is_empty());
1639        }
1640
1641        let sync = client
1642            .sliding_sync("test-slidingsync")?
1643            .add_list(SlidingSyncList::builder("new_list"))
1644            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1645            .build()
1646            .await?;
1647
1648        // First request: no `pos`.
1649        let txn_id = TransactionId::new();
1650        let (_request, _, _) = sync
1651            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1652            .await?;
1653
1654        // Now, tracked users must be dirty.
1655        {
1656            let olm_machine = client.olm_machine().await;
1657            let olm_machine = olm_machine.as_ref().unwrap();
1658
1659            // Assert requests.
1660            let outgoing_requests = olm_machine.outgoing_requests().await?;
1661
1662            assert_eq!(outgoing_requests.len(), 1);
1663            assert_matches!(
1664                outgoing_requests[0].request(),
1665                AnyOutgoingRequest::KeysQuery(request) => {
1666                    assert!(request.device_keys.contains_key(alice));
1667                    assert!(request.device_keys.contains_key(bob));
1668                    assert!(request.device_keys.contains_key(me));
1669                }
1670            );
1671
1672            // Fake responses.
1673            olm_machine
1674                .mark_request_as_sent(
1675                    outgoing_requests[0].request_id(),
1676                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1677                        "device_keys": {
1678                            alice: {},
1679                            bob: {},
1680                            me: {},
1681                        }
1682                    }))),
1683                )
1684                .await?;
1685        }
1686
1687        // Second request: with a `pos` this time.
1688        sync.set_pos("chocolat".to_owned()).await;
1689
1690        let txn_id = TransactionId::new();
1691        let (_request, _, _) = sync
1692            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1693            .await?;
1694
1695        // Tracked users are not marked as dirty.
1696        {
1697            let olm_machine = client.olm_machine().await;
1698            let olm_machine = olm_machine.as_ref().unwrap();
1699
1700            // Assert requests.
1701            let outgoing_requests = olm_machine.outgoing_requests().await?;
1702
1703            assert!(outgoing_requests.is_empty());
1704        }
1705
1706        Ok(())
1707    }
1708
1709    #[async_test]
1710    async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1711        let server = MockServer::start().await;
1712        let client = logged_in_client(Some(server.uri())).await;
1713
1714        let sliding_sync = client
1715            .sliding_sync("test-slidingsync")?
1716            .with_to_device_extension(
1717                assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1718            )
1719            .build()
1720            .await?;
1721
1722        // First request asks to enable the extension.
1723        let (request, _, _) =
1724            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1725        assert!(request.extensions.to_device.enabled.is_some());
1726
1727        let sync = sliding_sync.sync();
1728        pin_mut!(sync);
1729
1730        // `pos` is `None` to start with.
1731        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1732
1733        #[derive(Deserialize)]
1734        struct PartialRequest {
1735            txn_id: Option<String>,
1736        }
1737
1738        {
1739            let _mock_guard = Mock::given(SlidingSyncMatcher)
1740                .respond_with(|request: &Request| {
1741                    // Repeat the txn_id in the response, if set.
1742                    let request: PartialRequest = request.body_json().unwrap();
1743
1744                    ResponseTemplate::new(200).set_body_json(json!({
1745                        "txn_id": request.txn_id,
1746                        "pos": "0",
1747                    }))
1748                })
1749                .mount_as_scoped(&server)
1750                .await;
1751
1752            let next = sync.next().await;
1753            assert_matches!(next, Some(Ok(_update_summary)));
1754
1755            // `pos` has been updated.
1756            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1757        }
1758
1759        // Next request doesn't ask to enable the extension.
1760        let (request, _, _) =
1761            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1762        assert!(request.extensions.to_device.enabled.is_none());
1763
1764        // Next request is successful.
1765        {
1766            let _mock_guard = Mock::given(SlidingSyncMatcher)
1767                .respond_with(|request: &Request| {
1768                    // Repeat the txn_id in the response, if set.
1769                    let request: PartialRequest = request.body_json().unwrap();
1770
1771                    ResponseTemplate::new(200).set_body_json(json!({
1772                        "txn_id": request.txn_id,
1773                        "pos": "1",
1774                    }))
1775                })
1776                .mount_as_scoped(&server)
1777                .await;
1778
1779            let next = sync.next().await;
1780            assert_matches!(next, Some(Ok(_update_summary)));
1781
1782            // `pos` has been updated.
1783            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1784        }
1785
1786        // Next request is successful despite it receives an already
1787        // received `pos` from the server.
1788        {
1789            let _mock_guard = Mock::given(SlidingSyncMatcher)
1790                .respond_with(|request: &Request| {
1791                    // Repeat the txn_id in the response, if set.
1792                    let request: PartialRequest = request.body_json().unwrap();
1793
1794                    ResponseTemplate::new(200).set_body_json(json!({
1795                        "txn_id": request.txn_id,
1796                        "pos": "0", // <- already received!
1797                    }))
1798                })
1799                .up_to_n_times(1) // run this mock only once.
1800                .mount_as_scoped(&server)
1801                .await;
1802
1803            let next = sync.next().await;
1804            assert_matches!(next, Some(Ok(_update_summary)));
1805
1806            // `pos` has been updated.
1807            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1808        }
1809
1810        // Stop responding with successful requests!
1811        //
1812        // When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
1813        // so they're reset. It also resets the `pos`.
1814        {
1815            let _mock_guard = Mock::given(SlidingSyncMatcher)
1816                .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1817                    "error": "foo",
1818                    "errcode": "M_UNKNOWN_POS",
1819                })))
1820                .mount_as_scoped(&server)
1821                .await;
1822
1823            let next = sync.next().await;
1824
1825            // The expected error is returned.
1826            assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1827
1828            // `pos` has been reset.
1829            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1830
1831            // Next request asks to enable the extension again.
1832            let (request, _, _) =
1833                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1834
1835            assert!(request.extensions.to_device.enabled.is_some());
1836
1837            // `sync` has been stopped.
1838            assert!(sync.next().await.is_none());
1839        }
1840
1841        Ok(())
1842    }
1843
1844    #[cfg(feature = "e2e-encryption")]
1845    #[async_test]
1846    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1847        let server = MockServer::start().await;
1848
1849        #[derive(Deserialize)]
1850        struct PartialRequest {
1851            txn_id: Option<String>,
1852        }
1853
1854        let server_pos = Arc::new(Mutex::new(0));
1855        let _mock_guard = Mock::given(SlidingSyncMatcher)
1856            .respond_with(move |request: &Request| {
1857                // Repeat the txn_id in the response, if set.
1858                let request: PartialRequest = request.body_json().unwrap();
1859                let pos = {
1860                    let mut pos = server_pos.lock().unwrap();
1861                    let prev = *pos;
1862                    *pos += 1;
1863                    prev
1864                };
1865
1866                ResponseTemplate::new(200).set_body_json(json!({
1867                    "txn_id": request.txn_id,
1868                    "pos": pos.to_string(),
1869                }))
1870            })
1871            .mount_as_scoped(&server)
1872            .await;
1873
1874        let client = logged_in_client(Some(server.uri())).await;
1875
1876        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1877
1878        // `pos` is `None` to start with.
1879        {
1880            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1881
1882            let (request, _, _) =
1883                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1884            assert!(request.pos.is_none());
1885        }
1886
1887        let sync = sliding_sync.sync();
1888        pin_mut!(sync);
1889
1890        // Sync goes well, and then the position is saved both into the internal memory
1891        // and the database.
1892        let next = sync.next().await;
1893        assert_matches!(next, Some(Ok(_update_summary)));
1894
1895        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1896
1897        let restored_fields = restore_sliding_sync_state(
1898            &client,
1899            &sliding_sync.inner.storage_key,
1900            &*sliding_sync.inner.lists.read().await,
1901        )
1902        .await?
1903        .expect("must have restored fields");
1904
1905        // While it has been saved into the database, it's not necessarily going to be
1906        // used later!
1907        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1908
1909        // Now, even if we mess with the position stored in the database, the sliding
1910        // sync instance isn't configured to reload the stream position from the
1911        // database, so it won't be changed.
1912        {
1913            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1914
1915            let mut position_guard = other_sync.inner.position.lock().await;
1916            position_guard.pos = Some("yolo".to_owned());
1917
1918            other_sync.cache_to_storage(&position_guard).await?;
1919        }
1920
1921        // It's still 0, not "yolo".
1922        {
1923            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1924            let (request, _, _) =
1925                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1926            assert_eq!(request.pos.as_deref(), Some("0"));
1927        }
1928
1929        // Recreating a sliding sync with the same ID doesn't preload the pos, if not
1930        // asked to.
1931        {
1932            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1933            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1934        }
1935
1936        Ok(())
1937    }
1938
1939    #[cfg(feature = "e2e-encryption")]
1940    #[async_test]
1941    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1942        let server = MockServer::start().await;
1943
1944        #[derive(Deserialize)]
1945        struct PartialRequest {
1946            txn_id: Option<String>,
1947        }
1948
1949        let server_pos = Arc::new(Mutex::new(0));
1950        let _mock_guard = Mock::given(SlidingSyncMatcher)
1951            .respond_with(move |request: &Request| {
1952                // Repeat the txn_id in the response, if set.
1953                let request: PartialRequest = request.body_json().unwrap();
1954                let pos = {
1955                    let mut pos = server_pos.lock().unwrap();
1956                    let prev = *pos;
1957                    *pos += 1;
1958                    prev
1959                };
1960
1961                ResponseTemplate::new(200).set_body_json(json!({
1962                    "txn_id": request.txn_id,
1963                    "pos": pos.to_string(),
1964                }))
1965            })
1966            .mount_as_scoped(&server)
1967            .await;
1968
1969        let client = logged_in_client(Some(server.uri())).await;
1970
1971        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1972
1973        // `pos` is `None` to start with.
1974        {
1975            let (request, _, _) =
1976                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1977
1978            assert!(request.pos.is_none());
1979            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1980        }
1981
1982        let sync = sliding_sync.sync();
1983        pin_mut!(sync);
1984
1985        // Sync goes well, and then the position is saved both into the internal memory
1986        // and the database.
1987        let next = sync.next().await;
1988        assert_matches!(next, Some(Ok(_update_summary)));
1989
1990        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1991
1992        let restored_fields = restore_sliding_sync_state(
1993            &client,
1994            &sliding_sync.inner.storage_key,
1995            &*sliding_sync.inner.lists.read().await,
1996        )
1997        .await?
1998        .expect("must have restored fields");
1999
2000        // While it has been saved into the database, it's not necessarily going to be
2001        // used later!
2002        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
2003
2004        // Another process modifies the stream position under our feet...
2005        {
2006            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
2007
2008            let mut position_guard = other_sync.inner.position.lock().await;
2009            position_guard.pos = Some("42".to_owned());
2010
2011            other_sync.cache_to_storage(&position_guard).await?;
2012        }
2013
2014        // It's alright, the next request will load it from the database.
2015        {
2016            let (request, _, _) =
2017                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2018            assert_eq!(request.pos.as_deref(), Some("42"));
2019            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2020        }
2021
2022        // Recreating a sliding sync with the same ID will reload it too.
2023        {
2024            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2025            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2026
2027            let (request, _, _) =
2028                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2029            assert_eq!(request.pos.as_deref(), Some("42"));
2030        }
2031
2032        // Invalidating the session will remove the in-memory value AND the database
2033        // value.
2034        sliding_sync.expire_session().await;
2035
2036        {
2037            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2038
2039            let (request, _, _) =
2040                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2041            assert!(request.pos.is_none());
2042        }
2043
2044        // And new sliding syncs with the same ID won't find it either.
2045        {
2046            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2047            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2048
2049            let (request, _, _) =
2050                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2051            assert!(request.pos.is_none());
2052        }
2053
2054        Ok(())
2055    }
2056
2057    #[async_test]
2058    async fn test_stop_sync_loop() -> Result<()> {
2059        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2060            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2061        .await?;
2062
2063        // Start the sync loop.
2064        let stream = sliding_sync.sync();
2065        pin_mut!(stream);
2066
2067        // The sync loop is actually running.
2068        assert!(stream.next().await.is_some());
2069
2070        // Stop the sync loop.
2071        sliding_sync.stop_sync()?;
2072
2073        // The sync loop is actually stopped.
2074        assert!(stream.next().await.is_none());
2075
2076        // Start a new sync loop.
2077        let stream = sliding_sync.sync();
2078        pin_mut!(stream);
2079
2080        // The sync loop is actually running.
2081        assert!(stream.next().await.is_some());
2082
2083        Ok(())
2084    }
2085
2086    #[async_test]
2087    async fn test_process_read_receipts() -> Result<()> {
2088        let room = owned_room_id!("!pony:example.org");
2089
2090        let server = MockServer::start().await;
2091        let client = logged_in_client(Some(server.uri())).await;
2092
2093        let sliding_sync = client
2094            .sliding_sync("test")?
2095            .with_receipt_extension(
2096                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2097            )
2098            .add_list(
2099                SlidingSyncList::builder("all")
2100                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2101            )
2102            .build()
2103            .await?;
2104
2105        // Initial state.
2106        {
2107            let server_response = assign!(http::Response::new("0".to_owned()), {
2108                rooms: BTreeMap::from([(
2109                    room.clone(),
2110                    http::response::Room::default(),
2111                )])
2112            });
2113
2114            let _summary = {
2115                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2116                sliding_sync
2117                    .handle_response(
2118                        server_response.clone(),
2119                        &mut pos_guard,
2120                        RequestedRequiredStates::default(),
2121                    )
2122                    .await?
2123            };
2124        }
2125
2126        let server_response = assign!(http::Response::new("1".to_owned()), {
2127            extensions: assign!(http::response::Extensions::default(), {
2128                receipts: assign!(http::response::Receipts::default(), {
2129                    rooms: BTreeMap::from([
2130                        (
2131                            room.clone(),
2132                            Raw::from_json_string(
2133                                json!({
2134                                    "room_id": room,
2135                                    "type": "m.receipt",
2136                                    "content": {
2137                                        "$event:bar.org": {
2138                                            "m.read": {
2139                                                client.user_id().unwrap(): {
2140                                                    "ts": 1436451550,
2141                                                }
2142                                            }
2143                                        }
2144                                    }
2145                                })
2146                                .to_string(),
2147                            ).unwrap()
2148                        )
2149                    ])
2150                })
2151            })
2152        });
2153
2154        let summary = {
2155            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2156            sliding_sync
2157                .handle_response(
2158                    server_response.clone(),
2159                    &mut pos_guard,
2160                    RequestedRequiredStates::default(),
2161                )
2162                .await?
2163        };
2164
2165        assert!(summary.rooms.contains(&room));
2166
2167        Ok(())
2168    }
2169
2170    #[async_test]
2171    async fn test_process_marked_unread_room_account_data() -> Result<()> {
2172        let room_id = owned_room_id!("!unicorn:example.org");
2173
2174        let server = MockServer::start().await;
2175        let client = logged_in_client(Some(server.uri())).await;
2176
2177        // Setup sliding sync with with one room and one list
2178
2179        let sliding_sync = client
2180            .sliding_sync("test")?
2181            .with_account_data_extension(
2182                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2183            )
2184            .add_list(
2185                SlidingSyncList::builder("all")
2186                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2187            )
2188            .build()
2189            .await?;
2190
2191        // Initial state.
2192        {
2193            let server_response = assign!(http::Response::new("0".to_owned()), {
2194                rooms: BTreeMap::from([(
2195                    room_id.clone(),
2196                    http::response::Room::default(),
2197                )])
2198            });
2199
2200            let _summary = {
2201                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2202                sliding_sync
2203                    .handle_response(
2204                        server_response.clone(),
2205                        &mut pos_guard,
2206                        RequestedRequiredStates::default(),
2207                    )
2208                    .await?
2209            };
2210        }
2211
2212        // Simulate a response that only changes the marked unread state of the room to
2213        // true
2214
2215        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2216
2217        let update_summary = {
2218            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2219            sliding_sync
2220                .handle_response(
2221                    server_response.clone(),
2222                    &mut pos_guard,
2223                    RequestedRequiredStates::default(),
2224                )
2225                .await?
2226        };
2227
2228        // Check that the list list and entry received the update
2229
2230        assert!(update_summary.rooms.contains(&room_id));
2231
2232        let room = client.get_room(&room_id).unwrap();
2233
2234        // Check the actual room data, this powers RoomInfo
2235
2236        assert!(room.is_marked_unread());
2237
2238        // Change it back to false and check if it updates
2239
2240        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2241
2242        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2243        sliding_sync
2244            .handle_response(
2245                server_response.clone(),
2246                &mut pos_guard,
2247                RequestedRequiredStates::default(),
2248            )
2249            .await?;
2250
2251        let room = client.get_room(&room_id).unwrap();
2252
2253        assert!(!room.is_marked_unread());
2254
2255        Ok(())
2256    }
2257
2258    fn make_mark_unread_response(
2259        response_number: &str,
2260        room_id: OwnedRoomId,
2261        unread: bool,
2262        add_rooms_section: bool,
2263    ) -> http::Response {
2264        let rooms = if add_rooms_section {
2265            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2266        } else {
2267            BTreeMap::new()
2268        };
2269
2270        let extensions = assign!(http::response::Extensions::default(), {
2271            account_data: assign!(http::response::AccountData::default(), {
2272                rooms: BTreeMap::from([
2273                    (
2274                        room_id,
2275                        vec![
2276                            Raw::from_json_string(
2277                                json!({
2278                                    "content": {
2279                                        "unread": unread
2280                                    },
2281                                    "type": "com.famedly.marked_unread"
2282                                })
2283                                .to_string(),
2284                            ).unwrap()
2285                        ]
2286                    )
2287                ])
2288            })
2289        });
2290
2291        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2292    }
2293
2294    #[async_test]
2295    async fn test_process_rooms_account_data() -> Result<()> {
2296        let room = owned_room_id!("!pony:example.org");
2297
2298        let server = MockServer::start().await;
2299        let client = logged_in_client(Some(server.uri())).await;
2300
2301        let sliding_sync = client
2302            .sliding_sync("test")?
2303            .with_account_data_extension(
2304                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2305            )
2306            .add_list(
2307                SlidingSyncList::builder("all")
2308                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2309            )
2310            .build()
2311            .await?;
2312
2313        // Initial state.
2314        {
2315            let server_response = assign!(http::Response::new("0".to_owned()), {
2316                rooms: BTreeMap::from([(
2317                    room.clone(),
2318                    http::response::Room::default(),
2319                )])
2320            });
2321
2322            let _summary = {
2323                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2324                sliding_sync
2325                    .handle_response(
2326                        server_response.clone(),
2327                        &mut pos_guard,
2328                        RequestedRequiredStates::default(),
2329                    )
2330                    .await?
2331            };
2332        }
2333
2334        let server_response = assign!(http::Response::new("1".to_owned()), {
2335            extensions: assign!(http::response::Extensions::default(), {
2336                account_data: assign!(http::response::AccountData::default(), {
2337                    rooms: BTreeMap::from([
2338                        (
2339                            room.clone(),
2340                            vec![
2341                                Raw::from_json_string(
2342                                    json!({
2343                                        "content": {
2344                                            "tags": {
2345                                                "u.work": {
2346                                                    "order": 0.9
2347                                                }
2348                                            }
2349                                        },
2350                                        "type": "m.tag"
2351                                    })
2352                                    .to_string(),
2353                                ).unwrap()
2354                            ]
2355                        )
2356                    ])
2357                })
2358            })
2359        });
2360        let summary = {
2361            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2362            sliding_sync
2363                .handle_response(
2364                    server_response.clone(),
2365                    &mut pos_guard,
2366                    RequestedRequiredStates::default(),
2367                )
2368                .await?
2369        };
2370
2371        assert!(summary.rooms.contains(&room));
2372
2373        Ok(())
2374    }
2375
2376    #[async_test]
2377    #[cfg(feature = "e2e-encryption")]
2378    async fn test_process_only_encryption_events() -> Result<()> {
2379        use ruma::OneTimeKeyAlgorithm;
2380
2381        let room = owned_room_id!("!croissant:example.org");
2382
2383        let server = MockServer::start().await;
2384        let client = logged_in_client(Some(server.uri())).await;
2385
2386        let server_response = assign!(http::Response::new("0".to_owned()), {
2387            rooms: BTreeMap::from([(
2388                room.clone(),
2389                assign!(http::response::Room::default(), {
2390                    name: Some("Croissants lovers".to_owned()),
2391                    timeline: Vec::new(),
2392                }),
2393            )]),
2394
2395            extensions: assign!(http::response::Extensions::default(), {
2396                e2ee: assign!(http::response::E2EE::default(), {
2397                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2398                }),
2399                to_device: Some(assign!(http::response::ToDevice::default(), {
2400                    next_batch: "to-device-token".to_owned(),
2401                })),
2402            })
2403        });
2404
2405        // Don't process non-encryption events if the sliding sync is configured for
2406        // encryption only.
2407
2408        let sliding_sync = client
2409            .sliding_sync("test")?
2410            .with_to_device_extension(
2411                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2412            )
2413            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2414            .build()
2415            .await?;
2416
2417        {
2418            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2419
2420            sliding_sync
2421                .handle_response(
2422                    server_response.clone(),
2423                    &mut position_guard,
2424                    RequestedRequiredStates::default(),
2425                )
2426                .await?;
2427        }
2428
2429        // E2EE has been properly handled.
2430        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2431        assert_eq!(uploaded_key_count, 42);
2432
2433        {
2434            let olm_machine = &*client.olm_machine_for_testing().await;
2435            assert_eq!(
2436                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2437                Some("to-device-token")
2438            );
2439        }
2440
2441        // Room events haven't.
2442        assert!(client.get_room(&room).is_none());
2443
2444        // Conversely, only process room lists events if the sliding sync was configured
2445        // as so.
2446        let client = logged_in_client(Some(server.uri())).await;
2447
2448        let sliding_sync = client
2449            .sliding_sync("test")?
2450            .add_list(SlidingSyncList::builder("thelist"))
2451            .build()
2452            .await?;
2453
2454        {
2455            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2456
2457            sliding_sync
2458                .handle_response(
2459                    server_response.clone(),
2460                    &mut position_guard,
2461                    RequestedRequiredStates::default(),
2462                )
2463                .await?;
2464        }
2465
2466        // E2EE response has been ignored.
2467        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2468        assert_eq!(uploaded_key_count, 0);
2469
2470        {
2471            let olm_machine = &*client.olm_machine_for_testing().await;
2472            assert_eq!(
2473                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2474                None
2475            );
2476        }
2477
2478        // The room is now known.
2479        assert!(client.get_room(&room).is_some());
2480
2481        // And it's also possible to set up both.
2482        let client = logged_in_client(Some(server.uri())).await;
2483
2484        let sliding_sync = client
2485            .sliding_sync("test")?
2486            .add_list(SlidingSyncList::builder("thelist"))
2487            .with_to_device_extension(
2488                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2489            )
2490            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2491            .build()
2492            .await?;
2493
2494        {
2495            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2496
2497            sliding_sync
2498                .handle_response(
2499                    server_response.clone(),
2500                    &mut position_guard,
2501                    RequestedRequiredStates::default(),
2502                )
2503                .await?;
2504        }
2505
2506        // E2EE has been properly handled.
2507        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2508        assert_eq!(uploaded_key_count, 42);
2509
2510        {
2511            let olm_machine = &*client.olm_machine_for_testing().await;
2512            assert_eq!(
2513                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2514                Some("to-device-token")
2515            );
2516        }
2517
2518        // The room is now known.
2519        assert!(client.get_room(&room).is_some());
2520
2521        Ok(())
2522    }
2523
2524    #[async_test]
2525    async fn test_lock_multiple_requests() -> Result<()> {
2526        let server = MockServer::start().await;
2527        let client = logged_in_client(Some(server.uri())).await;
2528
2529        let pos = Arc::new(Mutex::new(0));
2530        let _mock_guard = Mock::given(SlidingSyncMatcher)
2531            .respond_with(move |_: &Request| {
2532                let mut pos = pos.lock().unwrap();
2533                *pos += 1;
2534                ResponseTemplate::new(200).set_body_json(json!({
2535                    "pos": pos.to_string(),
2536                    "lists": {},
2537                    "rooms": {}
2538                }))
2539            })
2540            .mount_as_scoped(&server)
2541            .await;
2542
2543        let sliding_sync = client
2544            .sliding_sync("test")?
2545            .with_to_device_extension(
2546                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2547            )
2548            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2549            .build()
2550            .await?;
2551
2552        // Spawn two requests in parallel. Before #2430, this lead to a deadlock and the
2553        // test would never terminate.
2554        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2555
2556        for result in requests.await {
2557            result?;
2558        }
2559
2560        Ok(())
2561    }
2562
2563    #[async_test]
2564    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2565        let server = MockServer::start().await;
2566        let client = logged_in_client(Some(server.uri())).await;
2567
2568        let pos = Arc::new(Mutex::new(0));
2569        let _mock_guard = Mock::given(SlidingSyncMatcher)
2570            .respond_with(move |_: &Request| {
2571                let mut pos = pos.lock().unwrap();
2572                *pos += 1;
2573                // Respond slowly enough that we can skip one iteration.
2574                ResponseTemplate::new(200)
2575                    .set_body_json(json!({
2576                        "pos": pos.to_string(),
2577                        "lists": {},
2578                        "rooms": {}
2579                    }))
2580                    .set_delay(Duration::from_secs(2))
2581            })
2582            .mount_as_scoped(&server)
2583            .await;
2584
2585        let sliding_sync =
2586            client
2587                .sliding_sync("test")?
2588                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2589                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2590                ))
2591                .add_list(
2592                    SlidingSyncList::builder("another-list")
2593                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2594                )
2595                .build()
2596                .await?;
2597
2598        let stream = sliding_sync.sync();
2599        pin_mut!(stream);
2600
2601        let cloned_sync = sliding_sync.clone();
2602        tokio::spawn(async move {
2603            tokio::time::sleep(Duration::from_millis(100)).await;
2604
2605            cloned_sync
2606                .on_list("another-list", |list| {
2607                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2608                    ready(())
2609                })
2610                .await;
2611        });
2612
2613        assert_matches!(stream.next().await, Some(Ok(_)));
2614
2615        sliding_sync.stop_sync().unwrap();
2616
2617        assert_matches!(stream.next().await, None);
2618
2619        let mut num_requests = 0;
2620
2621        for request in server.received_requests().await.unwrap() {
2622            if !SlidingSyncMatcher.matches(&request) {
2623                continue;
2624            }
2625
2626            let another_list_ranges = if num_requests == 0 {
2627                // First request
2628                json!([[0, 10]])
2629            } else {
2630                // Second request
2631                json!([[10, 20]])
2632            };
2633
2634            num_requests += 1;
2635            assert!(num_requests <= 2, "more than one request hit the server");
2636
2637            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2638
2639            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2640                &json_value,
2641                &json!({
2642                    "conn_id": "test",
2643                    "lists": {
2644                        "room-list": {
2645                            "ranges": [[0, 9]],
2646                            "required_state": [
2647                                ["m.room.encryption", ""],
2648                                ["m.room.tombstone", ""]
2649                            ],
2650                        },
2651                        "another-list": {
2652                            "ranges": another_list_ranges,
2653                            "required_state": [
2654                                ["m.room.encryption", ""],
2655                                ["m.room.tombstone", ""]
2656                            ],
2657                        },
2658                    }
2659                }),
2660                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2661            ) {
2662                dbg!(json_value);
2663                panic!("json differ: {err}");
2664            }
2665        }
2666
2667        assert_eq!(num_requests, 2);
2668
2669        Ok(())
2670    }
2671
2672    #[async_test]
2673    async fn test_timeout_zero_list() -> Result<()> {
2674        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2675
2676        let (request, _, _) =
2677            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2678
2679        // Zero list means sliding sync is fully loaded, so there is a timeout to wait
2680        // on new update to pop.
2681        assert!(request.timeout.is_some());
2682
2683        Ok(())
2684    }
2685
2686    #[async_test]
2687    async fn test_timeout_one_list() -> Result<()> {
2688        let (_server, sliding_sync) = new_sliding_sync(vec![
2689            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2690        ])
2691        .await?;
2692
2693        let (request, _, _) =
2694            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2695
2696        // The list does not require a timeout.
2697        assert!(request.timeout.is_none());
2698
2699        // Simulate a response.
2700        {
2701            let server_response = assign!(http::Response::new("0".to_owned()), {
2702                lists: BTreeMap::from([(
2703                    "foo".to_owned(),
2704                    assign!(http::response::List::default(), {
2705                        count: uint!(7),
2706                    })
2707                 )])
2708            });
2709
2710            let _summary = {
2711                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2712                sliding_sync
2713                    .handle_response(
2714                        server_response.clone(),
2715                        &mut pos_guard,
2716                        RequestedRequiredStates::default(),
2717                    )
2718                    .await?
2719            };
2720        }
2721
2722        let (request, _, _) =
2723            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2724
2725        // The list is now fully loaded, so it requires a timeout.
2726        assert!(request.timeout.is_some());
2727
2728        Ok(())
2729    }
2730
2731    #[async_test]
2732    async fn test_timeout_three_lists() -> Result<()> {
2733        let (_server, sliding_sync) = new_sliding_sync(vec![
2734            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2735            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2736            SlidingSyncList::builder("baz")
2737                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2738        ])
2739        .await?;
2740
2741        let (request, _, _) =
2742            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2743
2744        // Two lists don't require a timeout.
2745        assert!(request.timeout.is_none());
2746
2747        // Simulate a response.
2748        {
2749            let server_response = assign!(http::Response::new("0".to_owned()), {
2750                lists: BTreeMap::from([(
2751                    "foo".to_owned(),
2752                    assign!(http::response::List::default(), {
2753                        count: uint!(7),
2754                    })
2755                 )])
2756            });
2757
2758            let _summary = {
2759                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2760                sliding_sync
2761                    .handle_response(
2762                        server_response.clone(),
2763                        &mut pos_guard,
2764                        RequestedRequiredStates::default(),
2765                    )
2766                    .await?
2767            };
2768        }
2769
2770        let (request, _, _) =
2771            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2772
2773        // One don't require a timeout.
2774        assert!(request.timeout.is_none());
2775
2776        // Simulate a response.
2777        {
2778            let server_response = assign!(http::Response::new("1".to_owned()), {
2779                lists: BTreeMap::from([(
2780                    "bar".to_owned(),
2781                    assign!(http::response::List::default(), {
2782                        count: uint!(7),
2783                    })
2784                 )])
2785            });
2786
2787            let _summary = {
2788                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2789                sliding_sync
2790                    .handle_response(
2791                        server_response.clone(),
2792                        &mut pos_guard,
2793                        RequestedRequiredStates::default(),
2794                    )
2795                    .await?
2796            };
2797        }
2798
2799        let (request, _, _) =
2800            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2801
2802        // All lists require a timeout.
2803        assert!(request.timeout.is_some());
2804
2805        Ok(())
2806    }
2807
2808    #[async_test]
2809    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2810        let server = MockServer::start().await;
2811        let client = logged_in_client(Some(server.uri())).await;
2812
2813        let _mock_guard = Mock::given(SlidingSyncMatcher)
2814            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2815                "pos": "0",
2816                "lists": {},
2817                "rooms": {}
2818            })))
2819            .mount_as_scoped(&server)
2820            .await;
2821
2822        let sliding_sync = client
2823            .sliding_sync("test")?
2824            .with_to_device_extension(
2825                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2826            )
2827            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2828            .build()
2829            .await?;
2830
2831        let sliding_sync = Arc::new(sliding_sync);
2832
2833        // Create the listener and perform a sync request
2834        let sync_beat_listener = client.inner.sync_beat.listen();
2835        sliding_sync.sync_once().await?;
2836
2837        // The sync beat listener should be notified shortly after
2838        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2839        Ok(())
2840    }
2841
2842    #[async_test]
2843    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2844        let server = MockServer::start().await;
2845        let client = logged_in_client(Some(server.uri())).await;
2846
2847        let _mock_guard = Mock::given(SlidingSyncMatcher)
2848            .respond_with(ResponseTemplate::new(404))
2849            .mount_as_scoped(&server)
2850            .await;
2851
2852        let sliding_sync = client
2853            .sliding_sync("test")?
2854            .with_to_device_extension(
2855                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2856            )
2857            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2858            .build()
2859            .await?;
2860
2861        let sliding_sync = Arc::new(sliding_sync);
2862
2863        // Create the listener and perform a sync request
2864        let sync_beat_listener = client.inner.sync_beat.listen();
2865        let sync_result = sliding_sync.sync_once().await;
2866        assert!(sync_result.is_err());
2867
2868        // The sync beat listener won't be notified in this case
2869        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2870
2871        Ok(())
2872    }
2873}