matrix_sdk/sliding_sync/
mod.rs

// Copyright 2022-2023 Benjamin Kampmann
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for that specific language governing permissions and
// limitations under the License.

16#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod sticky_parameters;
24mod utils;
25
26use std::{
27    collections::{btree_map::Entry, BTreeMap},
28    fmt::Debug,
29    future::Future,
30    sync::{Arc, RwLock as StdRwLock},
31    time::Duration,
32};
33
34use async_stream::stream;
35pub use client::{Version, VersionBuilder};
36use futures_core::stream::Stream;
37use matrix_sdk_base::RequestedRequiredStates;
38use matrix_sdk_common::{executor::spawn, timer};
39use ruma::{
40    api::client::{error::ErrorKind, sync::sync_events::v5 as http},
41    assign, OwnedRoomId, RoomId,
42};
43use serde::{Deserialize, Serialize};
44use tokio::{
45    select,
46    sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
47};
48use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
49
50#[cfg(feature = "e2e-encryption")]
51use self::utils::JoinHandleExt as _;
52pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
53use self::{
54    cache::restore_sliding_sync_state,
55    client::SlidingSyncResponseProcessor,
56    sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
57};
58use crate::{config::RequestConfig, Client, Result};
59
60/// The Sliding Sync instance.
61///
62/// It is OK to clone this type as much as you need: cloning it is cheap.
63#[derive(Clone, Debug)]
64pub struct SlidingSync {
65    /// The Sliding Sync data.
66    inner: Arc<SlidingSyncInner>,
67}
68
69#[derive(Debug)]
70pub(super) struct SlidingSyncInner {
71    /// A unique identifier for this instance of sliding sync.
72    ///
73    /// Used to distinguish different connections to sliding sync.
74    id: String,
75
76    /// The HTTP Matrix client.
77    client: Client,
78
79    /// Long-polling timeout that appears in sliding sync request.
80    poll_timeout: Duration,
81
82    /// Extra duration for the sliding sync request to timeout. This is added to
83    /// the [`Self::proxy_timeout`].
84    network_timeout: Duration,
85
86    /// The storage key to keep this cache at and load it from.
87    storage_key: String,
88
89    /// Should this sliding sync instance try to restore its sync position
90    /// from the database?
91    ///
92    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
93    /// keep it even so, to avoid sparkling cfg statements everywhere
94    /// throughout this file.
95    share_pos: bool,
96
97    /// Position markers.
98    ///
99    /// The `pos` marker represents a progression when exchanging requests and
100    /// responses with the server: the server acknowledges the request by
101    /// responding with a new `pos`. If the client sends two non-necessarily
102    /// consecutive requests with the same `pos`, the server has to reply with
103    /// the same identical response.
104    ///
105    /// `position` is behind a mutex so that a new request starts after the
106    /// previous request trip has fully ended (successfully or not). This
107    /// mechanism exists to wait for the response to be handled and to see the
108    /// `position` being updated, before sending a new request.
109    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
110
111    /// The lists of this Sliding Sync instance.
112    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
113
114    /// Request parameters that are sticky.
115    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,
116
117    /// Internal channel used to pass messages between Sliding Sync and other
118    /// types.
119    internal_channel: Sender<SlidingSyncInternalMessage>,
120}
121
122impl SlidingSync {
123    pub(super) fn new(inner: SlidingSyncInner) -> Self {
124        Self { inner: Arc::new(inner) }
125    }
126
127    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
128        cache::store_sliding_sync_state(self, position).await
129    }
130
131    /// Create a new [`SlidingSyncBuilder`].
132    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
133        SlidingSyncBuilder::new(id, client)
134    }
135
136    /// Subscribe to many rooms.
137    ///
138    /// If the associated `Room`s exist, it will be marked as
139    /// members are missing, so that it ensures to re-fetch all members.
140    ///
141    /// A subscription to an already subscribed room is ignored.
142    pub fn subscribe_to_rooms(
143        &self,
144        room_ids: &[&RoomId],
145        settings: Option<http::request::RoomSubscription>,
146        cancel_in_flight_request: bool,
147    ) {
148        let settings = settings.unwrap_or_default();
149        let mut sticky = self.inner.sticky.write().unwrap();
150        let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
151
152        let mut skip_over_current_sync_loop_iteration = false;
153
154        for room_id in room_ids {
155            // If the room subscription already exists, let's not
156            // override it with a new one. First, it would reset its
157            // state (`RoomSubscriptionState`), and second it would try to
158            // re-subscribe with the next request. We don't want that. A room
159            // subscription should happen once, and next subscriptions should
160            // be ignored.
161            if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
162                if let Some(room) = self.inner.client.get_room(room_id) {
163                    room.mark_members_missing();
164                }
165
166                entry.insert((RoomSubscriptionState::default(), settings.clone()));
167
168                skip_over_current_sync_loop_iteration = true;
169            }
170        }
171
172        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
173            self.inner.internal_channel_send_if_possible(
174                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
175            );
176        }
177    }
178
179    /// Find a list by its name, and do something on it if it exists.
180    pub async fn on_list<Function, FunctionOutput, R>(
181        &self,
182        list_name: &str,
183        function: Function,
184    ) -> Option<R>
185    where
186        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
187        FunctionOutput: Future<Output = R>,
188    {
189        let lists = self.inner.lists.read().await;
190
191        match lists.get(list_name) {
192            Some(list) => Some(function(list).await),
193            None => None,
194        }
195    }
196
197    /// Add the list to the list of lists.
198    ///
199    /// As lists need to have a unique `.name`, if a list with the same name
200    /// is found the new list will replace the old one and the return it or
201    /// `None`.
202    pub async fn add_list(
203        &self,
204        list_builder: SlidingSyncListBuilder,
205    ) -> Result<Option<SlidingSyncList>> {
206        let list = list_builder.build(self.inner.internal_channel.clone());
207
208        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
209
210        self.inner.internal_channel_send_if_possible(
211            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
212        );
213
214        Ok(old_list)
215    }
216
217    /// Add a list that will be cached and reloaded from the cache.
218    ///
219    /// This will raise an error if a storage key was not set, or if there
220    /// was a I/O error reading from the cache.
221    ///
222    /// The rest of the semantics is the same as [`Self::add_list`].
223    pub async fn add_cached_list(
224        &self,
225        mut list_builder: SlidingSyncListBuilder,
226    ) -> Result<Option<SlidingSyncList>> {
227        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
228
229        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
230
231        self.add_list(list_builder).await
232    }
233
234    /// Handle the HTTP response.
235    #[instrument(skip_all)]
236    async fn handle_response(
237        &self,
238        sliding_sync_response: http::Response,
239        position: &mut SlidingSyncPositionMarkers,
240        requested_required_states: RequestedRequiredStates,
241    ) -> Result<UpdateSummary, crate::Error> {
242        let pos = Some(sliding_sync_response.pos.clone());
243
244        let must_process_rooms_response = self.must_process_rooms_response().await;
245
246        trace!(yes = must_process_rooms_response, "Must process rooms response?");
247
248        // Transform a Sliding Sync Response to a `SyncResponse`.
249        //
250        // We may not need the `sync_response` in the future (once `SyncResponse` will
251        // move to Sliding Sync, i.e. to `http::Response`), but processing the
252        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
253        // happens here.
254
255        let sync_response = {
256            let response_processor = {
257                // Take the lock to avoid concurrent sliding syncs overwriting each other's room
258                // infos.
259                let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
260
261                let mut response_processor =
262                    SlidingSyncResponseProcessor::new(self.inner.client.clone());
263
264                #[cfg(feature = "e2e-encryption")]
265                if self.is_e2ee_enabled() {
266                    response_processor.handle_encryption(&sliding_sync_response.extensions).await?
267                }
268
269                // Only handle the room's subsection of the response, if this sliding sync was
270                // configured to do so.
271                if must_process_rooms_response {
272                    response_processor
273                        .handle_room_response(&sliding_sync_response, &requested_required_states)
274                        .await?;
275                }
276
277                response_processor
278            };
279
280            // Release the lock before calling event handlers
281            response_processor.process_and_take_response().await?
282        };
283
284        debug!(?sync_response, "Sliding Sync response has been handled by the client");
285
286        // Commit sticky parameters, if needed.
287        if let Some(ref txn_id) = sliding_sync_response.txn_id {
288            let txn_id = txn_id.as_str().into();
289            self.inner.sticky.write().unwrap().maybe_commit(txn_id);
290            let mut lists = self.inner.lists.write().await;
291            lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
292        }
293
294        let update_summary = {
295            // Update the rooms.
296            let updated_rooms = {
297                let mut updated_rooms = Vec::with_capacity(
298                    sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
299                );
300
301                updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());
302
303                // There might be other rooms that were only mentioned in the sliding sync
304                // extensions part of the response, and thus would result in rooms present in
305                // the `sync_response.joined`. Mark them as updated too.
306                //
307                // Since we've removed rooms that were in the room subsection from
308                // `sync_response.rooms.joined`, the remaining ones aren't already present in
309                // `updated_rooms` and wouldn't cause any duplicates.
310                updated_rooms.extend(sync_response.rooms.joined.keys().cloned());
311
312                updated_rooms
313            };
314
315            // Update the lists.
316            let updated_lists = {
317                debug!(
318                    lists = ?sliding_sync_response.lists,
319                    "Update lists"
320                );
321
322                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
323                let mut lists = self.inner.lists.write().await;
324
325                // Iterate on known lists, not on lists in the response. Rooms may have been
326                // updated that were not involved in any list update.
327                for (name, list) in lists.iter_mut() {
328                    if let Some(updates) = sliding_sync_response.lists.get(name) {
329                        let maximum_number_of_rooms: u32 =
330                            updates.count.try_into().expect("failed to convert `count` to `u32`");
331
332                        if list.update(Some(maximum_number_of_rooms))? {
333                            updated_lists.push(name.clone());
334                        }
335                    } else if list.update(None)? {
336                        updated_lists.push(name.clone());
337                    }
338                }
339
340                // Report about unknown lists.
341                for name in sliding_sync_response.lists.keys() {
342                    if !lists.contains_key(name) {
343                        error!("Response for list `{name}` - unknown to us; skipping");
344                    }
345                }
346
347                updated_lists
348            };
349
350            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
351        };
352
353        // Everything went well, we can update the position markers.
354        //
355        // Save the new position markers.
356        position.pos = pos;
357
358        Ok(update_summary)
359    }
360
361    async fn generate_sync_request(
362        &self,
363        txn_id: &mut LazyTransactionId,
364    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
365        // Collect requests for lists.
366        let mut requests_lists = BTreeMap::new();
367
368        let require_timeout = {
369            let lists = self.inner.lists.read().await;
370
371            // Start at `true` in case there is zero list.
372            let mut require_timeout = true;
373
374            for (name, list) in lists.iter() {
375                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
376                require_timeout = require_timeout && list.requires_timeout();
377            }
378
379            require_timeout
380        };
381
382        // Collect the `pos`.
383        //
384        // Wait on the `position` mutex to be available. It means no request nor
385        // response is running. The `position` mutex is released whether the response
386        // has been fully handled successfully, in this case the `pos` is updated, or
387        // the response handling has failed, in this case the `pos` hasn't been updated
388        // and the same `pos` will be used for this new request.
389        let mut position_guard = self.inner.position.clone().lock_owned().await;
390
391        let to_device_enabled =
392            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
393
394        let restored_fields = if self.inner.share_pos || to_device_enabled {
395            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
396        } else {
397            None
398        };
399
400        // Update pos: either the one restored from the database, if any and the sliding
401        // sync was configured so, or read it from the memory cache.
402        let pos = if self.inner.share_pos {
403            if let Some(fields) = &restored_fields {
404                // Override the memory one with the database one, for consistency.
405                if fields.pos != position_guard.pos {
406                    info!(
407                        "Pos from previous request ('{:?}') was different from \
408                         pos in database ('{:?}').",
409                        position_guard.pos, fields.pos
410                    );
411                    position_guard.pos = fields.pos.clone();
412                }
413                fields.pos.clone()
414            } else {
415                position_guard.pos.clone()
416            }
417        } else {
418            position_guard.pos.clone()
419        };
420
421        Span::current().record("pos", &pos);
422
423        // When the client sends a request with no `pos`, MSC4186 returns no device
424        // lists updates, as it only returns changes since the provided `pos`
425        // (which is `null` in this case); this is in line with sync v2.
426        //
427        // Therefore, with MSC4186, the device list cache must be marked as to be
428        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
429        // device lists updates that happened between the previous request and the new
430        // “initial” request.
431        #[cfg(feature = "e2e-encryption")]
432        if pos.is_none() && self.is_e2ee_enabled() {
433            info!("Marking all tracked users as dirty");
434
435            let olm_machine = self.inner.client.olm_machine().await;
436            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
437            olm_machine.mark_all_tracked_users_as_dirty().await?;
438        }
439
440        // Configure the timeout.
441        //
442        // The `timeout` query is necessary when all lists require it. Please see
443        // [`SlidingSyncList::requires_timeout`].
444        let timeout = require_timeout.then(|| self.inner.poll_timeout);
445
446        let mut request = assign!(http::Request::new(), {
447            conn_id: Some(self.inner.id.clone()),
448            pos,
449            timeout,
450            lists: requests_lists,
451        });
452
453        // Apply sticky parameters, if needs be.
454        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
455
456        // Extensions are now applied (via sticky parameters).
457        //
458        // Override the to-device token if the extension is enabled.
459        if to_device_enabled {
460            request.extensions.to_device.since =
461                restored_fields.and_then(|fields| fields.to_device_token);
462        }
463
464        // Apply the transaction id if one was generated.
465        if let Some(txn_id) = txn_id.get() {
466            request.txn_id = Some(txn_id.to_string());
467        }
468
469        Ok((
470            // The request itself.
471            request,
472            // Configure long-polling. We need some time for the long-poll itself,
473            // and extra time for the network delays.
474            RequestConfig::default()
475                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
476                .retry_limit(3),
477            position_guard,
478        ))
479    }
480
481    /// Send a sliding sync request.
482    ///
483    /// This method contains the sending logic.
484    async fn send_sync_request(
485        &self,
486        request: http::Request,
487        request_config: RequestConfig,
488        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
489    ) -> Result<UpdateSummary> {
490        debug!("Sending request");
491
492        // Prepare the request.
493        let requested_required_states = RequestedRequiredStates::from(&request);
494        let request = self.inner.client.send(request).with_request_config(request_config);
495
496        // Send the request and get a response with end-to-end encryption support.
497        //
498        // Sending the `/sync` request out when end-to-end encryption is enabled means
499        // that we need to also send out any outgoing e2ee related request out
500        // coming from the `OlmMachine::outgoing_requests()` method.
501
502        #[cfg(feature = "e2e-encryption")]
503        let response = {
504            if self.is_e2ee_enabled() {
505                // Here, we need to run 2 things:
506                //
507                // 1. Send the sliding sync request and get a response,
508                // 2. Send the E2EE requests.
509                //
510                // We don't want to use a `join` or `try_join` because we want to fail if and
511                // only if sending the sliding sync request fails. Failing to send the E2EE
512                // requests should just result in a log.
513                //
514                // We also want to give the priority to sliding sync request. E2EE requests are
515                // sent concurrently to the sliding sync request, but the priority is on waiting
516                // a sliding sync response.
517                //
518                // If sending sliding sync request fails, the sending of E2EE requests must be
519                // aborted as soon as possible.
520
521                let client = self.inner.client.clone();
522                let e2ee_uploads = spawn(
523                    async move {
524                        if let Err(error) = client.send_outgoing_requests().await {
525                            error!(?error, "Error while sending outgoing E2EE requests");
526                        }
527                    }
528                    .instrument(Span::current()),
529                )
530                // Ensure that the task is not running in detached mode. It is aborted when it's
531                // dropped.
532                .abort_on_drop();
533
534                // Wait on the sliding sync request success or failure early.
535                let response = request.await?;
536
537                // At this point, if `request` has been resolved successfully, we wait on
538                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
539                // long. Otherwise —if `request` has failed— `e2ee_uploads` has
540                // been dropped, so aborted.
541                e2ee_uploads.await.map_err(|error| Error::JoinError {
542                    task_description: "e2ee_uploads".to_owned(),
543                    error,
544                })?;
545
546                response
547            } else {
548                request.await?
549            }
550        };
551
552        // Send the request and get a response _without_ end-to-end encryption support.
553        #[cfg(not(feature = "e2e-encryption"))]
554        let response = request.await?;
555
556        debug!("Received response");
557
558        // At this point, the request has been sent, and a response has been received.
559        //
560        // We must ensure the handling of the response cannot be stopped/
561        // cancelled. It must be done entirely, otherwise we can have
562        // corrupted/incomplete states for Sliding Sync and other parts of
563        // the code.
564        //
565        // That's why we are running the handling of the response in a spawned
566        // future that cannot be cancelled by anything.
567        let this = self.clone();
568
569        // Spawn a new future to ensure that the code inside this future cannot be
570        // cancelled if this method is cancelled.
571        let future = async move {
572            debug!("Start handling response");
573
574            // In case the task running this future is detached, we must
575            // ensure responses are handled one at a time. At this point we still own
576            // `position_guard`, so we're fine.
577
578            // Handle the response.
579            let updates = this
580                .handle_response(response, &mut position_guard, requested_required_states)
581                .await?;
582
583            this.cache_to_storage(&position_guard).await?;
584
585            // Release the position guard lock.
586            // It means that other responses can be generated and then handled later.
587            drop(position_guard);
588
589            debug!("Done handling response");
590
591            Ok(updates)
592        };
593
594        spawn(future.instrument(Span::current())).await.unwrap()
595    }
596
597    /// Is the e2ee extension enabled for this sliding sync instance?
598    #[cfg(feature = "e2e-encryption")]
599    fn is_e2ee_enabled(&self) -> bool {
600        self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
601    }
602
603    #[cfg(not(feature = "e2e-encryption"))]
604    fn is_e2ee_enabled(&self) -> bool {
605        false
606    }
607
608    /// Should we process the room's subpart of a response?
609    async fn must_process_rooms_response(&self) -> bool {
610        // We consider that we must, if there's any room subscription or there's any
611        // list.
612        !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
613            || !self.inner.lists.read().await.is_empty()
614    }
615
616    #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
617    async fn sync_once(&self) -> Result<UpdateSummary> {
618        let (request, request_config, position_guard) =
619            self.generate_sync_request(&mut LazyTransactionId::new()).await?;
620
621        // Send the request, kaboom.
622        let summaries = self.send_sync_request(request, request_config, position_guard).await?;
623
624        // Notify a new sync was received
625        self.inner.client.inner.sync_beat.notify(usize::MAX);
626
627        Ok(summaries)
628    }
629
630    /// Create a _new_ Sliding Sync sync loop.
631    ///
632    /// This method returns a `Stream`, which will send requests and will handle
633    /// responses automatically. Lists and rooms are updated automatically.
634    ///
635    /// This function returns `Ok(…)` if everything went well, otherwise it will
636    /// return `Err(…)`. An `Err` will _always_ lead to the `Stream`
637    /// termination.
638    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
639    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
640    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
641        debug!("Starting sync stream");
642
643        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
644
645        stream! {
646            loop {
647                debug!("Sync stream is running");
648
649                select! {
650                    biased;
651
652                    internal_message = internal_channel_receiver.recv() => {
653                        use SlidingSyncInternalMessage::*;
654
655                        debug!(?internal_message, "Sync stream has received an internal message");
656
657                        match internal_message {
658                            Err(_) | Ok(SyncLoopStop) => {
659                                break;
660                            }
661
662                            Ok(SyncLoopSkipOverCurrentIteration) => {
663                                continue;
664                            }
665                        }
666                    }
667
668                    update_summary = self.sync_once() => {
669                        match update_summary {
670                            Ok(updates) => {
671                                yield Ok(updates);
672                            }
673
674                            // Here, errors we **cannot** ignore, and that must stop the sync loop.
675                            Err(error) => {
676                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
677                                    // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
678                                    self.expire_session().await;
679                                }
680
681                                yield Err(error);
682
683                                // Terminates the loop, and terminates the stream.
684                                break;
685                            }
686                        }
687                    }
688                }
689            }
690
691            debug!("Sync stream has exited.");
692        }
693    }
694
695    /// Force to stop the sync loop ([`Self::sync`]) if it's running.
696    ///
697    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
698    /// enough to “stop” it, but depending of how this `Stream` is used, it
699    /// might not be obvious to drop it immediately (thinking of using this API
700    /// over FFI; the foreign-language might not be able to drop a value
701    /// immediately). Thus, calling this method will ensure that the sync loop
702    /// stops gracefully and as soon as it returns.
703    pub fn stop_sync(&self) -> Result<()> {
704        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
705    }
706
707    /// Expire the current Sliding Sync session on the client-side.
708    ///
709    /// Expiring a Sliding Sync session means: resetting `pos`. It also resets
710    /// sticky parameters.
711    ///
712    /// This should only be used when it's clear that this session was about to
713    /// expire anyways, and should be used only in very specific cases (e.g.
714    /// multiple sliding syncs being run in parallel, and one of them has
715    /// expired).
716    ///
717    /// This method **MUST** be called when the sync loop is stopped.
718    #[doc(hidden)]
719    pub async fn expire_session(&self) {
720        info!("Session expired; resetting `pos` and sticky parameters");
721
722        {
723            let mut position = self.inner.position.lock().await;
724            position.pos = None;
725
726            if let Err(err) = self.cache_to_storage(&position).await {
727                error!(
728                    "couldn't invalidate sliding sync frozen state when expiring session: {err}"
729                );
730            }
731        }
732
733        {
734            let mut sticky = self.inner.sticky.write().unwrap();
735
736            // Clear all room subscriptions: we don't want to resend all room subscriptions
737            // when the session will restart.
738            sticky.data_mut().room_subscriptions.clear();
739        }
740
741        self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());
742    }
743}
744
745impl SlidingSyncInner {
746    /// Send a message over the internal channel.
747    #[instrument]
748    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
749        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
750    }
751
752    /// Send a message over the internal channel if there is a receiver, i.e. if
753    /// the sync loop is running.
754    #[instrument]
755    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
756        // If there is no receiver, the send will fail, but that's OK here.
757        let _ = self.internal_channel.send(message);
758    }
759}
760
/// Messages that can be sent over the internal channel to steer the sync loop.
///
/// The enum has no payload, so equality is total: it derives `Eq` in addition
/// to `PartialEq`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum SlidingSyncInternalMessage {
    /// Instruct the sync loop to stop.
    SyncLoopStop,

    /// Instruct the sync loop to skip over any remaining work in its iteration,
    /// and to jump to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}
770
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrite the current `pos` with `new_pos`.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// Return a copy of the static extension configuration of this Sliding
    /// Sync.
    ///
    /// Note: this is not what the sticky parameters will send next, but the
    /// static configuration that was set when this Sliding Sync was created.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
789
/// The position markers of a Sliding Sync stream.
///
/// Currently this only holds `pos`.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// An ephemeral position in the current stream, as received from the
    /// previous `/sync` response, or `None` for the first request.
    ///
    /// Should not be persisted.
    pos: Option<String>,
}
798
/// A (de)serializable holder for a `pos` value.
///
/// NOTE(review): presumably the "frozen" form of the position written by the
/// cache layer — confirm against the `cache` module.
#[derive(Serialize, Deserialize)]
struct FrozenSlidingSyncPos {
    /// The position; the field is left out of the serialized output when it is
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pos: Option<String>,
}
804
/// A summary of the updates received after a sync (like in
/// [`SlidingSync::sync`]).
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// The names of the lists that have seen an update.
    pub lists: Vec<String>,
    /// The IDs of the rooms that have seen an update.
    pub rooms: Vec<OwnedRoomId>,
}
814
/// A very basic bool-ish enum to represent the state of a
/// [`http::request::RoomSubscription`]. A `RoomSubscription` that has been sent
/// once should ideally not be sent again, mostly to save bandwidth.
#[derive(Debug, Default)]
enum RoomSubscriptionState {
    /// The `RoomSubscription` has not been sent or received correctly from the
    /// server, i.e. the `RoomSubscription` —which is part of the sticky
    /// parameters— has not been committed.
    ///
    /// This is the default state.
    #[default]
    Pending,

    /// The `RoomSubscription` has been sent and received correctly by the
    /// server.
    Applied,
}
830
/// The set of sticky parameters owned by the `SlidingSyncInner` instance, and
/// sent in the request.
#[derive(Debug)]
pub(super) struct SlidingSyncStickyParameters {
    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
    /// but one wants to receive updates.
    ///
    /// Each subscription is stored along with its [`RoomSubscriptionState`].
    room_subscriptions:
        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,

    /// The intended state of the extensions being supplied to sliding /sync
    /// calls.
    extensions: http::request::Extensions,
}
844
845impl SlidingSyncStickyParameters {
846    /// Create a new set of sticky parameters.
847    pub fn new(
848        room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
849        extensions: http::request::Extensions,
850    ) -> Self {
851        Self {
852            room_subscriptions: room_subscriptions
853                .into_iter()
854                .map(|(room_id, room_subscription)| {
855                    (room_id, (RoomSubscriptionState::Pending, room_subscription))
856                })
857                .collect(),
858            extensions,
859        }
860    }
861}
862
863impl StickyData for SlidingSyncStickyParameters {
864    type Request = http::Request;
865
866    fn apply(&self, request: &mut Self::Request) {
867        request.room_subscriptions = self
868            .room_subscriptions
869            .iter()
870            .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
871            .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
872            .collect();
873        request.extensions = self.extensions.clone();
874    }
875
876    fn on_commit(&mut self) {
877        // All room subscriptions are marked as `Applied`.
878        for (state, _room_subscription) in self.room_subscriptions.values_mut() {
879            if matches!(state, RoomSubscriptionState::Pending) {
880                *state = RoomSubscriptionState::Applied;
881            }
882        }
883    }
884}
885
886#[cfg(all(test, not(target_family = "wasm")))]
887#[allow(clippy::dbg_macro)]
888mod tests {
889    use std::{
890        collections::BTreeMap,
891        future::ready,
892        ops::Not,
893        sync::{Arc, Mutex},
894        time::Duration,
895    };
896
897    use assert_matches::assert_matches;
898    use event_listener::Listener;
899    use futures_util::{future::join_all, pin_mut, StreamExt};
900    use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
901    use matrix_sdk_common::executor::spawn;
902    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE};
903    use ruma::{
904        api::client::error::ErrorKind,
905        assign,
906        events::{direct::DirectEvent, room::member::MembershipState},
907        owned_room_id, room_id,
908        serde::Raw,
909        uint, OwnedRoomId, TransactionId,
910    };
911    use serde::Deserialize;
912    use serde_json::json;
913    use wiremock::{
914        http::Method, matchers::method, Match, Mock, MockServer, Request, ResponseTemplate,
915    };
916
917    use super::{
918        http,
919        sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
920        SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
921        SlidingSyncStickyParameters,
922    };
923    use crate::{
924        sliding_sync::cache::restore_sliding_sync_state,
925        test_utils::{logged_in_client, mocks::MatrixMockServer},
926        Client, Result,
927    };
928
    /// A request matcher for the mock server that recognises sliding sync
    /// requests.
    #[derive(Copy, Clone)]
    struct SlidingSyncMatcher;
931
932    impl Match for SlidingSyncMatcher {
933        fn matches(&self, request: &Request) -> bool {
934            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
935                && request.method == Method::POST
936        }
937    }
938
939    async fn new_sliding_sync(
940        lists: Vec<SlidingSyncListBuilder>,
941    ) -> Result<(MockServer, SlidingSync)> {
942        let server = MockServer::start().await;
943        let client = logged_in_client(Some(server.uri())).await;
944
945        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
946
947        for list in lists {
948            sliding_sync_builder = sliding_sync_builder.add_list(list);
949        }
950
951        let sliding_sync = sliding_sync_builder.build().await?;
952
953        Ok((server, sliding_sync))
954    }
955
    /// Subscribing to rooms records the subscriptions in the sticky parameters,
    /// and interacts with the "members synced" flag of the rooms as asserted
    /// step by step below.
    #[async_test]
    async fn test_subscribe_to_rooms() -> Result<()> {
        let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
        .await?;

        let stream = sliding_sync.sync();
        pin_mut!(stream);

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Run one sync iteration against a mocked response announcing the three rooms.
        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "pos": "1",
                    "lists": {},
                    "rooms": {
                        room_id_0: {
                            "name": "Room #0",
                            "initial": true,
                        },
                        room_id_1: {
                            "name": "Room #1",
                            "initial": true,
                        },
                        room_id_2: {
                            "name": "Room #2",
                            "initial": true,
                        },
                    }
                })))
                .mount_as_scoped(&server)
                .await;

            let _ = stream.next().await.unwrap()?;
        }

        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();

        // Members aren't synced.
        // We need to make them synced, so that we can test that subscribing to a room
        // makes members not synced. That's a desired feature.
        assert!(room0.are_members_synced().not());

        // Mock the `/members` endpoint of `room0` and request the members.
        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are now synced! We can start subscribing and see how it goes.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);

        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
        // now marked as not synced.
        assert!(room0.are_members_synced().not());

        // Only the two subscribed rooms are present in the sticky parameters.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(!room_subscriptions.contains_key(room_id_2));
        }

        // Subscribing to the same room doesn't reset the member sync state.

        // First, mark the members of `room0` as synced again.
        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are synced, good, good.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);

        // Members are still synced: because we have already subscribed to the
        // room, the members aren't marked as unsynced.
        assert!(room0.are_members_synced());

        Ok(())
    }
1075
    /// Expiring the session empties the room subscriptions; subscriptions made
    /// afterwards start from that empty set.
    #[async_test]
    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);

        // Only these two rooms are subscribed to.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2).not());
        }

        // Subscribe to one more room.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        // The new subscription is added to the previous ones.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        // Suddenly, the session expires!
        sliding_sync.expire_session().await;

        // All room subscriptions are gone.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.is_empty());
        }

        // Subscribe to one room again.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        // Only the room subscribed to after the expiration is present.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0).not());
            assert!(room_subscriptions.contains_key(room_id_1).not());
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        Ok(())
    }
1134
1135    #[async_test]
1136    async fn test_add_list() -> Result<()> {
1137        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1138            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1139        .await?;
1140
1141        let _stream = sliding_sync.sync();
1142        pin_mut!(_stream);
1143
1144        sliding_sync
1145            .add_list(
1146                SlidingSyncList::builder("bar")
1147                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1148            )
1149            .await?;
1150
1151        let lists = sliding_sync.inner.lists.read().await;
1152
1153        assert!(lists.contains_key("foo"));
1154        assert!(lists.contains_key("bar"));
1155
1156        // this test also ensures that Tokio is not panicking when calling `add_list`.
1157
1158        Ok(())
1159    }
1160
1161    #[test]
1162    fn test_sticky_parameters_api_invalidated_flow() {
1163        let r0 = room_id!("!r0.matrix.org");
1164        let r1 = room_id!("!r1:matrix.org");
1165
1166        let mut room_subscriptions = BTreeMap::new();
1167        room_subscriptions.insert(r0.to_owned(), Default::default());
1168
1169        // At first it's invalidated.
1170        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1171            room_subscriptions,
1172            Default::default(),
1173        ));
1174        assert!(sticky.is_invalidated());
1175
1176        // Then when we create a request, the sticky parameters are applied.
1177        let txn_id: &TransactionId = "tid123".into();
1178
1179        let mut request = http::Request::default();
1180        request.txn_id = Some(txn_id.to_string());
1181
1182        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1183
1184        assert!(request.txn_id.is_some());
1185        assert_eq!(request.room_subscriptions.len(), 1);
1186        assert!(request.room_subscriptions.contains_key(r0));
1187
1188        let tid = request.txn_id.unwrap();
1189
1190        sticky.maybe_commit(tid.as_str().into());
1191        assert!(!sticky.is_invalidated());
1192
1193        // Applying new parameters will invalidate again.
1194        sticky
1195            .data_mut()
1196            .room_subscriptions
1197            .insert(r1.to_owned(), (Default::default(), Default::default()));
1198        assert!(sticky.is_invalidated());
1199
1200        // Committing with the wrong transaction id will keep it invalidated.
1201        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1202        assert!(sticky.is_invalidated());
1203
1204        // Restarting a request will only remember the last generated transaction id.
1205        let txn_id1: &TransactionId = "tid456".into();
1206        let mut request1 = http::Request::default();
1207        request1.txn_id = Some(txn_id1.to_string());
1208        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1209
1210        assert!(sticky.is_invalidated());
1211        // The first room subscription has been applied to `request`, so it's not
1212        // reapplied here. It's a particular logic of `room_subscriptions`, it's not
1213        // related to the sticky design.
1214        assert_eq!(request1.room_subscriptions.len(), 1);
1215        assert!(request1.room_subscriptions.contains_key(r1));
1216
1217        let txn_id2: &TransactionId = "tid789".into();
1218        let mut request2 = http::Request::default();
1219        request2.txn_id = Some(txn_id2.to_string());
1220
1221        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1222        assert!(sticky.is_invalidated());
1223        // `request2` contains `r1` because the sticky parameters have not been
1224        // committed, so it's still marked as pending.
1225        assert_eq!(request2.room_subscriptions.len(), 1);
1226        assert!(request2.room_subscriptions.contains_key(r1));
1227
1228        // Here we commit with the not most-recent TID, so it keeps the invalidated
1229        // status.
1230        sticky.maybe_commit(txn_id1);
1231        assert!(sticky.is_invalidated());
1232
1233        // But here we use the latest TID, so the commit is effective.
1234        sticky.maybe_commit(txn_id2);
1235        assert!(!sticky.is_invalidated());
1236    }
1237
1238    #[test]
1239    fn test_room_subscriptions_are_sticky() {
1240        let r0 = room_id!("!r0.matrix.org");
1241        let r1 = room_id!("!r1:matrix.org");
1242
1243        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1244            BTreeMap::new(),
1245            Default::default(),
1246        ));
1247
1248        // A room subscription is added, applied, and committed.
1249        {
1250            // Insert `r0`.
1251            sticky
1252                .data_mut()
1253                .room_subscriptions
1254                .insert(r0.to_owned(), (Default::default(), Default::default()));
1255
1256            // Then the sticky parameters are applied.
1257            let txn_id: &TransactionId = "tid0".into();
1258            let mut request = http::Request::default();
1259            request.txn_id = Some(txn_id.to_string());
1260
1261            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1262
1263            assert!(request.txn_id.is_some());
1264            assert_eq!(request.room_subscriptions.len(), 1);
1265            assert!(request.room_subscriptions.contains_key(r0));
1266
1267            // Then the sticky parameters are committed.
1268            let tid = request.txn_id.unwrap();
1269
1270            sticky.maybe_commit(tid.as_str().into());
1271        }
1272
1273        // A room subscription is added, applied, but NOT committed.
1274        {
1275            // Insert `r1`.
1276            sticky
1277                .data_mut()
1278                .room_subscriptions
1279                .insert(r1.to_owned(), (Default::default(), Default::default()));
1280
1281            // Then the sticky parameters are applied.
1282            let txn_id: &TransactionId = "tid1".into();
1283            let mut request = http::Request::default();
1284            request.txn_id = Some(txn_id.to_string());
1285
1286            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1287
1288            assert!(request.txn_id.is_some());
1289            assert_eq!(request.room_subscriptions.len(), 1);
1290            // `r0` is not present, it's only `r1`.
1291            assert!(request.room_subscriptions.contains_key(r1));
1292
1293            // Then the sticky parameters are NOT committed.
1294            // It can happen if the request has failed to be sent for example,
1295            // or if the response didn't match.
1296        }
1297
1298        // A previously added room subscription is re-added, applied, and committed.
1299        {
1300            // Then the sticky parameters are applied.
1301            let txn_id: &TransactionId = "tid2".into();
1302            let mut request = http::Request::default();
1303            request.txn_id = Some(txn_id.to_string());
1304
1305            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1306
1307            assert!(request.txn_id.is_some());
1308            assert_eq!(request.room_subscriptions.len(), 1);
1309            // `r0` is not present, it's only `r1`.
1310            assert!(request.room_subscriptions.contains_key(r1));
1311
1312            // Then the sticky parameters are committed.
1313            let tid = request.txn_id.unwrap();
1314
1315            sticky.maybe_commit(tid.as_str().into());
1316        }
1317
1318        // All room subscriptions have been committed.
1319        {
1320            // Then the sticky parameters are applied.
1321            let txn_id: &TransactionId = "tid3".into();
1322            let mut request = http::Request::default();
1323            request.txn_id = Some(txn_id.to_string());
1324
1325            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1326
1327            assert!(request.txn_id.is_some());
1328            // All room subscriptions have been sent.
1329            assert!(request.room_subscriptions.is_empty());
1330        }
1331    }
1332
    /// The extensions configuration given at creation is kept as is in the
    /// sticky parameters, and is copied into the request when the sticky
    /// parameters are applied.
    #[test]
    fn test_extensions_are_sticky() {
        let mut extensions = http::request::Extensions::default();
        extensions.account_data.enabled = Some(true);

        // At first it's invalidated.
        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
            Default::default(),
            extensions,
        ));

        assert!(sticky.is_invalidated(), "invalidated because of non default parameters");

        // `StickyParameters::new` follows its caller's intent when it comes to e2ee and
        // to-device.
        let extensions = &sticky.data().extensions;
        assert_eq!(extensions.e2ee.enabled, None);
        assert_eq!(extensions.to_device.enabled, None);
        assert_eq!(extensions.to_device.since, None);

        // What the user explicitly enabled is… enabled.
        assert_eq!(extensions.account_data.enabled, Some(true));

        // Applying the sticky parameters copies the same configuration into the
        // request; without a commit, they remain invalidated.
        let txn_id: &TransactionId = "tid123".into();
        let mut request = http::Request::default();
        request.txn_id = Some(txn_id.to_string());
        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
        assert!(sticky.is_invalidated());
        assert_eq!(request.extensions.to_device.enabled, None);
        assert_eq!(request.extensions.to_device.since, None);
        assert_eq!(request.extensions.e2ee.enabled, None);
        assert_eq!(request.extensions.account_data.enabled, Some(true));
    }
1366
    /// The extensions configuration is sent in generated requests until the
    /// sticky parameters are committed, then left out. With the
    /// `e2e-encryption` feature, the to-device `since` token is still put in
    /// the request afterwards.
    #[async_test]
    async fn test_sticky_extensions_plus_since() -> Result<()> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .build()
            .await?;

        // No extensions have been explicitly enabled here.
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);

        // Now enable e2ee and to-device.
        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        // Even without a since token, the first request will contain the extensions
        // configuration, at least.
        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;

        assert_eq!(request.extensions.e2ee.enabled, Some(true));
        assert_eq!(request.extensions.to_device.enabled, Some(true));
        assert!(request.extensions.to_device.since.is_none());

        {
            // Committing with another transaction id doesn't validate anything.
            let mut sticky = sync.inner.sticky.write().unwrap();
            assert!(sticky.is_invalidated());
            sticky.maybe_commit(
                "hopefully the rng won't generate this very specific transaction id".into(),
            );
            assert!(sticky.is_invalidated());
        }

        // Regenerating a request will yield the same one.
        let txn_id2 = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
            .await?;

        assert_eq!(request.extensions.e2ee.enabled, Some(true));
        assert_eq!(request.extensions.to_device.enabled, Some(true));
        assert!(request.extensions.to_device.since.is_none());

        assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");

        {
            // Committing with the expected transaction id will validate it.
            let mut sticky = sync.inner.sticky.write().unwrap();
            assert!(sticky.is_invalidated());
            sticky.maybe_commit(txn_id2.as_str().into());
            assert!(!sticky.is_invalidated());
        }

        // The next request should contain no sticky parameters.
        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;
        assert!(request.extensions.e2ee.enabled.is_none());
        assert!(request.extensions.to_device.enabled.is_none());
        assert!(request.extensions.to_device.since.is_none());

        // If there's a to-device `since` token, we make sure we put the token
        // into the extension config. The rest doesn't need to be re-enabled due to
        // stickiness.
        let _since_token = "since";

        // Store the token as the `next_batch_token` of the crypto store, when there is
        // an `OlmMachine`.
        #[cfg(feature = "e2e-encryption")]
        {
            use matrix_sdk_base::crypto::store::Changes;
            if let Some(olm_machine) = &*client.olm_machine().await {
                olm_machine
                    .store()
                    .save_changes(Changes {
                        next_batch_token: Some(_since_token.to_owned()),
                        ..Default::default()
                    })
                    .await?;
            }
        }

        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;

        assert!(request.extensions.e2ee.enabled.is_none());
        assert!(request.extensions.to_device.enabled.is_none());

        // The `since` token is only asserted when e2e-encryption is compiled in.
        #[cfg(feature = "e2e-encryption")]
        assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));

        Ok(())
    }
1476
    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
    // all the users tracked by the `OlmMachine` must be marked as dirty, i.e.
    // `/keys/query` requests must be sent. See the code for the details.
1480    //
1481    // This test is asserting that.
1482    #[async_test]
1483    #[cfg(feature = "e2e-encryption")]
1484    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1485        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1486        use matrix_sdk_test::ruma_response_from_json;
1487        use ruma::user_id;
1488
1489        let server = MockServer::start().await;
1490        let client = logged_in_client(Some(server.uri())).await;
1491
1492        let alice = user_id!("@alice:localhost");
1493        let bob = user_id!("@bob:localhost");
1494        let me = user_id!("@example:localhost");
1495
1496        // Track and mark users are not dirty, so that we can check they are “dirty”
1497        // after that. Dirty here means that a `/key/query` must be sent.
1498        {
1499            let olm_machine = client.olm_machine().await;
1500            let olm_machine = olm_machine.as_ref().unwrap();
1501
1502            olm_machine.update_tracked_users([alice, bob]).await?;
1503
1504            // Assert requests.
1505            let outgoing_requests = olm_machine.outgoing_requests().await?;
1506
1507            assert_eq!(outgoing_requests.len(), 2);
1508            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1509            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1510
1511            // Fake responses.
1512            olm_machine
1513                .mark_request_as_sent(
1514                    outgoing_requests[0].request_id(),
1515                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1516                        "one_time_key_counts": {}
1517                    }))),
1518                )
1519                .await?;
1520
1521            olm_machine
1522                .mark_request_as_sent(
1523                    outgoing_requests[1].request_id(),
1524                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1525                        "device_keys": {
1526                            alice: {},
1527                            bob: {},
1528                        }
1529                    }))),
1530                )
1531                .await?;
1532
1533            // Once more.
1534            let outgoing_requests = olm_machine.outgoing_requests().await?;
1535
1536            assert_eq!(outgoing_requests.len(), 1);
1537            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1538
1539            olm_machine
1540                .mark_request_as_sent(
1541                    outgoing_requests[0].request_id(),
1542                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1543                        "device_keys": {
1544                            me: {},
1545                        }
1546                    }))),
1547                )
1548                .await?;
1549
1550            // No more.
1551            let outgoing_requests = olm_machine.outgoing_requests().await?;
1552
1553            assert!(outgoing_requests.is_empty());
1554        }
1555
1556        let sync = client
1557            .sliding_sync("test-slidingsync")?
1558            .add_list(SlidingSyncList::builder("new_list"))
1559            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1560            .build()
1561            .await?;
1562
1563        // First request: no `pos`.
1564        let txn_id = TransactionId::new();
1565        let (_request, _, _) = sync
1566            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1567            .await?;
1568
1569        // Now, tracked users must be dirty.
1570        {
1571            let olm_machine = client.olm_machine().await;
1572            let olm_machine = olm_machine.as_ref().unwrap();
1573
1574            // Assert requests.
1575            let outgoing_requests = olm_machine.outgoing_requests().await?;
1576
1577            assert_eq!(outgoing_requests.len(), 1);
1578            assert_matches!(
1579                outgoing_requests[0].request(),
1580                AnyOutgoingRequest::KeysQuery(request) => {
1581                    assert!(request.device_keys.contains_key(alice));
1582                    assert!(request.device_keys.contains_key(bob));
1583                    assert!(request.device_keys.contains_key(me));
1584                }
1585            );
1586
1587            // Fake responses.
1588            olm_machine
1589                .mark_request_as_sent(
1590                    outgoing_requests[0].request_id(),
1591                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1592                        "device_keys": {
1593                            alice: {},
1594                            bob: {},
1595                            me: {},
1596                        }
1597                    }))),
1598                )
1599                .await?;
1600        }
1601
1602        // Second request: with a `pos` this time.
1603        sync.set_pos("chocolat".to_owned()).await;
1604
1605        let txn_id = TransactionId::new();
1606        let (_request, _, _) = sync
1607            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1608            .await?;
1609
1610        // Tracked users are not marked as dirty.
1611        {
1612            let olm_machine = client.olm_machine().await;
1613            let olm_machine = olm_machine.as_ref().unwrap();
1614
1615            // Assert requests.
1616            let outgoing_requests = olm_machine.outgoing_requests().await?;
1617
1618            assert!(outgoing_requests.is_empty());
1619        }
1620
1621        Ok(())
1622    }
1623
1624    #[async_test]
1625    async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1626        let server = MockServer::start().await;
1627        let client = logged_in_client(Some(server.uri())).await;
1628
1629        let sliding_sync = client
1630            .sliding_sync("test-slidingsync")?
1631            .with_to_device_extension(
1632                assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1633            )
1634            .build()
1635            .await?;
1636
1637        // First request asks to enable the extension.
1638        let (request, _, _) =
1639            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1640        assert!(request.extensions.to_device.enabled.is_some());
1641
1642        let sync = sliding_sync.sync();
1643        pin_mut!(sync);
1644
1645        // `pos` is `None` to start with.
1646        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1647
1648        #[derive(Deserialize)]
1649        struct PartialRequest {
1650            txn_id: Option<String>,
1651        }
1652
1653        {
1654            let _mock_guard = Mock::given(SlidingSyncMatcher)
1655                .respond_with(|request: &Request| {
1656                    // Repeat the txn_id in the response, if set.
1657                    let request: PartialRequest = request.body_json().unwrap();
1658
1659                    ResponseTemplate::new(200).set_body_json(json!({
1660                        "txn_id": request.txn_id,
1661                        "pos": "0",
1662                    }))
1663                })
1664                .mount_as_scoped(&server)
1665                .await;
1666
1667            let next = sync.next().await;
1668            assert_matches!(next, Some(Ok(_update_summary)));
1669
1670            // `pos` has been updated.
1671            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1672        }
1673
1674        // Next request doesn't ask to enable the extension.
1675        let (request, _, _) =
1676            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1677        assert!(request.extensions.to_device.enabled.is_none());
1678
1679        // Next request is successful.
1680        {
1681            let _mock_guard = Mock::given(SlidingSyncMatcher)
1682                .respond_with(|request: &Request| {
1683                    // Repeat the txn_id in the response, if set.
1684                    let request: PartialRequest = request.body_json().unwrap();
1685
1686                    ResponseTemplate::new(200).set_body_json(json!({
1687                        "txn_id": request.txn_id,
1688                        "pos": "1",
1689                    }))
1690                })
1691                .mount_as_scoped(&server)
1692                .await;
1693
1694            let next = sync.next().await;
1695            assert_matches!(next, Some(Ok(_update_summary)));
1696
1697            // `pos` has been updated.
1698            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1699        }
1700
1701        // Next request is successful despite it receives an already
1702        // received `pos` from the server.
1703        {
1704            let _mock_guard = Mock::given(SlidingSyncMatcher)
1705                .respond_with(|request: &Request| {
1706                    // Repeat the txn_id in the response, if set.
1707                    let request: PartialRequest = request.body_json().unwrap();
1708
1709                    ResponseTemplate::new(200).set_body_json(json!({
1710                        "txn_id": request.txn_id,
1711                        "pos": "0", // <- already received!
1712                    }))
1713                })
1714                .up_to_n_times(1) // run this mock only once.
1715                .mount_as_scoped(&server)
1716                .await;
1717
1718            let next = sync.next().await;
1719            assert_matches!(next, Some(Ok(_update_summary)));
1720
1721            // `pos` has been updated.
1722            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1723        }
1724
1725        // Stop responding with successful requests!
1726        //
1727        // When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
1728        // so they're reset. It also resets the `pos`.
1729        {
1730            let _mock_guard = Mock::given(SlidingSyncMatcher)
1731                .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1732                    "error": "foo",
1733                    "errcode": "M_UNKNOWN_POS",
1734                })))
1735                .mount_as_scoped(&server)
1736                .await;
1737
1738            let next = sync.next().await;
1739
1740            // The expected error is returned.
1741            assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1742
1743            // `pos` has been reset.
1744            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1745
1746            // Next request asks to enable the extension again.
1747            let (request, _, _) =
1748                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1749
1750            assert!(request.extensions.to_device.enabled.is_some());
1751
1752            // `sync` has been stopped.
1753            assert!(sync.next().await.is_none());
1754        }
1755
1756        Ok(())
1757    }
1758
1759    #[cfg(feature = "e2e-encryption")]
1760    #[async_test]
1761    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1762        let server = MockServer::start().await;
1763
1764        #[derive(Deserialize)]
1765        struct PartialRequest {
1766            txn_id: Option<String>,
1767        }
1768
1769        let server_pos = Arc::new(Mutex::new(0));
1770        let _mock_guard = Mock::given(SlidingSyncMatcher)
1771            .respond_with(move |request: &Request| {
1772                // Repeat the txn_id in the response, if set.
1773                let request: PartialRequest = request.body_json().unwrap();
1774                let pos = {
1775                    let mut pos = server_pos.lock().unwrap();
1776                    let prev = *pos;
1777                    *pos += 1;
1778                    prev
1779                };
1780
1781                ResponseTemplate::new(200).set_body_json(json!({
1782                    "txn_id": request.txn_id,
1783                    "pos": pos.to_string(),
1784                }))
1785            })
1786            .mount_as_scoped(&server)
1787            .await;
1788
1789        let client = logged_in_client(Some(server.uri())).await;
1790
1791        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1792
1793        // `pos` is `None` to start with.
1794        {
1795            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1796
1797            let (request, _, _) =
1798                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1799            assert!(request.pos.is_none());
1800        }
1801
1802        let sync = sliding_sync.sync();
1803        pin_mut!(sync);
1804
1805        // Sync goes well, and then the position is saved both into the internal memory
1806        // and the database.
1807        let next = sync.next().await;
1808        assert_matches!(next, Some(Ok(_update_summary)));
1809
1810        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1811
1812        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1813            .await?
1814            .expect("must have restored fields");
1815
1816        // While it has been saved into the database, it's not necessarily going to be
1817        // used later!
1818        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1819
1820        // Now, even if we mess with the position stored in the database, the sliding
1821        // sync instance isn't configured to reload the stream position from the
1822        // database, so it won't be changed.
1823        {
1824            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1825
1826            let mut position_guard = other_sync.inner.position.lock().await;
1827            position_guard.pos = Some("yolo".to_owned());
1828
1829            other_sync.cache_to_storage(&position_guard).await?;
1830        }
1831
1832        // It's still 0, not "yolo".
1833        {
1834            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1835            let (request, _, _) =
1836                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1837            assert_eq!(request.pos.as_deref(), Some("0"));
1838        }
1839
1840        // Recreating a sliding sync with the same ID doesn't preload the pos, if not
1841        // asked to.
1842        {
1843            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1844            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1845        }
1846
1847        Ok(())
1848    }
1849
1850    #[cfg(feature = "e2e-encryption")]
1851    #[async_test]
1852    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1853        let server = MockServer::start().await;
1854
1855        #[derive(Deserialize)]
1856        struct PartialRequest {
1857            txn_id: Option<String>,
1858        }
1859
1860        let server_pos = Arc::new(Mutex::new(0));
1861        let _mock_guard = Mock::given(SlidingSyncMatcher)
1862            .respond_with(move |request: &Request| {
1863                // Repeat the txn_id in the response, if set.
1864                let request: PartialRequest = request.body_json().unwrap();
1865                let pos = {
1866                    let mut pos = server_pos.lock().unwrap();
1867                    let prev = *pos;
1868                    *pos += 1;
1869                    prev
1870                };
1871
1872                ResponseTemplate::new(200).set_body_json(json!({
1873                    "txn_id": request.txn_id,
1874                    "pos": pos.to_string(),
1875                }))
1876            })
1877            .mount_as_scoped(&server)
1878            .await;
1879
1880        let client = logged_in_client(Some(server.uri())).await;
1881
1882        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1883
1884        // `pos` is `None` to start with.
1885        {
1886            let (request, _, _) =
1887                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1888
1889            assert!(request.pos.is_none());
1890            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1891        }
1892
1893        let sync = sliding_sync.sync();
1894        pin_mut!(sync);
1895
1896        // Sync goes well, and then the position is saved both into the internal memory
1897        // and the database.
1898        let next = sync.next().await;
1899        assert_matches!(next, Some(Ok(_update_summary)));
1900
1901        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1902
1903        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1904            .await?
1905            .expect("must have restored fields");
1906
1907        // While it has been saved into the database, it's not necessarily going to be
1908        // used later!
1909        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1910
1911        // Another process modifies the stream position under our feet...
1912        {
1913            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
1914
1915            let mut position_guard = other_sync.inner.position.lock().await;
1916            position_guard.pos = Some("42".to_owned());
1917
1918            other_sync.cache_to_storage(&position_guard).await?;
1919        }
1920
1921        // It's alright, the next request will load it from the database.
1922        {
1923            let (request, _, _) =
1924                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1925            assert_eq!(request.pos.as_deref(), Some("42"));
1926            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1927        }
1928
1929        // Recreating a sliding sync with the same ID will reload it too.
1930        {
1931            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1932            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1933
1934            let (request, _, _) =
1935                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1936            assert_eq!(request.pos.as_deref(), Some("42"));
1937        }
1938
1939        // Invalidating the session will remove the in-memory value AND the database
1940        // value.
1941        sliding_sync.expire_session().await;
1942
1943        {
1944            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1945
1946            let (request, _, _) =
1947                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1948            assert!(request.pos.is_none());
1949        }
1950
1951        // And new sliding syncs with the same ID won't find it either.
1952        {
1953            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1954            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1955
1956            let (request, _, _) =
1957                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1958            assert!(request.pos.is_none());
1959        }
1960
1961        Ok(())
1962    }
1963
1964    #[async_test]
1965    async fn test_stop_sync_loop() -> Result<()> {
1966        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1967            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1968        .await?;
1969
1970        // Start the sync loop.
1971        let stream = sliding_sync.sync();
1972        pin_mut!(stream);
1973
1974        // The sync loop is actually running.
1975        assert!(stream.next().await.is_some());
1976
1977        // Stop the sync loop.
1978        sliding_sync.stop_sync()?;
1979
1980        // The sync loop is actually stopped.
1981        assert!(stream.next().await.is_none());
1982
1983        // Start a new sync loop.
1984        let stream = sliding_sync.sync();
1985        pin_mut!(stream);
1986
1987        // The sync loop is actually running.
1988        assert!(stream.next().await.is_some());
1989
1990        Ok(())
1991    }
1992
1993    #[async_test]
1994    async fn test_process_read_receipts() -> Result<()> {
1995        let room = owned_room_id!("!pony:example.org");
1996
1997        let server = MockServer::start().await;
1998        let client = logged_in_client(Some(server.uri())).await;
1999        client.event_cache().subscribe().unwrap();
2000
2001        let sliding_sync = client
2002            .sliding_sync("test")?
2003            .with_receipt_extension(
2004                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2005            )
2006            .add_list(
2007                SlidingSyncList::builder("all")
2008                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2009            )
2010            .build()
2011            .await?;
2012
2013        // Initial state.
2014        {
2015            let server_response = assign!(http::Response::new("0".to_owned()), {
2016                rooms: BTreeMap::from([(
2017                    room.clone(),
2018                    http::response::Room::default(),
2019                )])
2020            });
2021
2022            let _summary = {
2023                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2024                sliding_sync
2025                    .handle_response(
2026                        server_response.clone(),
2027                        &mut pos_guard,
2028                        RequestedRequiredStates::default(),
2029                    )
2030                    .await?
2031            };
2032        }
2033
2034        let server_response = assign!(http::Response::new("1".to_owned()), {
2035            extensions: assign!(http::response::Extensions::default(), {
2036                receipts: assign!(http::response::Receipts::default(), {
2037                    rooms: BTreeMap::from([
2038                        (
2039                            room.clone(),
2040                            Raw::from_json_string(
2041                                json!({
2042                                    "room_id": room,
2043                                    "type": "m.receipt",
2044                                    "content": {
2045                                        "$event:bar.org": {
2046                                            "m.read": {
2047                                                client.user_id().unwrap(): {
2048                                                    "ts": 1436451550,
2049                                                }
2050                                            }
2051                                        }
2052                                    }
2053                                })
2054                                .to_string(),
2055                            ).unwrap()
2056                        )
2057                    ])
2058                })
2059            })
2060        });
2061
2062        let summary = {
2063            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2064            sliding_sync
2065                .handle_response(
2066                    server_response.clone(),
2067                    &mut pos_guard,
2068                    RequestedRequiredStates::default(),
2069                )
2070                .await?
2071        };
2072
2073        assert!(summary.rooms.contains(&room));
2074
2075        Ok(())
2076    }
2077
2078    #[async_test]
2079    async fn test_process_marked_unread_room_account_data() -> Result<()> {
2080        let room_id = owned_room_id!("!unicorn:example.org");
2081
2082        let server = MockServer::start().await;
2083        let client = logged_in_client(Some(server.uri())).await;
2084
2085        // Setup sliding sync with with one room and one list
2086
2087        let sliding_sync = client
2088            .sliding_sync("test")?
2089            .with_account_data_extension(
2090                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2091            )
2092            .add_list(
2093                SlidingSyncList::builder("all")
2094                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2095            )
2096            .build()
2097            .await?;
2098
2099        // Initial state.
2100        {
2101            let server_response = assign!(http::Response::new("0".to_owned()), {
2102                rooms: BTreeMap::from([(
2103                    room_id.clone(),
2104                    http::response::Room::default(),
2105                )])
2106            });
2107
2108            let _summary = {
2109                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2110                sliding_sync
2111                    .handle_response(
2112                        server_response.clone(),
2113                        &mut pos_guard,
2114                        RequestedRequiredStates::default(),
2115                    )
2116                    .await?
2117            };
2118        }
2119
2120        // Simulate a response that only changes the marked unread state of the room to
2121        // true
2122
2123        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2124
2125        let update_summary = {
2126            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2127            sliding_sync
2128                .handle_response(
2129                    server_response.clone(),
2130                    &mut pos_guard,
2131                    RequestedRequiredStates::default(),
2132                )
2133                .await?
2134        };
2135
2136        // Check that the list list and entry received the update
2137
2138        assert!(update_summary.rooms.contains(&room_id));
2139
2140        let room = client.get_room(&room_id).unwrap();
2141
2142        // Check the actual room data, this powers RoomInfo
2143
2144        assert!(room.is_marked_unread());
2145
2146        // Change it back to false and check if it updates
2147
2148        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2149
2150        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2151        sliding_sync
2152            .handle_response(
2153                server_response.clone(),
2154                &mut pos_guard,
2155                RequestedRequiredStates::default(),
2156            )
2157            .await?;
2158
2159        let room = client.get_room(&room_id).unwrap();
2160
2161        assert!(!room.is_marked_unread());
2162
2163        Ok(())
2164    }
2165
2166    fn make_mark_unread_response(
2167        response_number: &str,
2168        room_id: OwnedRoomId,
2169        unread: bool,
2170        add_rooms_section: bool,
2171    ) -> http::Response {
2172        let rooms = if add_rooms_section {
2173            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2174        } else {
2175            BTreeMap::new()
2176        };
2177
2178        let extensions = assign!(http::response::Extensions::default(), {
2179            account_data: assign!(http::response::AccountData::default(), {
2180                rooms: BTreeMap::from([
2181                    (
2182                        room_id,
2183                        vec![
2184                            Raw::from_json_string(
2185                                json!({
2186                                    "content": {
2187                                        "unread": unread
2188                                    },
2189                                    "type": "m.marked_unread"
2190                                })
2191                                .to_string(),
2192                            ).unwrap()
2193                        ]
2194                    )
2195                ])
2196            })
2197        });
2198
2199        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2200    }
2201
2202    #[async_test]
2203    async fn test_process_rooms_account_data() -> Result<()> {
2204        let room = owned_room_id!("!pony:example.org");
2205
2206        let server = MockServer::start().await;
2207        let client = logged_in_client(Some(server.uri())).await;
2208
2209        let sliding_sync = client
2210            .sliding_sync("test")?
2211            .with_account_data_extension(
2212                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2213            )
2214            .add_list(
2215                SlidingSyncList::builder("all")
2216                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2217            )
2218            .build()
2219            .await?;
2220
2221        // Initial state.
2222        {
2223            let server_response = assign!(http::Response::new("0".to_owned()), {
2224                rooms: BTreeMap::from([(
2225                    room.clone(),
2226                    http::response::Room::default(),
2227                )])
2228            });
2229
2230            let _summary = {
2231                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2232                sliding_sync
2233                    .handle_response(
2234                        server_response.clone(),
2235                        &mut pos_guard,
2236                        RequestedRequiredStates::default(),
2237                    )
2238                    .await?
2239            };
2240        }
2241
2242        let server_response = assign!(http::Response::new("1".to_owned()), {
2243            extensions: assign!(http::response::Extensions::default(), {
2244                account_data: assign!(http::response::AccountData::default(), {
2245                    rooms: BTreeMap::from([
2246                        (
2247                            room.clone(),
2248                            vec![
2249                                Raw::from_json_string(
2250                                    json!({
2251                                        "content": {
2252                                            "tags": {
2253                                                "u.work": {
2254                                                    "order": 0.9
2255                                                }
2256                                            }
2257                                        },
2258                                        "type": "m.tag"
2259                                    })
2260                                    .to_string(),
2261                                ).unwrap()
2262                            ]
2263                        )
2264                    ])
2265                })
2266            })
2267        });
2268        let summary = {
2269            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2270            sliding_sync
2271                .handle_response(
2272                    server_response.clone(),
2273                    &mut pos_guard,
2274                    RequestedRequiredStates::default(),
2275                )
2276                .await?
2277        };
2278
2279        assert!(summary.rooms.contains(&room));
2280
2281        Ok(())
2282    }
2283
2284    #[async_test]
2285    #[cfg(feature = "e2e-encryption")]
2286    async fn test_process_only_encryption_events() -> Result<()> {
2287        use ruma::OneTimeKeyAlgorithm;
2288
2289        let room = owned_room_id!("!croissant:example.org");
2290
2291        let server = MockServer::start().await;
2292        let client = logged_in_client(Some(server.uri())).await;
2293
2294        let server_response = assign!(http::Response::new("0".to_owned()), {
2295            rooms: BTreeMap::from([(
2296                room.clone(),
2297                assign!(http::response::Room::default(), {
2298                    name: Some("Croissants lovers".to_owned()),
2299                    timeline: Vec::new(),
2300                }),
2301            )]),
2302
2303            extensions: assign!(http::response::Extensions::default(), {
2304                e2ee: assign!(http::response::E2EE::default(), {
2305                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2306                }),
2307                to_device: Some(assign!(http::response::ToDevice::default(), {
2308                    next_batch: "to-device-token".to_owned(),
2309                })),
2310            })
2311        });
2312
2313        // Don't process non-encryption events if the sliding sync is configured for
2314        // encryption only.
2315
2316        let sliding_sync = client
2317            .sliding_sync("test")?
2318            .with_to_device_extension(
2319                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2320            )
2321            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2322            .build()
2323            .await?;
2324
2325        {
2326            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2327
2328            sliding_sync
2329                .handle_response(
2330                    server_response.clone(),
2331                    &mut position_guard,
2332                    RequestedRequiredStates::default(),
2333                )
2334                .await?;
2335        }
2336
2337        // E2EE has been properly handled.
2338        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2339        assert_eq!(uploaded_key_count, 42);
2340
2341        {
2342            let olm_machine = &*client.olm_machine_for_testing().await;
2343            assert_eq!(
2344                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2345                Some("to-device-token")
2346            );
2347        }
2348
2349        // Room events haven't.
2350        assert!(client.get_room(&room).is_none());
2351
2352        // Conversely, only process room lists events if the sliding sync was configured
2353        // as so.
2354        let client = logged_in_client(Some(server.uri())).await;
2355
2356        let sliding_sync = client
2357            .sliding_sync("test")?
2358            .add_list(SlidingSyncList::builder("thelist"))
2359            .build()
2360            .await?;
2361
2362        {
2363            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2364
2365            sliding_sync
2366                .handle_response(
2367                    server_response.clone(),
2368                    &mut position_guard,
2369                    RequestedRequiredStates::default(),
2370                )
2371                .await?;
2372        }
2373
2374        // E2EE response has been ignored.
2375        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2376        assert_eq!(uploaded_key_count, 0);
2377
2378        {
2379            let olm_machine = &*client.olm_machine_for_testing().await;
2380            assert_eq!(
2381                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2382                None
2383            );
2384        }
2385
2386        // The room is now known.
2387        assert!(client.get_room(&room).is_some());
2388
2389        // And it's also possible to set up both.
2390        let client = logged_in_client(Some(server.uri())).await;
2391
2392        let sliding_sync = client
2393            .sliding_sync("test")?
2394            .add_list(SlidingSyncList::builder("thelist"))
2395            .with_to_device_extension(
2396                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2397            )
2398            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2399            .build()
2400            .await?;
2401
2402        {
2403            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2404
2405            sliding_sync
2406                .handle_response(
2407                    server_response.clone(),
2408                    &mut position_guard,
2409                    RequestedRequiredStates::default(),
2410                )
2411                .await?;
2412        }
2413
2414        // E2EE has been properly handled.
2415        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2416        assert_eq!(uploaded_key_count, 42);
2417
2418        {
2419            let olm_machine = &*client.olm_machine_for_testing().await;
2420            assert_eq!(
2421                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2422                Some("to-device-token")
2423            );
2424        }
2425
2426        // The room is now known.
2427        assert!(client.get_room(&room).is_some());
2428
2429        Ok(())
2430    }
2431
2432    #[async_test]
2433    async fn test_lock_multiple_requests() -> Result<()> {
2434        let server = MockServer::start().await;
2435        let client = logged_in_client(Some(server.uri())).await;
2436
2437        let pos = Arc::new(Mutex::new(0));
2438        let _mock_guard = Mock::given(SlidingSyncMatcher)
2439            .respond_with(move |_: &Request| {
2440                let mut pos = pos.lock().unwrap();
2441                *pos += 1;
2442                ResponseTemplate::new(200).set_body_json(json!({
2443                    "pos": pos.to_string(),
2444                    "lists": {},
2445                    "rooms": {}
2446                }))
2447            })
2448            .mount_as_scoped(&server)
2449            .await;
2450
2451        let sliding_sync = client
2452            .sliding_sync("test")?
2453            .with_to_device_extension(
2454                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2455            )
2456            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2457            .build()
2458            .await?;
2459
2460        // Spawn two requests in parallel. Before #2430, this lead to a deadlock and the
2461        // test would never terminate.
2462        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2463
2464        for result in requests.await {
2465            result?;
2466        }
2467
2468        Ok(())
2469    }
2470
2471    #[async_test]
2472    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2473        let server = MockServer::start().await;
2474        let client = logged_in_client(Some(server.uri())).await;
2475
2476        let pos = Arc::new(Mutex::new(0));
2477        let _mock_guard = Mock::given(SlidingSyncMatcher)
2478            .respond_with(move |_: &Request| {
2479                let mut pos = pos.lock().unwrap();
2480                *pos += 1;
2481                // Respond slowly enough that we can skip one iteration.
2482                ResponseTemplate::new(200)
2483                    .set_body_json(json!({
2484                        "pos": pos.to_string(),
2485                        "lists": {},
2486                        "rooms": {}
2487                    }))
2488                    .set_delay(Duration::from_secs(2))
2489            })
2490            .mount_as_scoped(&server)
2491            .await;
2492
2493        let sliding_sync =
2494            client
2495                .sliding_sync("test")?
2496                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2497                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2498                ))
2499                .add_list(
2500                    SlidingSyncList::builder("another-list")
2501                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2502                )
2503                .build()
2504                .await?;
2505
2506        let stream = sliding_sync.sync();
2507        pin_mut!(stream);
2508
2509        let cloned_sync = sliding_sync.clone();
2510        spawn(async move {
2511            tokio::time::sleep(Duration::from_millis(100)).await;
2512
2513            cloned_sync
2514                .on_list("another-list", |list| {
2515                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2516                    ready(())
2517                })
2518                .await;
2519        });
2520
2521        assert_matches!(stream.next().await, Some(Ok(_)));
2522
2523        sliding_sync.stop_sync().unwrap();
2524
2525        assert_matches!(stream.next().await, None);
2526
2527        let mut num_requests = 0;
2528
2529        for request in server.received_requests().await.unwrap() {
2530            if !SlidingSyncMatcher.matches(&request) {
2531                continue;
2532            }
2533
2534            let another_list_ranges = if num_requests == 0 {
2535                // First request
2536                json!([[0, 10]])
2537            } else {
2538                // Second request
2539                json!([[10, 20]])
2540            };
2541
2542            num_requests += 1;
2543            assert!(num_requests <= 2, "more than one request hit the server");
2544
2545            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2546
2547            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2548                &json_value,
2549                &json!({
2550                    "conn_id": "test",
2551                    "lists": {
2552                        "room-list": {
2553                            "ranges": [[0, 9]],
2554                            "required_state": [
2555                                ["m.room.encryption", ""],
2556                                ["m.room.tombstone", ""]
2557                            ],
2558                        },
2559                        "another-list": {
2560                            "ranges": another_list_ranges,
2561                            "required_state": [
2562                                ["m.room.encryption", ""],
2563                                ["m.room.tombstone", ""]
2564                            ],
2565                        },
2566                    }
2567                }),
2568                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2569            ) {
2570                dbg!(json_value);
2571                panic!("json differ: {err}");
2572            }
2573        }
2574
2575        assert_eq!(num_requests, 2);
2576
2577        Ok(())
2578    }
2579
2580    #[async_test]
2581    async fn test_timeout_zero_list() -> Result<()> {
2582        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2583
2584        let (request, _, _) =
2585            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2586
2587        // Zero list means sliding sync is fully loaded, so there is a timeout to wait
2588        // on new update to pop.
2589        assert!(request.timeout.is_some());
2590
2591        Ok(())
2592    }
2593
2594    #[async_test]
2595    async fn test_timeout_one_list() -> Result<()> {
2596        let (_server, sliding_sync) = new_sliding_sync(vec![
2597            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2598        ])
2599        .await?;
2600
2601        let (request, _, _) =
2602            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2603
2604        // The list does not require a timeout.
2605        assert!(request.timeout.is_none());
2606
2607        // Simulate a response.
2608        {
2609            let server_response = assign!(http::Response::new("0".to_owned()), {
2610                lists: BTreeMap::from([(
2611                    "foo".to_owned(),
2612                    assign!(http::response::List::default(), {
2613                        count: uint!(7),
2614                    })
2615                 )])
2616            });
2617
2618            let _summary = {
2619                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2620                sliding_sync
2621                    .handle_response(
2622                        server_response.clone(),
2623                        &mut pos_guard,
2624                        RequestedRequiredStates::default(),
2625                    )
2626                    .await?
2627            };
2628        }
2629
2630        let (request, _, _) =
2631            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2632
2633        // The list is now fully loaded, so it requires a timeout.
2634        assert!(request.timeout.is_some());
2635
2636        Ok(())
2637    }
2638
2639    #[async_test]
2640    async fn test_timeout_three_lists() -> Result<()> {
2641        let (_server, sliding_sync) = new_sliding_sync(vec![
2642            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2643            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2644            SlidingSyncList::builder("baz")
2645                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2646        ])
2647        .await?;
2648
2649        let (request, _, _) =
2650            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2651
2652        // Two lists don't require a timeout.
2653        assert!(request.timeout.is_none());
2654
2655        // Simulate a response.
2656        {
2657            let server_response = assign!(http::Response::new("0".to_owned()), {
2658                lists: BTreeMap::from([(
2659                    "foo".to_owned(),
2660                    assign!(http::response::List::default(), {
2661                        count: uint!(7),
2662                    })
2663                 )])
2664            });
2665
2666            let _summary = {
2667                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2668                sliding_sync
2669                    .handle_response(
2670                        server_response.clone(),
2671                        &mut pos_guard,
2672                        RequestedRequiredStates::default(),
2673                    )
2674                    .await?
2675            };
2676        }
2677
2678        let (request, _, _) =
2679            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2680
2681        // One don't require a timeout.
2682        assert!(request.timeout.is_none());
2683
2684        // Simulate a response.
2685        {
2686            let server_response = assign!(http::Response::new("1".to_owned()), {
2687                lists: BTreeMap::from([(
2688                    "bar".to_owned(),
2689                    assign!(http::response::List::default(), {
2690                        count: uint!(7),
2691                    })
2692                 )])
2693            });
2694
2695            let _summary = {
2696                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2697                sliding_sync
2698                    .handle_response(
2699                        server_response.clone(),
2700                        &mut pos_guard,
2701                        RequestedRequiredStates::default(),
2702                    )
2703                    .await?
2704            };
2705        }
2706
2707        let (request, _, _) =
2708            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2709
2710        // All lists require a timeout.
2711        assert!(request.timeout.is_some());
2712
2713        Ok(())
2714    }
2715
2716    #[async_test]
2717    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2718        let server = MockServer::start().await;
2719        let client = logged_in_client(Some(server.uri())).await;
2720
2721        let _mock_guard = Mock::given(SlidingSyncMatcher)
2722            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2723                "pos": "0",
2724                "lists": {},
2725                "rooms": {}
2726            })))
2727            .mount_as_scoped(&server)
2728            .await;
2729
2730        let sliding_sync = client
2731            .sliding_sync("test")?
2732            .with_to_device_extension(
2733                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2734            )
2735            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2736            .build()
2737            .await?;
2738
2739        let sliding_sync = Arc::new(sliding_sync);
2740
2741        // Create the listener and perform a sync request
2742        let sync_beat_listener = client.inner.sync_beat.listen();
2743        sliding_sync.sync_once().await?;
2744
2745        // The sync beat listener should be notified shortly after
2746        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2747        Ok(())
2748    }
2749
2750    #[async_test]
2751    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2752        let server = MockServer::start().await;
2753        let client = logged_in_client(Some(server.uri())).await;
2754
2755        let _mock_guard = Mock::given(SlidingSyncMatcher)
2756            .respond_with(ResponseTemplate::new(404))
2757            .mount_as_scoped(&server)
2758            .await;
2759
2760        let sliding_sync = client
2761            .sliding_sync("test")?
2762            .with_to_device_extension(
2763                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2764            )
2765            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2766            .build()
2767            .await?;
2768
2769        let sliding_sync = Arc::new(sliding_sync);
2770
2771        // Create the listener and perform a sync request
2772        let sync_beat_listener = client.inner.sync_beat.listen();
2773        let sync_result = sliding_sync.sync_once().await;
2774        assert!(sync_result.is_err());
2775
2776        // The sync beat listener won't be notified in this case
2777        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2778
2779        Ok(())
2780    }
2781
2782    #[async_test]
2783    async fn test_sync_lock_is_released_before_calling_handlers() -> Result<()> {
2784        let server = MatrixMockServer::new().await;
2785        let client = server.client_builder().build().await;
2786        let room_id = room_id!("!mu5hr00m:example.org");
2787
2788        let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2789            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2790                "pos": "0",
2791                "lists": {},
2792                "extensions": {
2793                    "account_data": {
2794                        "global": [
2795                            {
2796                                "type": "m.direct",
2797                                "content": {
2798                                    "@de4dlockh0lmes:example.org": [
2799                                        "!mu5hr00m:example.org"
2800                                    ]
2801                                }
2802                            }
2803                        ]
2804                    }
2805                },
2806                "rooms": {
2807                    room_id: {
2808                        "name": "Mario Bros Fanbase Room",
2809                        "initial": true,
2810                    },
2811                }
2812            })))
2813            .mount_as_scoped(server.server())
2814            .await;
2815
2816        let f = EventFactory::new().room(room_id);
2817
2818        Mock::given(method("GET"))
2819            .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2820            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2821                "chunk": [
2822                    f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2823                ]
2824            })))
2825            .mount(server.server())
2826            .await;
2827
2828        let (tx, rx) = tokio::sync::oneshot::channel();
2829
2830        let tx = Arc::new(Mutex::new(Some(tx)));
2831        client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2832            // Try to run a /members query while in a event handler.
2833            let members =
2834                client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2835            assert_eq!(members.len(), 1);
2836            tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2837        });
2838
2839        let sliding_sync = client
2840            .sliding_sync("test")?
2841            .add_list(SlidingSyncList::builder("thelist"))
2842            .with_account_data_extension(
2843                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2844            )
2845            .build()
2846            .await?;
2847
2848        tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2849            .await
2850            .expect("Sync did not complete in time")
2851            .expect("Sync failed");
2852
2853        // Wait for the event handler to complete.
2854        tokio::time::timeout(Duration::from_secs(5), rx)
2855            .await
2856            .expect("Event handler did not complete in time")
2857            .expect("Event handler failed");
2858
2859        Ok(())
2860    }
2861}