matrix_sdk/sliding_sync/
mod.rs

1// Copyright 2022-2023 Benjamin Kampmann
2// Copyright 2022 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod sticky_parameters;
24
25use std::{
26    collections::{btree_map::Entry, BTreeMap},
27    fmt::Debug,
28    future::Future,
29    sync::{Arc, RwLock as StdRwLock},
30    time::Duration,
31};
32
33use async_stream::stream;
34pub use client::{Version, VersionBuilder};
35use futures_core::stream::Stream;
36use matrix_sdk_base::RequestedRequiredStates;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_common::executor::JoinHandleExt as _;
39use matrix_sdk_common::{executor::spawn, timer};
40use ruma::{
41    api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42    assign, OwnedRoomId, RoomId,
43};
44use serde::{Deserialize, Serialize};
45use tokio::{
46    select,
47    sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
48};
49use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
50
51pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
52use self::{
53    cache::restore_sliding_sync_state,
54    client::SlidingSyncResponseProcessor,
55    sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
56};
57use crate::{config::RequestConfig, Client, Result};
58
/// The Sliding Sync instance.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// The Sliding Sync data.
    ///
    /// Held behind an [`Arc`], which is why cloning `SlidingSync` is cheap.
    inner: Arc<SlidingSyncInner>,
}
67
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// A unique identifier for this instance of sliding sync.
    ///
    /// Used to distinguish different connections to sliding sync.
    id: String,

    /// The HTTP Matrix client.
    client: Client,

    /// Long-polling timeout that appears in sliding sync request.
    poll_timeout: Duration,

    /// Extra duration for the sliding sync request to timeout. This is added to
    /// the [`Self::poll_timeout`].
    ///
    /// The sum of the two is used as the HTTP request timeout, so the
    /// long-poll itself never races against the network layer.
    network_timeout: Duration,

    /// The storage key to keep this cache at and load it from.
    storage_key: String,

    /// Should this sliding sync instance try to restore its sync position
    /// from the database?
    ///
    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
    /// keep it even so, to avoid sparkling cfg statements everywhere
    /// throughout this file.
    share_pos: bool,

    /// Position markers.
    ///
    /// The `pos` marker represents a progression when exchanging requests and
    /// responses with the server: the server acknowledges the request by
    /// responding with a new `pos`. If the client sends two non-necessarily
    /// consecutive requests with the same `pos`, the server has to reply with
    /// the same identical response.
    ///
    /// `position` is behind a mutex so that a new request starts after the
    /// previous request trip has fully ended (successfully or not). This
    /// mechanism exists to wait for the response to be handled and to see the
    /// `position` being updated, before sending a new request.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this Sliding Sync instance, keyed by list name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// Request parameters that are sticky.
    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,

    /// Internal channel used to pass messages between Sliding Sync and other
    /// types (e.g. to stop the sync loop, or to make it skip over its current
    /// iteration; see [`SlidingSyncInternalMessage`]).
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
120
impl SlidingSync {
    /// Wrap a [`SlidingSyncInner`] into its shared, cheaply-clonable handle.
    pub(super) fn new(inner: SlidingSyncInner) -> Self {
        Self { inner: Arc::new(inner) }
    }

    /// Whether the current sliding sync instance has set a sync position
    /// marker.
    pub async fn has_pos(&self) -> bool {
        self.inner.position.lock().await.pos.is_some()
    }

    /// Persist the current sliding sync state, along with the given position
    /// markers, to the cache (stored under [`SlidingSyncInner::storage_key`]).
    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
        cache::store_sliding_sync_state(self, position).await
    }

    /// Create a new [`SlidingSyncBuilder`].
    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
        SlidingSyncBuilder::new(id, client)
    }

    /// Subscribe to many rooms.
    ///
    /// If the associated `Room`s exist, they will be marked as having their
    /// members missing, so that it ensures to re-fetch all members.
    ///
    /// A subscription to an already subscribed room is ignored.
    pub fn subscribe_to_rooms(
        &self,
        room_ids: &[&RoomId],
        settings: Option<http::request::RoomSubscription>,
        cancel_in_flight_request: bool,
    ) {
        let settings = settings.unwrap_or_default();
        let mut sticky = self.inner.sticky.write().unwrap();
        let room_subscriptions = &mut sticky.data_mut().room_subscriptions;

        let mut skip_over_current_sync_loop_iteration = false;

        for room_id in room_ids {
            // If the room subscription already exists, let's not
            // override it with a new one. First, it would reset its
            // state (`RoomSubscriptionState`), and second it would try to
            // re-subscribe with the next request. We don't want that. A room
            // subscription should happen once, and next subscriptions should
            // be ignored.
            if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
                if let Some(room) = self.inner.client.get_room(room_id) {
                    room.mark_members_missing();
                }

                entry.insert((RoomSubscriptionState::default(), settings.clone()));

                skip_over_current_sync_loop_iteration = true;
            }
        }

        // Only interrupt an in-flight request if at least one *new*
        // subscription was actually recorded.
        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
            self.inner.internal_channel_send_if_possible(
                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
            );
        }
    }

    /// Find a list by its name, and do something on it if it exists.
    ///
    /// Returns `None` when no list is registered under `list_name`, otherwise
    /// the result of `function` applied to the list.
    pub async fn on_list<Function, FunctionOutput, R>(
        &self,
        list_name: &str,
        function: Function,
    ) -> Option<R>
    where
        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
        FunctionOutput: Future<Output = R>,
    {
        let lists = self.inner.lists.read().await;

        match lists.get(list_name) {
            Some(list) => Some(function(list).await),
            None => None,
        }
    }

    /// Add the list to the list of lists.
    ///
    /// As lists need to have a unique `.name`, if a list with the same name
    /// is found, the new list will replace the old one, and the old one is
    /// returned; otherwise `None` is returned.
    pub async fn add_list(
        &self,
        list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        let list = list_builder.build(self.inner.internal_channel.clone());

        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);

        // Ask the sync loop to restart its current iteration so the new list
        // is included in the next request.
        self.inner.internal_channel_send_if_possible(
            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
        );

        Ok(old_list)
    }

    /// Add a list that will be cached and reloaded from the cache.
    ///
    /// This will raise an error if a storage key was not set, or if there
    /// was a I/O error reading from the cache.
    ///
    /// The rest of the semantics is the same as [`Self::add_list`].
    pub async fn add_cached_list(
        &self,
        mut list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));

        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;

        self.add_list(list_builder).await
    }

    /// Handle the HTTP response.
    ///
    /// Processes extensions (thread subscriptions, E2EE), the rooms subpart
    /// of the response, commits sticky parameters when the server echoed our
    /// transaction id, updates the lists, and finally advances `position.pos`.
    /// `position` is only updated when the whole handling succeeded.
    #[instrument(skip_all)]
    async fn handle_response(
        &self,
        mut sliding_sync_response: http::Response,
        position: &mut SlidingSyncPositionMarkers,
        requested_required_states: RequestedRequiredStates,
    ) -> Result<UpdateSummary, crate::Error> {
        let pos = Some(sliding_sync_response.pos.clone());

        let must_process_rooms_response = self.must_process_rooms_response().await;

        trace!(yes = must_process_rooms_response, "Must process rooms response?");

        // Transform a Sliding Sync Response to a `SyncResponse`.
        //
        // We may not need the `sync_response` in the future (once `SyncResponse` will
        // move to Sliding Sync, i.e. to `http::Response`), but processing the
        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
        // happens here.

        let sync_response = {
            let _timer = timer!("response processor");

            let response_processor = {
                // Take the lock to avoid concurrent sliding syncs overwriting each other's room
                // infos.
                let _sync_lock = {
                    let _timer = timer!("acquiring the `sync_lock`");

                    self.inner.client.base_client().sync_lock().lock().await
                };

                let mut response_processor =
                    SlidingSyncResponseProcessor::new(self.inner.client.clone());

                // Process thread subscriptions if they're available.
                //
                // It's important to do this *before* handling the room responses, so that
                // notifications can be properly generated based on the thread subscriptions,
                // for the events in threads we've subscribed to.
                if self.is_thread_subscriptions_enabled() {
                    response_processor
                        .handle_thread_subscriptions(
                            position.pos.as_deref(),
                            std::mem::take(
                                &mut sliding_sync_response.extensions.thread_subscriptions,
                            ),
                        )
                        .await?;
                }

                #[cfg(feature = "e2e-encryption")]
                if self.is_e2ee_enabled() {
                    response_processor.handle_encryption(&sliding_sync_response.extensions).await?
                }

                // Only handle the room's subsection of the response, if this sliding sync was
                // configured to do so.
                if must_process_rooms_response {
                    response_processor
                        .handle_room_response(&sliding_sync_response, &requested_required_states)
                        .await?;
                }

                response_processor
            };

            // Release the lock before calling event handlers
            response_processor.process_and_take_response().await?
        };

        debug!("Sliding Sync response has been handled by the client");
        trace!(?sync_response);

        // Commit sticky parameters, if needed.
        if let Some(ref txn_id) = sliding_sync_response.txn_id {
            let txn_id = txn_id.as_str().into();
            self.inner.sticky.write().unwrap().maybe_commit(txn_id);
            let mut lists = self.inner.lists.write().await;
            lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
        }

        let update_summary = {
            // Update the rooms.
            let updated_rooms = {
                let mut updated_rooms = Vec::with_capacity(
                    sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
                );

                updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());

                // There might be other rooms that were only mentioned in the sliding sync
                // extensions part of the response, and thus would result in rooms present in
                // the `sync_response.joined`. Mark them as updated too.
                //
                // Since we've removed rooms that were in the room subsection from
                // `sync_response.rooms.joined`, the remaining ones aren't already present in
                // `updated_rooms` and wouldn't cause any duplicates.
                updated_rooms.extend(sync_response.rooms.joined.keys().cloned());

                updated_rooms
            };

            // Update the lists.
            let updated_lists = {
                debug!(
                    lists = ?sliding_sync_response.lists,
                    "Update lists"
                );

                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
                let mut lists = self.inner.lists.write().await;

                // Iterate on known lists, not on lists in the response. Rooms may have been
                // updated that were not involved in any list update.
                for (name, list) in lists.iter_mut() {
                    if let Some(updates) = sliding_sync_response.lists.get(name) {
                        let maximum_number_of_rooms: u32 =
                            updates.count.try_into().expect("failed to convert `count` to `u32`");

                        if list.update(Some(maximum_number_of_rooms))? {
                            updated_lists.push(name.clone());
                        }
                    } else if list.update(None)? {
                        updated_lists.push(name.clone());
                    }
                }

                // Report about unknown lists.
                for name in sliding_sync_response.lists.keys() {
                    if !lists.contains_key(name) {
                        error!("Response for list `{name}` - unknown to us; skipping");
                    }
                }

                updated_lists
            };

            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
        };

        // Everything went well, we can update the position markers.
        //
        // Save the new position markers.
        debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");

        position.pos = pos;

        Ok(update_summary)
    }

    /// Build the next sliding sync request.
    ///
    /// Returns the request itself, the [`RequestConfig`] to send it with
    /// (long-poll timeout plus network margin), and the owned guard on the
    /// `position` mutex — the guard is handed to the caller so the response
    /// handling keeps the position locked until it's fully done.
    #[instrument(skip_all)]
    async fn generate_sync_request(
        &self,
        txn_id: &mut LazyTransactionId,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        // Collect requests for lists.
        let mut requests_lists = BTreeMap::new();

        let require_timeout = {
            let lists = self.inner.lists.read().await;

            // Start at `true` in case there is zero list.
            let mut require_timeout = true;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
                // All lists must require a timeout for it to be set; see
                // `SlidingSyncList::requires_timeout`.
                require_timeout = require_timeout && list.requires_timeout();
            }

            require_timeout
        };

        // Collect the `pos`.
        //
        // Wait on the `position` mutex to be available. It means no request nor
        // response is running. The `position` mutex is released whether the response
        // has been fully handled successfully, in this case the `pos` is updated, or
        // the response handling has failed, in this case the `pos` hasn't been updated
        // and the same `pos` will be used for this new request.
        let mut position_guard = {
            debug!("Waiting to acquire the `position` lock");

            let _timer = timer!("acquiring the `position` lock");

            self.inner.position.clone().lock_owned().await
        };

        debug!(pos = ?position_guard.pos, "Got a position");

        let to_device_enabled =
            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);

        let restored_fields = if self.inner.share_pos || to_device_enabled {
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
        } else {
            None
        };

        // Update pos: either the one restored from the database, if any and the sliding
        // sync was configured so, or read it from the memory cache.
        let pos = if self.inner.share_pos {
            if let Some(fields) = &restored_fields {
                // Override the memory one with the database one, for consistency.
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        Span::current().record("pos", &pos);

        // When the client sends a request with no `pos`, MSC4186 returns no device
        // lists updates, as it only returns changes since the provided `pos`
        // (which is `null` in this case); this is in line with sync v2.
        //
        // Therefore, with MSC4186, the device list cache must be marked as to be
        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
        // device lists updates that happened between the previous request and the new
        // “initial” request.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // Configure the timeout.
        //
        // The `timeout` query is necessary when all lists require it. Please see
        // [`SlidingSyncList::requires_timeout`].
        let timeout = require_timeout.then(|| self.inner.poll_timeout);

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        // Apply sticky parameters, if needs be.
        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);

        // Extensions are now applied (via sticky parameters).
        //
        // Override the to-device token if the extension is enabled.
        if to_device_enabled {
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        // Apply the transaction id if one was generated.
        if let Some(txn_id) = txn_id.get() {
            request.txn_id = Some(txn_id.to_string());
        }

        Ok((
            // The request itself.
            request,
            // Configure long-polling. We need some time for the long-poll itself,
            // and extra time for the network delays.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            position_guard,
        ))
    }

    /// Send a sliding sync request.
    ///
    /// This method contains the sending logic.
    ///
    /// Takes ownership of the `position` guard produced by
    /// [`Self::generate_sync_request`]; the guard is released only once the
    /// response has been fully handled and cached.
    async fn send_sync_request(
        &self,
        request: http::Request,
        request_config: RequestConfig,
        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
    ) -> Result<UpdateSummary> {
        debug!("Sending request");

        // Prepare the request.
        let requested_required_states = RequestedRequiredStates::from(&request);
        let request = self.inner.client.send(request).with_request_config(request_config);

        // Send the request and get a response with end-to-end encryption support.
        //
        // Sending the `/sync` request out when end-to-end encryption is enabled means
        // that we need to also send out any outgoing e2ee related request out
        // coming from the `OlmMachine::outgoing_requests()` method.

        #[cfg(feature = "e2e-encryption")]
        let response = {
            if self.is_e2ee_enabled() {
                // Here, we need to run 2 things:
                //
                // 1. Send the sliding sync request and get a response,
                // 2. Send the E2EE requests.
                //
                // We don't want to use a `join` or `try_join` because we want to fail if and
                // only if sending the sliding sync request fails. Failing to send the E2EE
                // requests should just result in a log.
                //
                // We also want to give the priority to sliding sync request. E2EE requests are
                // sent concurrently to the sliding sync request, but the priority is on waiting
                // a sliding sync response.
                //
                // If sending sliding sync request fails, the sending of E2EE requests must be
                // aborted as soon as possible.

                let client = self.inner.client.clone();
                let e2ee_uploads = spawn(
                    async move {
                        if let Err(error) = client.send_outgoing_requests().await {
                            error!(?error, "Error while sending outgoing E2EE requests");
                        }
                    }
                    .instrument(Span::current()),
                )
                // Ensure that the task is not running in detached mode. It is aborted when it's
                // dropped.
                .abort_on_drop();

                // Wait on the sliding sync request success or failure early.
                let response = request.await?;

                // At this point, if `request` has been resolved successfully, we wait on
                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
                // long. Otherwise —if `request` has failed— `e2ee_uploads` has
                // been dropped, so aborted.
                e2ee_uploads.await.map_err(|error| Error::JoinError {
                    task_description: "e2ee_uploads".to_owned(),
                    error,
                })?;

                response
            } else {
                request.await?
            }
        };

        // Send the request and get a response _without_ end-to-end encryption support.
        #[cfg(not(feature = "e2e-encryption"))]
        let response = request.await?;

        debug!("Received response");

        // At this point, the request has been sent, and a response has been received.
        //
        // We must ensure the handling of the response cannot be stopped/
        // cancelled. It must be done entirely, otherwise we can have
        // corrupted/incomplete states for Sliding Sync and other parts of
        // the code.
        //
        // That's why we are running the handling of the response in a spawned
        // future that cannot be cancelled by anything.
        let this = self.clone();

        // Spawn a new future to ensure that the code inside this future cannot be
        // cancelled if this method is cancelled.
        let future = async move {
            debug!("Start handling response");

            // In case the task running this future is detached, we must
            // ensure responses are handled one at a time. At this point we still own
            // `position_guard`, so we're fine.

            // Handle the response.
            let updates = this
                .handle_response(response, &mut position_guard, requested_required_states)
                .await?;

            this.cache_to_storage(&position_guard).await?;

            // Release the position guard lock.
            // It means that other responses can be generated and then handled later.
            drop(position_guard);

            debug!("Done handling response");

            Ok(updates)
        };

        spawn(future.instrument(Span::current())).await.unwrap()
    }

    /// Is the e2ee extension enabled for this sliding sync instance?
    #[cfg(feature = "e2e-encryption")]
    fn is_e2ee_enabled(&self) -> bool {
        self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
    }

    /// Is the thread subscriptions extension enabled for this sliding sync
    /// instance?
    fn is_thread_subscriptions_enabled(&self) -> bool {
        self.inner.sticky.read().unwrap().data().extensions.thread_subscriptions.enabled
            == Some(true)
    }

    /// Non-e2ee builds never have the e2ee extension enabled.
    #[cfg(not(feature = "e2e-encryption"))]
    fn is_e2ee_enabled(&self) -> bool {
        false
    }

    /// Should we process the room's subpart of a response?
    async fn must_process_rooms_response(&self) -> bool {
        // We consider that we must, if there's any room subscription or there's any
        // list.
        !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
            || !self.inner.lists.read().await.is_empty()
    }

    /// Send a single sliding sync request, and returns the response summary.
    ///
    /// Public for testing purposes only.
    #[doc(hidden)]
    #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
    pub async fn sync_once(&self) -> Result<UpdateSummary> {
        let (request, request_config, position_guard) =
            self.generate_sync_request(&mut LazyTransactionId::new()).await?;

        // Send the request.
        let summaries = self.send_sync_request(request, request_config, position_guard).await?;

        // Notify a new sync was received.
        self.inner.client.inner.sync_beat.notify(usize::MAX);

        Ok(summaries)
    }

    /// Create a _new_ Sliding Sync sync loop.
    ///
    /// This method returns a `Stream`, which will send requests and will handle
    /// responses automatically. Lists and rooms are updated automatically.
    ///
    /// This function returns `Ok(…)` if everything went well, otherwise it will
    /// return `Err(…)`. An `Err` will _always_ lead to the `Stream`
    /// termination.
    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                // `biased` so that internal messages (stop / skip) always win
                // over an in-flight `sync_once`.
                select! {
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            // Here, errors we **cannot** ignore, and that must stop the sync loop.
                            Err(error) => {
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // Terminates the loop, and terminates the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }

    /// Force to stop the sync loop ([`Self::sync`]) if it's running.
    ///
    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
    /// enough to “stop” it, but depending of how this `Stream` is used, it
    /// might not be obvious to drop it immediately (thinking of using this API
    /// over FFI; the foreign-language might not be able to drop a value
    /// immediately). Thus, calling this method will ensure that the sync loop
    /// stops gracefully and as soon as it returns.
    pub fn stop_sync(&self) -> Result<()> {
        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
    }

    /// Expire the current Sliding Sync session on the client-side.
    ///
    /// Expiring a Sliding Sync session means: resetting `pos`. It also resets
    /// sticky parameters.
    ///
    /// This should only be used when it's clear that this session was about to
    /// expire anyways, and should be used only in very specific cases (e.g.
    /// multiple sliding syncs being run in parallel, and one of them has
    /// expired).
    ///
    /// This method **MUST** be called when the sync loop is stopped.
    #[doc(hidden)]
    pub async fn expire_session(&self) {
        info!("Session expired; resetting `pos` and sticky parameters");

        {
            let lists = self.inner.lists.read().await;
            for list in lists.values() {
                // Invalidate in-memory data that would be persisted on disk.
                list.set_maximum_number_of_rooms(None);

                // Invalidate the sticky data for this list.
                list.invalidate_sticky_data();
            }
        }

        // Remove the cached sliding sync state as well.
        {
            let mut position = self.inner.position.lock().await;

            // Invalidate in memory.
            position.pos = None;

            // Propagate to disk.
            // Note: this propagates both the sliding sync state and the cached lists'
            // state to disk.
            if let Err(err) = self.cache_to_storage(&position).await {
                warn!("Failed to invalidate cached sliding sync state: {err}");
            }
        }

        {
            let mut sticky = self.inner.sticky.write().unwrap();

            // Clear all room subscriptions: we don't want to resend all room subscriptions
            // when the session will restart.
            sticky.data_mut().room_subscriptions.clear();
        }
    }
}
807
808impl SlidingSyncInner {
809    /// Send a message over the internal channel.
810    #[instrument]
811    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
812        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
813    }
814
815    /// Send a message over the internal channel if there is a receiver, i.e. if
816    /// the sync loop is running.
817    #[instrument]
818    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
819        // If there is no receiver, the send will fail, but that's OK here.
820        let _ = self.internal_channel.send(message);
821    }
822}
823
/// Control messages sent over the internal channel to steer the sync loop
/// (e.g. `stop_sync` sends `SyncLoopStop`).
#[derive(Copy, Clone, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// Instruct the sync loop to stop.
    SyncLoopStop,

    /// Instruct the sync loop to skip over any remaining work in its iteration,
    /// and to jump to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}
833
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Set a new value for `pos`.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// Read the static extension configuration for this Sliding Sync.
    ///
    /// Note: this is not the next content of the sticky parameters, but rather
    /// the static configuration that was set during creation of this
    /// Sliding Sync.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
852
/// Holder for the ephemeral `pos` marker of a Sliding Sync session.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// An ephemeral position in the current stream, as received from the
    /// previous `/sync` response, or `None` for the first request.
    pos: Option<String>,
}
859
/// Serializable (“frozen”) counterpart of [`SlidingSyncPositionMarkers`],
/// presumably used when caching the sliding sync state to storage (see the
/// `cache` module) — confirm against `cache::restore_sliding_sync_state`.
#[derive(Serialize, Deserialize)]
struct FrozenSlidingSyncPos {
    /// The last received `pos` token; omitted from the serialized form when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pos: Option<String>,
}
865
/// A summary of the updates received after a sync (like in
/// [`SlidingSync::sync`]).
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// The names of the lists that have seen an update.
    pub lists: Vec<String>,
    /// The rooms that have seen updates.
    pub rooms: Vec<OwnedRoomId>,
}
875
/// A very basic bool-ish enum to represent the state of a
/// [`http::request::RoomSubscription`]. A `RoomSubscription` that has been sent
/// once should ideally not be sent again, mostly to save bandwidth.
#[derive(Debug, Default)]
enum RoomSubscriptionState {
    /// The `RoomSubscription` has not been sent or received correctly from the
    /// server, i.e. the `RoomSubscription` —which is part of the sticky
    /// parameters— has not been committed.
    #[default]
    Pending,

    /// The `RoomSubscription` has been sent and received correctly by the
    /// server.
    Applied,
}
891
/// The set of sticky parameters owned by the `SlidingSyncInner` instance, and
/// sent in the request.
#[derive(Debug)]
pub(super) struct SlidingSyncStickyParameters {
    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
    /// but one wants to receive updates.
    ///
    /// Each subscription is tagged with a [`RoomSubscriptionState`], so that
    /// already-applied subscriptions are not resent (see `StickyData::apply`).
    room_subscriptions:
        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,

    /// The intended state of the extensions being supplied to sliding `/sync`
    /// calls.
    extensions: http::request::Extensions,
}
905
906impl SlidingSyncStickyParameters {
907    /// Create a new set of sticky parameters.
908    pub fn new(
909        room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
910        extensions: http::request::Extensions,
911    ) -> Self {
912        Self {
913            room_subscriptions: room_subscriptions
914                .into_iter()
915                .map(|(room_id, room_subscription)| {
916                    (room_id, (RoomSubscriptionState::Pending, room_subscription))
917                })
918                .collect(),
919            extensions,
920        }
921    }
922}
923
924impl StickyData for SlidingSyncStickyParameters {
925    type Request = http::Request;
926
927    fn apply(&self, request: &mut Self::Request) {
928        request.room_subscriptions = self
929            .room_subscriptions
930            .iter()
931            .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
932            .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
933            .collect();
934        request.extensions = self.extensions.clone();
935    }
936
937    fn on_commit(&mut self) {
938        // All room subscriptions are marked as `Applied`.
939        for (state, _room_subscription) in self.room_subscriptions.values_mut() {
940            if matches!(state, RoomSubscriptionState::Pending) {
941                *state = RoomSubscriptionState::Applied;
942            }
943        }
944    }
945}
946
947#[cfg(all(test, not(target_family = "wasm")))]
948#[allow(clippy::dbg_macro)]
949mod tests {
950    use std::{
951        collections::BTreeMap,
952        future::ready,
953        ops::Not,
954        sync::{Arc, Mutex},
955        time::Duration,
956    };
957
958    use assert_matches::assert_matches;
959    use event_listener::Listener;
960    use futures_util::{future::join_all, pin_mut, StreamExt};
961    use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
962    use matrix_sdk_common::executor::spawn;
963    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE};
964    use ruma::{
965        api::client::error::ErrorKind,
966        assign,
967        events::{direct::DirectEvent, room::member::MembershipState},
968        owned_room_id, room_id,
969        serde::Raw,
970        uint, OwnedRoomId, TransactionId,
971    };
972    use serde::Deserialize;
973    use serde_json::json;
974    use wiremock::{
975        http::Method, matchers::method, Match, Mock, MockServer, Request, ResponseTemplate,
976    };
977
978    use super::{
979        http,
980        sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
981        SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
982        SlidingSyncStickyParameters,
983    };
984    use crate::{
985        sliding_sync::cache::restore_sliding_sync_state,
986        test_utils::{logged_in_client, mocks::MatrixMockServer},
987        Client, Result,
988    };
989
990    #[derive(Copy, Clone)]
991    struct SlidingSyncMatcher;
992
993    impl Match for SlidingSyncMatcher {
994        fn matches(&self, request: &Request) -> bool {
995            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
996                && request.method == Method::POST
997        }
998    }
999
1000    async fn new_sliding_sync(
1001        lists: Vec<SlidingSyncListBuilder>,
1002    ) -> Result<(MockServer, SlidingSync)> {
1003        let server = MockServer::start().await;
1004        let client = logged_in_client(Some(server.uri())).await;
1005
1006        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1007
1008        for list in lists {
1009            sliding_sync_builder = sliding_sync_builder.add_list(list);
1010        }
1011
1012        let sliding_sync = sliding_sync_builder.build().await?;
1013
1014        Ok((server, sliding_sync))
1015    }
1016
1017    #[async_test]
1018    async fn test_subscribe_to_rooms() -> Result<()> {
1019        let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1020            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1021        .await?;
1022
1023        let stream = sliding_sync.sync();
1024        pin_mut!(stream);
1025
1026        let room_id_0 = room_id!("!r0:bar.org");
1027        let room_id_1 = room_id!("!r1:bar.org");
1028        let room_id_2 = room_id!("!r2:bar.org");
1029
1030        {
1031            let _mock_guard = Mock::given(SlidingSyncMatcher)
1032                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1033                    "pos": "1",
1034                    "lists": {},
1035                    "rooms": {
1036                        room_id_0: {
1037                            "name": "Room #0",
1038                            "initial": true,
1039                        },
1040                        room_id_1: {
1041                            "name": "Room #1",
1042                            "initial": true,
1043                        },
1044                        room_id_2: {
1045                            "name": "Room #2",
1046                            "initial": true,
1047                        },
1048                    }
1049                })))
1050                .mount_as_scoped(&server)
1051                .await;
1052
1053            let _ = stream.next().await.unwrap()?;
1054        }
1055
1056        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1057
1058        // Members aren't synced.
1059        // We need to make them synced, so that we can test that subscribing to a room
1060        // make members not synced. That's a desired feature.
1061        assert!(room0.are_members_synced().not());
1062
1063        {
1064            struct MemberMatcher(OwnedRoomId);
1065
1066            impl Match for MemberMatcher {
1067                fn matches(&self, request: &Request) -> bool {
1068                    request.url.path()
1069                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1070                        && request.method == Method::GET
1071                }
1072            }
1073
1074            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1075                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1076                    "chunk": [],
1077                })))
1078                .mount_as_scoped(&server)
1079                .await;
1080
1081            assert_matches!(room0.request_members().await, Ok(()));
1082        }
1083
1084        // Members are now synced! We can start subscribing and see how it goes.
1085        assert!(room0.are_members_synced());
1086
1087        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1088
1089        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
1090        // now marked as not synced.
1091        assert!(room0.are_members_synced().not());
1092
1093        {
1094            let sticky = sliding_sync.inner.sticky.read().unwrap();
1095            let room_subscriptions = &sticky.data().room_subscriptions;
1096
1097            assert!(room_subscriptions.contains_key(room_id_0));
1098            assert!(room_subscriptions.contains_key(room_id_1));
1099            assert!(!room_subscriptions.contains_key(room_id_2));
1100        }
1101
1102        // Subscribing to the same room doesn't reset the member sync state.
1103
1104        {
1105            struct MemberMatcher(OwnedRoomId);
1106
1107            impl Match for MemberMatcher {
1108                fn matches(&self, request: &Request) -> bool {
1109                    request.url.path()
1110                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1111                        && request.method == Method::GET
1112                }
1113            }
1114
1115            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1116                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1117                    "chunk": [],
1118                })))
1119                .mount_as_scoped(&server)
1120                .await;
1121
1122            assert_matches!(room0.request_members().await, Ok(()));
1123        }
1124
1125        // Members are synced, good, good.
1126        assert!(room0.are_members_synced());
1127
1128        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1129
1130        // Members are still synced: because we have already subscribed to the
1131        // room, the members aren't marked as unsynced.
1132        assert!(room0.are_members_synced());
1133
1134        Ok(())
1135    }
1136
1137    #[async_test]
1138    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1139        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1140            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1141        .await?;
1142
1143        let room_id_0 = room_id!("!r0:bar.org");
1144        let room_id_1 = room_id!("!r1:bar.org");
1145        let room_id_2 = room_id!("!r2:bar.org");
1146
1147        // Subscribe to two rooms.
1148        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1149
1150        {
1151            let sticky = sliding_sync.inner.sticky.read().unwrap();
1152            let room_subscriptions = &sticky.data().room_subscriptions;
1153
1154            assert!(room_subscriptions.contains_key(room_id_0));
1155            assert!(room_subscriptions.contains_key(room_id_1));
1156            assert!(room_subscriptions.contains_key(room_id_2).not());
1157        }
1158
1159        // Subscribe to one more room.
1160        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1161
1162        {
1163            let sticky = sliding_sync.inner.sticky.read().unwrap();
1164            let room_subscriptions = &sticky.data().room_subscriptions;
1165
1166            assert!(room_subscriptions.contains_key(room_id_0));
1167            assert!(room_subscriptions.contains_key(room_id_1));
1168            assert!(room_subscriptions.contains_key(room_id_2));
1169        }
1170
1171        // Suddenly, the session expires!
1172        sliding_sync.expire_session().await;
1173
1174        {
1175            let sticky = sliding_sync.inner.sticky.read().unwrap();
1176            let room_subscriptions = &sticky.data().room_subscriptions;
1177
1178            assert!(room_subscriptions.is_empty());
1179        }
1180
1181        // Subscribe to one room again.
1182        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1183
1184        {
1185            let sticky = sliding_sync.inner.sticky.read().unwrap();
1186            let room_subscriptions = &sticky.data().room_subscriptions;
1187
1188            assert!(room_subscriptions.contains_key(room_id_0).not());
1189            assert!(room_subscriptions.contains_key(room_id_1).not());
1190            assert!(room_subscriptions.contains_key(room_id_2));
1191        }
1192
1193        Ok(())
1194    }
1195
1196    #[async_test]
1197    async fn test_add_list() -> Result<()> {
1198        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1199            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1200        .await?;
1201
1202        let _stream = sliding_sync.sync();
1203        pin_mut!(_stream);
1204
1205        sliding_sync
1206            .add_list(
1207                SlidingSyncList::builder("bar")
1208                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1209            )
1210            .await?;
1211
1212        let lists = sliding_sync.inner.lists.read().await;
1213
1214        assert!(lists.contains_key("foo"));
1215        assert!(lists.contains_key("bar"));
1216
1217        // this test also ensures that Tokio is not panicking when calling `add_list`.
1218
1219        Ok(())
1220    }
1221
1222    #[test]
1223    fn test_sticky_parameters_api_invalidated_flow() {
1224        let r0 = room_id!("!r0.matrix.org");
1225        let r1 = room_id!("!r1:matrix.org");
1226
1227        let mut room_subscriptions = BTreeMap::new();
1228        room_subscriptions.insert(r0.to_owned(), Default::default());
1229
1230        // At first it's invalidated.
1231        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1232            room_subscriptions,
1233            Default::default(),
1234        ));
1235        assert!(sticky.is_invalidated());
1236
1237        // Then when we create a request, the sticky parameters are applied.
1238        let txn_id: &TransactionId = "tid123".into();
1239
1240        let mut request = http::Request::default();
1241        request.txn_id = Some(txn_id.to_string());
1242
1243        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1244
1245        assert!(request.txn_id.is_some());
1246        assert_eq!(request.room_subscriptions.len(), 1);
1247        assert!(request.room_subscriptions.contains_key(r0));
1248
1249        let tid = request.txn_id.unwrap();
1250
1251        sticky.maybe_commit(tid.as_str().into());
1252        assert!(!sticky.is_invalidated());
1253
1254        // Applying new parameters will invalidate again.
1255        sticky
1256            .data_mut()
1257            .room_subscriptions
1258            .insert(r1.to_owned(), (Default::default(), Default::default()));
1259        assert!(sticky.is_invalidated());
1260
1261        // Committing with the wrong transaction id will keep it invalidated.
1262        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1263        assert!(sticky.is_invalidated());
1264
1265        // Restarting a request will only remember the last generated transaction id.
1266        let txn_id1: &TransactionId = "tid456".into();
1267        let mut request1 = http::Request::default();
1268        request1.txn_id = Some(txn_id1.to_string());
1269        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1270
1271        assert!(sticky.is_invalidated());
1272        // The first room subscription has been applied to `request`, so it's not
1273        // reapplied here. It's a particular logic of `room_subscriptions`, it's not
1274        // related to the sticky design.
1275        assert_eq!(request1.room_subscriptions.len(), 1);
1276        assert!(request1.room_subscriptions.contains_key(r1));
1277
1278        let txn_id2: &TransactionId = "tid789".into();
1279        let mut request2 = http::Request::default();
1280        request2.txn_id = Some(txn_id2.to_string());
1281
1282        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1283        assert!(sticky.is_invalidated());
1284        // `request2` contains `r1` because the sticky parameters have not been
1285        // committed, so it's still marked as pending.
1286        assert_eq!(request2.room_subscriptions.len(), 1);
1287        assert!(request2.room_subscriptions.contains_key(r1));
1288
1289        // Here we commit with the not most-recent TID, so it keeps the invalidated
1290        // status.
1291        sticky.maybe_commit(txn_id1);
1292        assert!(sticky.is_invalidated());
1293
1294        // But here we use the latest TID, so the commit is effective.
1295        sticky.maybe_commit(txn_id2);
1296        assert!(!sticky.is_invalidated());
1297    }
1298
1299    #[test]
1300    fn test_room_subscriptions_are_sticky() {
1301        let r0 = room_id!("!r0.matrix.org");
1302        let r1 = room_id!("!r1:matrix.org");
1303
1304        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1305            BTreeMap::new(),
1306            Default::default(),
1307        ));
1308
1309        // A room subscription is added, applied, and committed.
1310        {
1311            // Insert `r0`.
1312            sticky
1313                .data_mut()
1314                .room_subscriptions
1315                .insert(r0.to_owned(), (Default::default(), Default::default()));
1316
1317            // Then the sticky parameters are applied.
1318            let txn_id: &TransactionId = "tid0".into();
1319            let mut request = http::Request::default();
1320            request.txn_id = Some(txn_id.to_string());
1321
1322            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1323
1324            assert!(request.txn_id.is_some());
1325            assert_eq!(request.room_subscriptions.len(), 1);
1326            assert!(request.room_subscriptions.contains_key(r0));
1327
1328            // Then the sticky parameters are committed.
1329            let tid = request.txn_id.unwrap();
1330
1331            sticky.maybe_commit(tid.as_str().into());
1332        }
1333
1334        // A room subscription is added, applied, but NOT committed.
1335        {
1336            // Insert `r1`.
1337            sticky
1338                .data_mut()
1339                .room_subscriptions
1340                .insert(r1.to_owned(), (Default::default(), Default::default()));
1341
1342            // Then the sticky parameters are applied.
1343            let txn_id: &TransactionId = "tid1".into();
1344            let mut request = http::Request::default();
1345            request.txn_id = Some(txn_id.to_string());
1346
1347            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1348
1349            assert!(request.txn_id.is_some());
1350            assert_eq!(request.room_subscriptions.len(), 1);
1351            // `r0` is not present, it's only `r1`.
1352            assert!(request.room_subscriptions.contains_key(r1));
1353
1354            // Then the sticky parameters are NOT committed.
1355            // It can happen if the request has failed to be sent for example,
1356            // or if the response didn't match.
1357        }
1358
1359        // A previously added room subscription is re-added, applied, and committed.
1360        {
1361            // Then the sticky parameters are applied.
1362            let txn_id: &TransactionId = "tid2".into();
1363            let mut request = http::Request::default();
1364            request.txn_id = Some(txn_id.to_string());
1365
1366            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1367
1368            assert!(request.txn_id.is_some());
1369            assert_eq!(request.room_subscriptions.len(), 1);
1370            // `r0` is not present, it's only `r1`.
1371            assert!(request.room_subscriptions.contains_key(r1));
1372
1373            // Then the sticky parameters are committed.
1374            let tid = request.txn_id.unwrap();
1375
1376            sticky.maybe_commit(tid.as_str().into());
1377        }
1378
1379        // All room subscriptions have been committed.
1380        {
1381            // Then the sticky parameters are applied.
1382            let txn_id: &TransactionId = "tid3".into();
1383            let mut request = http::Request::default();
1384            request.txn_id = Some(txn_id.to_string());
1385
1386            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1387
1388            assert!(request.txn_id.is_some());
1389            // All room subscriptions have been sent.
1390            assert!(request.room_subscriptions.is_empty());
1391        }
1392    }
1393
1394    #[test]
1395    fn test_extensions_are_sticky() {
1396        let mut extensions = http::request::Extensions::default();
1397        extensions.account_data.enabled = Some(true);
1398
1399        // At first it's invalidated.
1400        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1401            Default::default(),
1402            extensions,
1403        ));
1404
1405        assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1406
1407        // `StickyParameters::new` follows its caller's intent when it comes to e2ee and
1408        // to-device.
1409        let extensions = &sticky.data().extensions;
1410        assert_eq!(extensions.e2ee.enabled, None);
1411        assert_eq!(extensions.to_device.enabled, None);
1412        assert_eq!(extensions.to_device.since, None);
1413
1414        // What the user explicitly enabled is… enabled.
1415        assert_eq!(extensions.account_data.enabled, Some(true));
1416
1417        let txn_id: &TransactionId = "tid123".into();
1418        let mut request = http::Request::default();
1419        request.txn_id = Some(txn_id.to_string());
1420        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1421        assert!(sticky.is_invalidated());
1422        assert_eq!(request.extensions.to_device.enabled, None);
1423        assert_eq!(request.extensions.to_device.since, None);
1424        assert_eq!(request.extensions.e2ee.enabled, None);
1425        assert_eq!(request.extensions.account_data.enabled, Some(true));
1426    }
1427
    #[async_test]
    async fn test_sticky_extensions_plus_since() -> Result<()> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .build()
            .await?;

        // No extensions have been explicitly enabled here.
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);

        // Now enable e2ee and to-device.
        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        // Even without a since token, the first request will contain the extensions
        // configuration, at least.
        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;

        assert_eq!(request.extensions.e2ee.enabled, Some(true));
        assert_eq!(request.extensions.to_device.enabled, Some(true));
        assert!(request.extensions.to_device.since.is_none());

        {
            // Committing with another transaction id doesn't validate anything.
            let mut sticky = sync.inner.sticky.write().unwrap();
            assert!(sticky.is_invalidated());
            sticky.maybe_commit(
                "hopefully the rng won't generate this very specific transaction id".into(),
            );
            assert!(sticky.is_invalidated());
        }

        // Regenerating a request will yield the same one.
        let txn_id2 = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
            .await?;

        assert_eq!(request.extensions.e2ee.enabled, Some(true));
        assert_eq!(request.extensions.to_device.enabled, Some(true));
        assert!(request.extensions.to_device.since.is_none());

        assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");

        {
            // Committing with the expected transaction id will validate it.
            let mut sticky = sync.inner.sticky.write().unwrap();
            assert!(sticky.is_invalidated());
            sticky.maybe_commit(txn_id2.as_str().into());
            assert!(!sticky.is_invalidated());
        }

        // The next request should contain no sticky parameters.
        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;
        assert!(request.extensions.e2ee.enabled.is_none());
        assert!(request.extensions.to_device.enabled.is_none());
        assert!(request.extensions.to_device.since.is_none());

        // If there's a to-device `since` token, we make sure we put the token
        // into the extension config. The rest doesn't need to be re-enabled due to
        // stickiness.
        // (`_since_token` is underscore-prefixed because it's only read when the
        // `e2e-encryption` feature is enabled.)
        let _since_token = "since";

        #[cfg(feature = "e2e-encryption")]
        {
            use matrix_sdk_base::crypto::store::types::Changes;
            // Store the token as the crypto store's next-batch token, which is
            // where the to-device `since` value is read back from.
            if let Some(olm_machine) = &*client.olm_machine().await {
                olm_machine
                    .store()
                    .save_changes(Changes {
                        next_batch_token: Some(_since_token.to_owned()),
                        ..Default::default()
                    })
                    .await?;
            }
        }

        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;

        // Extensions stay disabled in the request (sticky, already committed)…
        assert!(request.extensions.e2ee.enabled.is_none());
        assert!(request.extensions.to_device.enabled.is_none());

        // …but the `since` token is picked up from the crypto store.
        #[cfg(feature = "e2e-encryption")]
        assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));

        Ok(())
    }
1537
    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`
    // (i.e. it starts a brand new sliding sync session), all the users tracked
    // by the `OlmMachine` must be marked as dirty, i.e. `/key/query` requests
    // must be sent for them. See the sync request generation code for the
    // details.
    //
    // This test is asserting that.
    #[async_test]
    #[cfg(feature = "e2e-encryption")]
    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
        use matrix_sdk_test::ruma_response_from_json;
        use ruma::user_id;

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let alice = user_id!("@alice:localhost");
        let bob = user_id!("@bob:localhost");
        let me = user_id!("@example:localhost");

        // Track users and mark them as not dirty, so that we can check they are
        // “dirty” after that. Dirty here means that a `/key/query` must be sent.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            olm_machine.update_tracked_users([alice, bob]).await?;

            // Assert requests: tracking new users schedules a keys upload and a
            // keys query.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 2);
            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));

            // Fake responses, so the `OlmMachine` considers these requests as
            // answered.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
                        "one_time_key_counts": {}
                    }))),
                )
                .await?;

            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[1].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            alice: {},
                            bob: {},
                        }
                    }))),
                )
                .await?;

            // Once more, for our own user this time.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 1);
            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));

            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            me: {},
                        }
                    }))),
                )
                .await?;

            // No more pending requests: nobody is dirty at this point.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert!(outgoing_requests.is_empty());
        }

        // A sliding sync with the `e2ee` extension enabled.
        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        // First request: no `pos`.
        let txn_id = TransactionId::new();
        let (_request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;

        // Now, tracked users must be dirty.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            // Assert requests: a single keys query covering every tracked user.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 1);
            assert_matches!(
                outgoing_requests[0].request(),
                AnyOutgoingRequest::KeysQuery(request) => {
                    assert!(request.device_keys.contains_key(alice));
                    assert!(request.device_keys.contains_key(bob));
                    assert!(request.device_keys.contains_key(me));
                }
            );

            // Fake responses, to clear the dirty state again.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            alice: {},
                            bob: {},
                            me: {},
                        }
                    }))),
                )
                .await?;
        }

        // Second request: with a `pos` this time.
        sync.set_pos("chocolat".to_owned()).await;

        let txn_id = TransactionId::new();
        let (_request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;

        // Tracked users are not marked as dirty: no new `/key/query` request is
        // scheduled.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            // Assert requests.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert!(outgoing_requests.is_empty());
        }

        Ok(())
    }
1684
    #[async_test]
    async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        // A sliding sync with the to-device extension enabled: its `enabled`
        // flag is a sticky parameter, re-sent until the server acknowledges it.
        let sliding_sync = client
            .sliding_sync("test-slidingsync")?
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
            )
            .build()
            .await?;

        // First request asks to enable the extension.
        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.extensions.to_device.enabled.is_some());

        let sync = sliding_sync.sync();
        pin_mut!(sync);

        // `pos` is `None` to start with.
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        // Minimal view of a sync request body: only `txn_id` matters here; the
        // mocks echo it back so the sticky parameters get committed.
        #[derive(Deserialize)]
        struct PartialRequest {
            txn_id: Option<String>,
        }

        {
            // The guard keeps the mock mounted for this scope only.
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(|request: &Request| {
                    // Repeat the txn_id in the response, if set.
                    let request: PartialRequest = request.body_json().unwrap();

                    ResponseTemplate::new(200).set_body_json(json!({
                        "txn_id": request.txn_id,
                        "pos": "0",
                    }))
                })
                .mount_as_scoped(&server)
                .await;

            let next = sync.next().await;
            assert_matches!(next, Some(Ok(_update_summary)));

            // `pos` has been updated.
            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
        }

        // Next request doesn't ask to enable the extension: the sticky
        // parameters were committed when the server echoed our `txn_id`.
        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.extensions.to_device.enabled.is_none());

        // Next request is successful.
        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(|request: &Request| {
                    // Repeat the txn_id in the response, if set.
                    let request: PartialRequest = request.body_json().unwrap();

                    ResponseTemplate::new(200).set_body_json(json!({
                        "txn_id": request.txn_id,
                        "pos": "1",
                    }))
                })
                .mount_as_scoped(&server)
                .await;

            let next = sync.next().await;
            assert_matches!(next, Some(Ok(_update_summary)));

            // `pos` has been updated.
            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
        }

        // Next request is successful despite it receives an already
        // received `pos` from the server.
        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(|request: &Request| {
                    // Repeat the txn_id in the response, if set.
                    let request: PartialRequest = request.body_json().unwrap();

                    ResponseTemplate::new(200).set_body_json(json!({
                        "txn_id": request.txn_id,
                        "pos": "0", // <- already received!
                    }))
                })
                .up_to_n_times(1) // run this mock only once.
                .mount_as_scoped(&server)
                .await;

            let next = sync.next().await;
            assert_matches!(next, Some(Ok(_update_summary)));

            // `pos` has been updated.
            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
        }

        // Stop responding with successful requests!
        //
        // When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
        // so they're reset. It also resets the `pos`.
        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(ResponseTemplate::new(400).set_body_json(json!({
                    "error": "foo",
                    "errcode": "M_UNKNOWN_POS",
                })))
                .mount_as_scoped(&server)
                .await;

            let next = sync.next().await;

            // The expected error is returned.
            assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));

            // `pos` has been reset.
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());

            // Next request asks to enable the extension again.
            let (request, _, _) =
                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;

            assert!(request.extensions.to_device.enabled.is_some());

            // `sync` has been stopped.
            assert!(sync.next().await.is_none());
        }

        Ok(())
    }
1819
    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
        let server = MockServer::start().await;

        // Minimal view of a sync request body: only `txn_id` matters here.
        #[derive(Deserialize)]
        struct PartialRequest {
            txn_id: Option<String>,
        }

        // The mock server returns an incrementing `pos` on each request.
        let server_pos = Arc::new(Mutex::new(0));
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(move |request: &Request| {
                // Repeat the txn_id in the response, if set.
                let request: PartialRequest = request.body_json().unwrap();
                let pos = {
                    let mut pos = server_pos.lock().unwrap();
                    let prev = *pos;
                    *pos += 1;
                    prev
                };

                ResponseTemplate::new(200).set_body_json(json!({
                    "txn_id": request.txn_id,
                    "pos": pos.to_string(),
                }))
            })
            .mount_as_scoped(&server)
            .await;

        let client = logged_in_client(Some(server.uri())).await;

        // Note: `share_pos()` is NOT called here, so this instance must not
        // reload the `pos` from the database.
        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;

        // `pos` is `None` to start with.
        {
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());

            let (request, _, _) =
                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
            assert!(request.pos.is_none());
        }

        let sync = sliding_sync.sync();
        pin_mut!(sync);

        // Sync goes well, and then the position is saved both into the internal memory
        // and the database.
        let next = sync.next().await;
        assert_matches!(next, Some(Ok(_update_summary)));

        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
            .await?
            .expect("must have restored fields");

        // While it has been saved into the database, it's not necessarily going to be
        // used later!
        assert_eq!(restored_fields.pos.as_deref(), Some("0"));

        // Now, even if we mess with the position stored in the database, the sliding
        // sync instance isn't configured to reload the stream position from the
        // database, so it won't be changed.
        {
            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;

            let mut position_guard = other_sync.inner.position.lock().await;
            position_guard.pos = Some("yolo".to_owned());

            other_sync.cache_to_storage(&position_guard).await?;
        }

        // It's still 0, not "yolo".
        {
            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
            let (request, _, _) =
                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
            assert_eq!(request.pos.as_deref(), Some("0"));
        }

        // Recreating a sliding sync with the same ID doesn't preload the pos, if not
        // asked to.
        {
            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
        }

        Ok(())
    }
1910
    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
        let server = MockServer::start().await;

        // Minimal view of a sync request body: only `txn_id` matters here.
        #[derive(Deserialize)]
        struct PartialRequest {
            txn_id: Option<String>,
        }

        // The mock server returns an incrementing `pos` on each request.
        let server_pos = Arc::new(Mutex::new(0));
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(move |request: &Request| {
                // Repeat the txn_id in the response, if set.
                let request: PartialRequest = request.body_json().unwrap();
                let pos = {
                    let mut pos = server_pos.lock().unwrap();
                    let prev = *pos;
                    *pos += 1;
                    prev
                };

                ResponseTemplate::new(200).set_body_json(json!({
                    "txn_id": request.txn_id,
                    "pos": pos.to_string(),
                }))
            })
            .mount_as_scoped(&server)
            .await;

        let client = logged_in_client(Some(server.uri())).await;

        // Contrary to `test_sliding_sync_doesnt_remember_pos`, `share_pos()` is
        // called: the `pos` is stored in and reloaded from the database.
        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;

        // `pos` is `None` to start with.
        {
            let (request, _, _) =
                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;

            assert!(request.pos.is_none());
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
        }

        let sync = sliding_sync.sync();
        pin_mut!(sync);

        // Sync goes well, and then the position is saved both into the internal memory
        // and the database.
        let next = sync.next().await;
        assert_matches!(next, Some(Ok(_update_summary)));

        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));

        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
            .await?
            .expect("must have restored fields");

        // While it has been saved into the database, it's not necessarily going to be
        // used later!
        assert_eq!(restored_fields.pos.as_deref(), Some("0"));

        // Another process modifies the stream position under our feet...
        {
            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;

            let mut position_guard = other_sync.inner.position.lock().await;
            position_guard.pos = Some("42".to_owned());

            other_sync.cache_to_storage(&position_guard).await?;
        }

        // It's alright, the next request will load it from the database.
        {
            let (request, _, _) =
                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
            assert_eq!(request.pos.as_deref(), Some("42"));
            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
        }

        // Recreating a sliding sync with the same ID will reload it too.
        {
            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));

            let (request, _, _) =
                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
            assert_eq!(request.pos.as_deref(), Some("42"));
        }

        // Invalidating the session will remove the in-memory value AND the database
        // value.
        sliding_sync.expire_session().await;

        {
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());

            let (request, _, _) =
                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
            assert!(request.pos.is_none());
        }

        // And new sliding syncs with the same ID won't find it either.
        {
            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());

            let (request, _, _) =
                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
            assert!(request.pos.is_none());
        }

        Ok(())
    }
2024
2025    #[async_test]
2026    async fn test_stop_sync_loop() -> Result<()> {
2027        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2028            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2029        .await?;
2030
2031        // Start the sync loop.
2032        let stream = sliding_sync.sync();
2033        pin_mut!(stream);
2034
2035        // The sync loop is actually running.
2036        assert!(stream.next().await.is_some());
2037
2038        // Stop the sync loop.
2039        sliding_sync.stop_sync()?;
2040
2041        // The sync loop is actually stopped.
2042        assert!(stream.next().await.is_none());
2043
2044        // Start a new sync loop.
2045        let stream = sliding_sync.sync();
2046        pin_mut!(stream);
2047
2048        // The sync loop is actually running.
2049        assert!(stream.next().await.is_some());
2050
2051        Ok(())
2052    }
2053
2054    #[async_test]
2055    async fn test_process_read_receipts() -> Result<()> {
2056        let room = owned_room_id!("!pony:example.org");
2057
2058        let server = MockServer::start().await;
2059        let client = logged_in_client(Some(server.uri())).await;
2060        client.event_cache().subscribe().unwrap();
2061
2062        let sliding_sync = client
2063            .sliding_sync("test")?
2064            .with_receipt_extension(
2065                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2066            )
2067            .add_list(
2068                SlidingSyncList::builder("all")
2069                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2070            )
2071            .build()
2072            .await?;
2073
2074        // Initial state.
2075        {
2076            let server_response = assign!(http::Response::new("0".to_owned()), {
2077                rooms: BTreeMap::from([(
2078                    room.clone(),
2079                    http::response::Room::default(),
2080                )])
2081            });
2082
2083            let _summary = {
2084                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2085                sliding_sync
2086                    .handle_response(
2087                        server_response.clone(),
2088                        &mut pos_guard,
2089                        RequestedRequiredStates::default(),
2090                    )
2091                    .await?
2092            };
2093        }
2094
2095        let server_response = assign!(http::Response::new("1".to_owned()), {
2096            extensions: assign!(http::response::Extensions::default(), {
2097                receipts: assign!(http::response::Receipts::default(), {
2098                    rooms: BTreeMap::from([
2099                        (
2100                            room.clone(),
2101                            Raw::from_json_string(
2102                                json!({
2103                                    "room_id": room,
2104                                    "type": "m.receipt",
2105                                    "content": {
2106                                        "$event:bar.org": {
2107                                            "m.read": {
2108                                                client.user_id().unwrap(): {
2109                                                    "ts": 1436451550,
2110                                                }
2111                                            }
2112                                        }
2113                                    }
2114                                })
2115                                .to_string(),
2116                            ).unwrap()
2117                        )
2118                    ])
2119                })
2120            })
2121        });
2122
2123        let summary = {
2124            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2125            sliding_sync
2126                .handle_response(
2127                    server_response.clone(),
2128                    &mut pos_guard,
2129                    RequestedRequiredStates::default(),
2130                )
2131                .await?
2132        };
2133
2134        assert!(summary.rooms.contains(&room));
2135
2136        Ok(())
2137    }
2138
    #[async_test]
    async fn test_process_marked_unread_room_account_data() -> Result<()> {
        let room_id = owned_room_id!("!unicorn:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        // Setup sliding sync with one room and one list, and the account-data
        // extension enabled so `m.marked_unread` events are processed.

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
            )
            .build()
            .await?;

        // Initial state: a response that only introduces the room.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.clone(),
                    http::response::Room::default(),
                )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        // Simulate a response that only changes the marked unread state of the room to
        // true

        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);

        let update_summary = {
            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?
        };

        // Check that the list and entry received the update

        assert!(update_summary.rooms.contains(&room_id));

        let room = client.get_room(&room_id).unwrap();

        // Check the actual room data, this powers RoomInfo

        assert!(room.is_marked_unread());

        // Change it back to false and check if it updates; this time the
        // response also carries a `rooms` section.

        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);

        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut pos_guard,
                RequestedRequiredStates::default(),
            )
            .await?;

        let room = client.get_room(&room_id).unwrap();

        assert!(!room.is_marked_unread());

        Ok(())
    }
2226
2227    fn make_mark_unread_response(
2228        response_number: &str,
2229        room_id: OwnedRoomId,
2230        unread: bool,
2231        add_rooms_section: bool,
2232    ) -> http::Response {
2233        let rooms = if add_rooms_section {
2234            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2235        } else {
2236            BTreeMap::new()
2237        };
2238
2239        let extensions = assign!(http::response::Extensions::default(), {
2240            account_data: assign!(http::response::AccountData::default(), {
2241                rooms: BTreeMap::from([
2242                    (
2243                        room_id,
2244                        vec![
2245                            Raw::from_json_string(
2246                                json!({
2247                                    "content": {
2248                                        "unread": unread
2249                                    },
2250                                    "type": "m.marked_unread"
2251                                })
2252                                .to_string(),
2253                            ).unwrap()
2254                        ]
2255                    )
2256                ])
2257            })
2258        });
2259
2260        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2261    }
2262
    #[async_test]
    async fn test_process_rooms_account_data() -> Result<()> {
        let room = owned_room_id!("!pony:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        // A sliding sync with the account-data extension enabled, and a single
        // list covering the first hundred rooms.
        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
            )
            .build()
            .await?;

        // Initial state: a response that only introduces the room.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room.clone(),
                    http::response::Room::default(),
                )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        // A follow-up response carrying room account data (an `m.tag` event)
        // for that room, delivered through the account-data extension.
        let server_response = assign!(http::Response::new("1".to_owned()), {
            extensions: assign!(http::response::Extensions::default(), {
                account_data: assign!(http::response::AccountData::default(), {
                    rooms: BTreeMap::from([
                        (
                            room.clone(),
                            vec![
                                Raw::from_json_string(
                                    json!({
                                        "content": {
                                            "tags": {
                                                "u.work": {
                                                    "order": 0.9
                                                }
                                            }
                                        },
                                        "type": "m.tag"
                                    })
                                    .to_string(),
                                ).unwrap()
                            ]
                        )
                    ])
                })
            })
        });
        let summary = {
            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?
        };

        // The room that received account data must be reported in the update
        // summary.
        assert!(summary.rooms.contains(&room));

        Ok(())
    }
2344
    #[async_test]
    #[cfg(feature = "e2e-encryption")]
    async fn test_process_only_encryption_events() -> Result<()> {
        use ruma::OneTimeKeyAlgorithm;

        let room = owned_room_id!("!croissant:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        // A response carrying both room data (a named room) and encryption data
        // (a one-time keys count plus a to-device `next_batch` token). The same
        // response is replayed against differently-configured sliding syncs
        // below.
        let server_response = assign!(http::Response::new("0".to_owned()), {
            rooms: BTreeMap::from([(
                room.clone(),
                assign!(http::response::Room::default(), {
                    name: Some("Croissants lovers".to_owned()),
                    timeline: Vec::new(),
                }),
            )]),

            extensions: assign!(http::response::Extensions::default(), {
                e2ee: assign!(http::response::E2EE::default(), {
                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
                }),
                to_device: Some(assign!(http::response::ToDevice::default(), {
                    next_batch: "to-device-token".to_owned(),
                })),
            })
        });

        // Phase 1: don't process non-encryption events if the sliding sync is
        // configured for encryption only.

        let sliding_sync = client
            .sliding_sync("test")?
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        {
            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut position_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // E2EE has been properly handled: the one-time keys count from the
        // response has been recorded.
        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
        assert_eq!(uploaded_key_count, 42);

        {
            // … and the to-device token has been persisted in the olm machine
            // store.
            let olm_machine = &*client.olm_machine_for_testing().await;
            assert_eq!(
                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
                Some("to-device-token")
            );
        }

        // Room events haven't.
        assert!(client.get_room(&room).is_none());

        // Phase 2: conversely, only process room lists events if the sliding
        // sync was configured as so. Start from a fresh client so no state
        // leaks over from phase 1.
        let client = logged_in_client(Some(server.uri())).await;

        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(SlidingSyncList::builder("thelist"))
            .build()
            .await?;

        {
            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut position_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // E2EE response has been ignored: no uploaded keys count…
        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
        assert_eq!(uploaded_key_count, 0);

        {
            // … and no to-device token in the store.
            let olm_machine = &*client.olm_machine_for_testing().await;
            assert_eq!(
                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
                None
            );
        }

        // The room is now known.
        assert!(client.get_room(&room).is_some());

        // Phase 3: it's also possible to set up both a list and the e2ee /
        // to-device extensions; everything should then be processed.
        let client = logged_in_client(Some(server.uri())).await;

        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(SlidingSyncList::builder("thelist"))
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        {
            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut position_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // E2EE has been properly handled.
        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
        assert_eq!(uploaded_key_count, 42);

        {
            let olm_machine = &*client.olm_machine_for_testing().await;
            assert_eq!(
                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
                Some("to-device-token")
            );
        }

        // The room is now known.
        assert!(client.get_room(&room).is_some());

        Ok(())
    }
2492
2493    #[async_test]
2494    async fn test_lock_multiple_requests() -> Result<()> {
2495        let server = MockServer::start().await;
2496        let client = logged_in_client(Some(server.uri())).await;
2497
2498        let pos = Arc::new(Mutex::new(0));
2499        let _mock_guard = Mock::given(SlidingSyncMatcher)
2500            .respond_with(move |_: &Request| {
2501                let mut pos = pos.lock().unwrap();
2502                *pos += 1;
2503                ResponseTemplate::new(200).set_body_json(json!({
2504                    "pos": pos.to_string(),
2505                    "lists": {},
2506                    "rooms": {}
2507                }))
2508            })
2509            .mount_as_scoped(&server)
2510            .await;
2511
2512        let sliding_sync = client
2513            .sliding_sync("test")?
2514            .with_to_device_extension(
2515                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2516            )
2517            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2518            .build()
2519            .await?;
2520
2521        // Spawn two requests in parallel. Before #2430, this lead to a deadlock and the
2522        // test would never terminate.
2523        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2524
2525        for result in requests.await {
2526            result?;
2527        }
2528
2529        Ok(())
2530    }
2531
2532    #[async_test]
2533    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2534        let server = MockServer::start().await;
2535        let client = logged_in_client(Some(server.uri())).await;
2536
2537        let pos = Arc::new(Mutex::new(0));
2538        let _mock_guard = Mock::given(SlidingSyncMatcher)
2539            .respond_with(move |_: &Request| {
2540                let mut pos = pos.lock().unwrap();
2541                *pos += 1;
2542                // Respond slowly enough that we can skip one iteration.
2543                ResponseTemplate::new(200)
2544                    .set_body_json(json!({
2545                        "pos": pos.to_string(),
2546                        "lists": {},
2547                        "rooms": {}
2548                    }))
2549                    .set_delay(Duration::from_secs(2))
2550            })
2551            .mount_as_scoped(&server)
2552            .await;
2553
2554        let sliding_sync =
2555            client
2556                .sliding_sync("test")?
2557                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2558                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2559                ))
2560                .add_list(
2561                    SlidingSyncList::builder("another-list")
2562                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2563                )
2564                .build()
2565                .await?;
2566
2567        let stream = sliding_sync.sync();
2568        pin_mut!(stream);
2569
2570        let cloned_sync = sliding_sync.clone();
2571        spawn(async move {
2572            tokio::time::sleep(Duration::from_millis(100)).await;
2573
2574            cloned_sync
2575                .on_list("another-list", |list| {
2576                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2577                    ready(())
2578                })
2579                .await;
2580        });
2581
2582        assert_matches!(stream.next().await, Some(Ok(_)));
2583
2584        sliding_sync.stop_sync().unwrap();
2585
2586        assert_matches!(stream.next().await, None);
2587
2588        let mut num_requests = 0;
2589
2590        for request in server.received_requests().await.unwrap() {
2591            if !SlidingSyncMatcher.matches(&request) {
2592                continue;
2593            }
2594
2595            let another_list_ranges = if num_requests == 0 {
2596                // First request
2597                json!([[0, 10]])
2598            } else {
2599                // Second request
2600                json!([[10, 20]])
2601            };
2602
2603            num_requests += 1;
2604            assert!(num_requests <= 2, "more than one request hit the server");
2605
2606            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2607
2608            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2609                &json_value,
2610                &json!({
2611                    "conn_id": "test",
2612                    "lists": {
2613                        "room-list": {
2614                            "ranges": [[0, 9]],
2615                            "required_state": [
2616                                ["m.room.encryption", ""],
2617                                ["m.room.tombstone", ""]
2618                            ],
2619                        },
2620                        "another-list": {
2621                            "ranges": another_list_ranges,
2622                            "required_state": [
2623                                ["m.room.encryption", ""],
2624                                ["m.room.tombstone", ""]
2625                            ],
2626                        },
2627                    }
2628                }),
2629                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2630            ) {
2631                dbg!(json_value);
2632                panic!("json differ: {err}");
2633            }
2634        }
2635
2636        assert_eq!(num_requests, 2);
2637
2638        Ok(())
2639    }
2640
2641    #[async_test]
2642    async fn test_timeout_zero_list() -> Result<()> {
2643        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2644
2645        let (request, _, _) =
2646            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2647
2648        // Zero list means sliding sync is fully loaded, so there is a timeout to wait
2649        // on new update to pop.
2650        assert!(request.timeout.is_some());
2651
2652        Ok(())
2653    }
2654
2655    #[async_test]
2656    async fn test_timeout_one_list() -> Result<()> {
2657        let (_server, sliding_sync) = new_sliding_sync(vec![
2658            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2659        ])
2660        .await?;
2661
2662        let (request, _, _) =
2663            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2664
2665        // The list does not require a timeout.
2666        assert!(request.timeout.is_none());
2667
2668        // Simulate a response.
2669        {
2670            let server_response = assign!(http::Response::new("0".to_owned()), {
2671                lists: BTreeMap::from([(
2672                    "foo".to_owned(),
2673                    assign!(http::response::List::default(), {
2674                        count: uint!(7),
2675                    })
2676                 )])
2677            });
2678
2679            let _summary = {
2680                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2681                sliding_sync
2682                    .handle_response(
2683                        server_response.clone(),
2684                        &mut pos_guard,
2685                        RequestedRequiredStates::default(),
2686                    )
2687                    .await?
2688            };
2689        }
2690
2691        let (request, _, _) =
2692            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2693
2694        // The list is now fully loaded, so it requires a timeout.
2695        assert!(request.timeout.is_some());
2696
2697        Ok(())
2698    }
2699
    #[async_test]
    async fn test_timeout_three_lists() -> Result<()> {
        // A request only gets a long-polling timeout once *all* lists are
        // fully loaded. Load the lists one response at a time and watch the
        // timeout appear at the end.
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
            SlidingSyncList::builder("baz")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;

        // Not all lists are fully loaded yet, so the request has no timeout.
        assert!(request.timeout.is_none());

        // Simulate a response that fully loads `foo` (7 rooms in total, fewer
        // than the batch size of 10).
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                lists: BTreeMap::from([(
                    "foo".to_owned(),
                    assign!(http::response::List::default(), {
                        count: uint!(7),
                    })
                 )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;

        // At least one list still isn't fully loaded, so still no timeout.
        assert!(request.timeout.is_none());

        // Simulate a response that fully loads `bar` too.
        {
            let server_response = assign!(http::Response::new("1".to_owned()), {
                lists: BTreeMap::from([(
                    "bar".to_owned(),
                    assign!(http::response::List::default(), {
                        count: uint!(7),
                    })
                 )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;

        // All lists are now fully loaded, so the request requires a timeout.
        // NOTE(review): the selective `baz` list never receives a response,
        // so it's presumably considered loaded from the start — confirm.
        assert!(request.timeout.is_some());

        Ok(())
    }
2776
2777    #[async_test]
2778    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2779        let server = MockServer::start().await;
2780        let client = logged_in_client(Some(server.uri())).await;
2781
2782        let _mock_guard = Mock::given(SlidingSyncMatcher)
2783            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2784                "pos": "0",
2785                "lists": {},
2786                "rooms": {}
2787            })))
2788            .mount_as_scoped(&server)
2789            .await;
2790
2791        let sliding_sync = client
2792            .sliding_sync("test")?
2793            .with_to_device_extension(
2794                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2795            )
2796            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2797            .build()
2798            .await?;
2799
2800        let sliding_sync = Arc::new(sliding_sync);
2801
2802        // Create the listener and perform a sync request
2803        let sync_beat_listener = client.inner.sync_beat.listen();
2804        sliding_sync.sync_once().await?;
2805
2806        // The sync beat listener should be notified shortly after
2807        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2808        Ok(())
2809    }
2810
2811    #[async_test]
2812    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2813        let server = MockServer::start().await;
2814        let client = logged_in_client(Some(server.uri())).await;
2815
2816        let _mock_guard = Mock::given(SlidingSyncMatcher)
2817            .respond_with(ResponseTemplate::new(404))
2818            .mount_as_scoped(&server)
2819            .await;
2820
2821        let sliding_sync = client
2822            .sliding_sync("test")?
2823            .with_to_device_extension(
2824                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2825            )
2826            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2827            .build()
2828            .await?;
2829
2830        let sliding_sync = Arc::new(sliding_sync);
2831
2832        // Create the listener and perform a sync request
2833        let sync_beat_listener = client.inner.sync_beat.listen();
2834        let sync_result = sliding_sync.sync_once().await;
2835        assert!(sync_result.is_err());
2836
2837        // The sync beat listener won't be notified in this case
2838        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2839
2840        Ok(())
2841    }
2842
    #[async_test]
    async fn test_sync_lock_is_released_before_calling_handlers() -> Result<()> {
        // Regression test: the sync lock must be released before event
        // handlers run, otherwise a handler issuing a request that needs the
        // same lock (here, `/members`) would deadlock.
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let room_id = room_id!("!mu5hr00m:example.org");

        // Sliding sync response containing a room plus an `m.direct` global
        // account data event — the latter triggers the handler below.
        let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "pos": "0",
                "lists": {},
                "extensions": {
                    "account_data": {
                        "global": [
                            {
                                "type": "m.direct",
                                "content": {
                                    "@de4dlockh0lmes:example.org": [
                                        "!mu5hr00m:example.org"
                                    ]
                                }
                            }
                        ]
                    }
                },
                "rooms": {
                    room_id: {
                        "name": "Mario Bros Fanbase Room",
                        "initial": true,
                    },
                }
            })))
            .mount_as_scoped(server.server())
            .await;

        let f = EventFactory::new().room(room_id);

        // The `/members` endpoint returns a single joined member.
        Mock::given(method("GET"))
            .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "chunk": [
                    f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
                ]
            })))
            .mount(server.server())
            .await;

        // Oneshot channel used to observe that the handler actually completed.
        let (tx, rx) = tokio::sync::oneshot::channel();

        let tx = Arc::new(Mutex::new(Some(tx)));
        client.add_event_handler(move |_: DirectEvent, client: Client| async move {
            // Try to run a /members query while in an event handler.
            let members =
                client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
            assert_eq!(members.len(), 1);
            tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
        });

        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(SlidingSyncList::builder("thelist"))
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .build()
            .await?;

        // If the lock were still held while handlers run, this would hang;
        // the timeout turns the hang into a test failure.
        tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
            .await
            .expect("Sync did not complete in time")
            .expect("Sync failed");

        // Wait for the event handler to complete.
        tokio::time::timeout(Duration::from_secs(5), rx)
            .await
            .expect("Event handler did not complete in time")
            .expect("Event handler failed");

        Ok(())
    }
2922}