matrix_sdk/sliding_sync/
mod.rs

1// Copyright 2022-2023 Benjamin Kampmann
2// Copyright 2022 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for that specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod room;
24mod sticky_parameters;
25mod utils;
26
27use std::{
28    collections::{btree_map::Entry, BTreeMap, HashSet},
29    fmt::Debug,
30    future::Future,
31    sync::{Arc, RwLock as StdRwLock},
32    time::Duration,
33};
34
35use async_stream::stream;
36pub use client::{Version, VersionBuilder};
37use futures_core::stream::Stream;
38pub use matrix_sdk_base::sliding_sync::http;
39use matrix_sdk_common::{deserialized_responses::TimelineEvent, executor::spawn, timer};
40use ruma::{
41    api::{client::error::ErrorKind, OutgoingRequest},
42    assign, OwnedEventId, OwnedRoomId, RoomId,
43};
44use serde::{Deserialize, Serialize};
45use tokio::{
46    select,
47    sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
48};
49use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
50
51#[cfg(feature = "e2e-encryption")]
52use self::utils::JoinHandleExt as _;
53pub use self::{builder::*, client::VersionBuilderError, error::*, list::*, room::*};
54use self::{
55    cache::restore_sliding_sync_state,
56    client::SlidingSyncResponseProcessor,
57    sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
58};
59use crate::{config::RequestConfig, Client, HttpError, Result};
60
/// The Sliding Sync instance.
///
/// This type is a handle: its only field is an [`Arc`] pointing to the
/// shared state. It is OK to clone this type as much as you need: cloning it
/// is cheap.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// The Sliding Sync data, shared between all the clones of this handle.
    inner: Arc<SlidingSyncInner>,
}
69
/// The state held behind the [`Arc`] of a [`SlidingSync`] handle.
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// A unique identifier for this instance of sliding sync.
    ///
    /// Used to distinguish different connections to the sliding sync proxy.
    id: String,

    /// Either an overridden sliding sync [`Version`], or one inherited from the
    /// client.
    version: Version,

    /// The HTTP Matrix client.
    client: Client,

    /// Long-polling timeout that appears in the sliding sync proxy request.
    poll_timeout: Duration,

    /// Extra duration for the sliding sync request to timeout. This is added to
    /// the [`Self::poll_timeout`].
    network_timeout: Duration,

    /// The storage key to keep this cache at and load it from.
    storage_key: String,

    /// Should this sliding sync instance try to restore its sync position
    /// from the database?
    ///
    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
    /// keep it even so, to avoid sprinkling cfg statements everywhere
    /// throughout this file.
    share_pos: bool,

    /// Position markers.
    ///
    /// The `pos` marker represents a progression when exchanging requests and
    /// responses with the server: the server acknowledges the request by
    /// responding with a new `pos`. If the client sends two requests (not
    /// necessarily consecutive) with the same `pos`, the server has to reply
    /// with the same identical response.
    ///
    /// `position` is behind a mutex so that a new request starts after the
    /// previous request trip has fully ended (successfully or not). This
    /// mechanism exists to wait for the response to be handled and to see the
    /// `position` being updated, before sending a new request.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this Sliding Sync instance, keyed by list name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// All the rooms synced with Sliding Sync, keyed by room ID.
    rooms: AsyncRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,

    /// Request parameters that are sticky.
    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,

    /// Internal channel used to pass messages between Sliding Sync and other
    /// types.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
129
130impl SlidingSync {
131    pub(super) fn new(inner: SlidingSyncInner) -> Self {
132        Self { inner: Arc::new(inner) }
133    }
134
135    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
136        cache::store_sliding_sync_state(self, position).await
137    }
138
    /// Create a new [`SlidingSyncBuilder`].
    ///
    /// `id` and `client` are forwarded as-is to [`SlidingSyncBuilder::new`],
    /// and so is the result of that call.
    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
        SlidingSyncBuilder::new(id, client)
    }
143
144    /// Subscribe to many rooms.
145    ///
146    /// If the associated `Room`s exist, it will be marked as
147    /// members are missing, so that it ensures to re-fetch all members.
148    ///
149    /// A subscription to an already subscribed room is ignored.
150    pub fn subscribe_to_rooms(
151        &self,
152        room_ids: &[&RoomId],
153        settings: Option<http::request::RoomSubscription>,
154        cancel_in_flight_request: bool,
155    ) {
156        let settings = settings.unwrap_or_default();
157        let mut sticky = self.inner.sticky.write().unwrap();
158        let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
159
160        let mut skip_over_current_sync_loop_iteration = false;
161
162        for room_id in room_ids {
163            // If the room subscription already exists, let's not
164            // override it with a new one. First, it would reset its
165            // state (`RoomSubscriptionState`), and second it would try to
166            // re-subscribe with the next request. We don't want that. A room
167            // subscription should happen once, and next subscriptions should
168            // be ignored.
169            if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
170                if let Some(room) = self.inner.client.get_room(room_id) {
171                    room.mark_members_missing();
172                }
173
174                entry.insert((RoomSubscriptionState::default(), settings.clone()));
175
176                skip_over_current_sync_loop_iteration = true;
177            }
178        }
179
180        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
181            self.inner.internal_channel_send_if_possible(
182                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
183            );
184        }
185    }
186
187    /// Lookup a specific room
188    pub async fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
189        self.inner.rooms.read().await.get(room_id).cloned()
190    }
191
192    /// Check the number of rooms.
193    pub fn get_number_of_rooms(&self) -> usize {
194        self.inner.rooms.blocking_read().len()
195    }
196
197    /// Find a list by its name, and do something on it if it exists.
198    pub async fn on_list<Function, FunctionOutput, R>(
199        &self,
200        list_name: &str,
201        function: Function,
202    ) -> Option<R>
203    where
204        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
205        FunctionOutput: Future<Output = R>,
206    {
207        let lists = self.inner.lists.read().await;
208
209        match lists.get(list_name) {
210            Some(list) => Some(function(list).await),
211            None => None,
212        }
213    }
214
215    /// Add the list to the list of lists.
216    ///
217    /// As lists need to have a unique `.name`, if a list with the same name
218    /// is found the new list will replace the old one and the return it or
219    /// `None`.
220    pub async fn add_list(
221        &self,
222        list_builder: SlidingSyncListBuilder,
223    ) -> Result<Option<SlidingSyncList>> {
224        let list = list_builder.build(self.inner.internal_channel.clone());
225
226        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
227
228        self.inner.internal_channel_send_if_possible(
229            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
230        );
231
232        Ok(old_list)
233    }
234
235    /// Add a list that will be cached and reloaded from the cache.
236    ///
237    /// This will raise an error if a storage key was not set, or if there
238    /// was a I/O error reading from the cache.
239    ///
240    /// The rest of the semantics is the same as [`Self::add_list`].
241    pub async fn add_cached_list(
242        &self,
243        mut list_builder: SlidingSyncListBuilder,
244    ) -> Result<Option<SlidingSyncList>> {
245        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
246
247        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
248
249        self.add_list(list_builder).await
250    }
251
252    /// Lookup a set of rooms
253    pub async fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
254        &self,
255        room_ids: I,
256    ) -> Vec<Option<SlidingSyncRoom>> {
257        let rooms = self.inner.rooms.read().await;
258
259        room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
260    }
261
262    /// Get all rooms.
263    pub async fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
264        self.inner.rooms.read().await.values().cloned().collect()
265    }
266
267    /// Handle the HTTP response.
268    #[instrument(skip_all)]
269    async fn handle_response(
270        &self,
271        mut sliding_sync_response: http::Response,
272        position: &mut SlidingSyncPositionMarkers,
273    ) -> Result<UpdateSummary, crate::Error> {
274        let pos = Some(sliding_sync_response.pos.clone());
275
276        let must_process_rooms_response = self.must_process_rooms_response().await;
277
278        trace!(yes = must_process_rooms_response, "Must process rooms response?");
279
280        // Compute `limited` for the SS proxy only, if we're interested in a room list
281        // query.
282        if !self.inner.version.is_native() && must_process_rooms_response {
283            let known_rooms = self.inner.rooms.read().await;
284            compute_limited(&known_rooms, &mut sliding_sync_response.rooms);
285        }
286
287        // Transform a Sliding Sync Response to a `SyncResponse`.
288        //
289        // We may not need the `sync_response` in the future (once `SyncResponse` will
290        // move to Sliding Sync, i.e. to `http::Response`), but processing the
291        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
292        // happens here.
293
294        let mut sync_response = {
295            // Take the lock to avoid concurrent sliding syncs overwriting each other's room
296            // infos.
297            let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
298
299            let rooms = &*self.inner.rooms.read().await;
300            let mut response_processor =
301                SlidingSyncResponseProcessor::new(self.inner.client.clone(), rooms);
302
303            #[cfg(feature = "e2e-encryption")]
304            if self.is_e2ee_enabled() {
305                response_processor.handle_encryption(&sliding_sync_response.extensions).await?
306            }
307
308            // Only handle the room's subsection of the response, if this sliding sync was
309            // configured to do so. That's because even when not requesting it,
310            // sometimes the current (2023-07-20) proxy will forward room events
311            // unrelated to the current connection's parameters.
312            //
313            // NOTE: SS proxy workaround.
314            if must_process_rooms_response {
315                response_processor
316                    .handle_room_response(&sliding_sync_response, self.inner.version.is_native())
317                    .await?;
318            }
319
320            response_processor.process_and_take_response().await?
321        };
322
323        debug!(?sync_response, "Sliding Sync response has been handled by the client");
324
325        // Commit sticky parameters, if needed.
326        if let Some(ref txn_id) = sliding_sync_response.txn_id {
327            let txn_id = txn_id.as_str().into();
328            self.inner.sticky.write().unwrap().maybe_commit(txn_id);
329            let mut lists = self.inner.lists.write().await;
330            lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
331        }
332
333        let update_summary = {
334            // Update the rooms.
335            let updated_rooms = {
336                let mut rooms_map = self.inner.rooms.write().await;
337
338                let mut updated_rooms = Vec::with_capacity(sync_response.rooms.join.len());
339
340                for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
341                    // `sync_response` contains the rooms with decrypted events if any, so look at
342                    // the timeline events here first if the room exists.
343                    // Otherwise, let's look at the timeline inside the `sliding_sync_response`.
344                    let timeline =
345                        if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) {
346                            joined_room.timeline.events
347                        } else {
348                            room_data.timeline.drain(..).map(TimelineEvent::new).collect()
349                        };
350
351                    match rooms_map.get_mut(&room_id) {
352                        // The room existed before, let's update it.
353                        Some(room) => {
354                            room.update(room_data, timeline);
355                        }
356
357                        // First time we need this room, let's create it.
358                        None => {
359                            rooms_map.insert(
360                                room_id.clone(),
361                                SlidingSyncRoom::new(
362                                    room_id.clone(),
363                                    room_data.prev_batch,
364                                    timeline,
365                                ),
366                            );
367                        }
368                    }
369
370                    updated_rooms.push(room_id);
371                }
372
373                // There might be other rooms that were only mentioned in the sliding sync
374                // extensions part of the response, and thus would result in rooms present in
375                // the `sync_response.join`. Mark them as updated too.
376                //
377                // Since we've removed rooms that were in the room subsection from
378                // `sync_response.rooms.join`, the remaining ones aren't already present in
379                // `updated_rooms` and wouldn't cause any duplicates.
380                updated_rooms.extend(sync_response.rooms.join.keys().cloned());
381
382                updated_rooms
383            };
384
385            // Update the lists.
386            let updated_lists = {
387                debug!(
388                    lists = ?sliding_sync_response.lists,
389                    "Update lists"
390                );
391
392                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
393                let mut lists = self.inner.lists.write().await;
394
395                // Iterate on known lists, not on lists in the response. Rooms may have been
396                // updated that were not involved in any list update.
397                for (name, list) in lists.iter_mut() {
398                    if let Some(updates) = sliding_sync_response.lists.get(name) {
399                        let maximum_number_of_rooms: u32 =
400                            updates.count.try_into().expect("failed to convert `count` to `u32`");
401
402                        if list.update(Some(maximum_number_of_rooms))? {
403                            updated_lists.push(name.clone());
404                        }
405                    } else if list.update(None)? {
406                        updated_lists.push(name.clone());
407                    }
408                }
409
410                // Report about unknown lists.
411                for name in sliding_sync_response.lists.keys() {
412                    if !lists.contains_key(name) {
413                        error!("Response for list `{name}` - unknown to us; skipping");
414                    }
415                }
416
417                updated_lists
418            };
419
420            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
421        };
422
423        // Everything went well, we can update the position markers.
424        //
425        // Save the new position markers.
426        position.pos = pos;
427
428        Ok(update_summary)
429    }
430
431    async fn generate_sync_request(
432        &self,
433        txn_id: &mut LazyTransactionId,
434    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
435        // Collect requests for lists.
436        let mut requests_lists = BTreeMap::new();
437
438        let require_timeout = {
439            let lists = self.inner.lists.read().await;
440
441            // Start at `true` in case there is zero list.
442            let mut require_timeout = true;
443
444            for (name, list) in lists.iter() {
445                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
446                require_timeout = require_timeout && list.requires_timeout();
447            }
448
449            require_timeout
450        };
451
452        // Collect the `pos`.
453        //
454        // Wait on the `position` mutex to be available. It means no request nor
455        // response is running. The `position` mutex is released whether the response
456        // has been fully handled successfully, in this case the `pos` is updated, or
457        // the response handling has failed, in this case the `pos` hasn't been updated
458        // and the same `pos` will be used for this new request.
459        let mut position_guard = self.inner.position.clone().lock_owned().await;
460
461        let to_device_enabled =
462            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
463
464        let restored_fields = if self.inner.share_pos || to_device_enabled {
465            let lists = self.inner.lists.read().await;
466            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists).await?
467        } else {
468            None
469        };
470
471        // Update pos: either the one restored from the database, if any and the sliding
472        // sync was configured so, or read it from the memory cache.
473        let pos = if self.inner.share_pos {
474            if let Some(fields) = &restored_fields {
475                // Override the memory one with the database one, for consistency.
476                if fields.pos != position_guard.pos {
477                    info!(
478                        "Pos from previous request ('{:?}') was different from \
479                         pos in database ('{:?}').",
480                        position_guard.pos, fields.pos
481                    );
482                    position_guard.pos = fields.pos.clone();
483                }
484                fields.pos.clone()
485            } else {
486                position_guard.pos.clone()
487            }
488        } else {
489            position_guard.pos.clone()
490        };
491
492        Span::current().record("pos", &pos);
493
494        // There is a non-negligible difference MSC3575 and MSC4186 in how
495        // the `e2ee` extension works. When the client sends a request with
496        // no `pos`:
497        //
498        // * MSC3575 returns all device lists updates since the last request from the
499        //   device that asked for device lists (this works similarly to to-device
500        //   message handling),
501        // * MSC4186 returns no device lists updates, as it only returns changes since
502        //   the provided `pos` (which is `null` in this case); this is in line with
503        //   sync v2.
504        //
505        // Therefore, with MSC4186, the device list cache must be marked as to be
506        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
507        // device lists updates that happened between the previous request and the new
508        // “initial” request.
509        #[cfg(feature = "e2e-encryption")]
510        if pos.is_none() && self.inner.version.is_native() && self.is_e2ee_enabled() {
511            info!("Marking all tracked users as dirty");
512
513            let olm_machine = self.inner.client.olm_machine().await;
514            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
515            olm_machine.mark_all_tracked_users_as_dirty().await?;
516        }
517
518        // Configure the timeout.
519        //
520        // The `timeout` query is necessary when all lists require it. Please see
521        // [`SlidingSyncList::requires_timeout`].
522        let timeout = require_timeout.then(|| self.inner.poll_timeout);
523
524        let mut request = assign!(http::Request::new(), {
525            conn_id: Some(self.inner.id.clone()),
526            pos,
527            timeout,
528            lists: requests_lists,
529        });
530
531        // Apply sticky parameters, if needs be.
532        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
533
534        // Extensions are now applied (via sticky parameters).
535        //
536        // Override the to-device token if the extension is enabled.
537        if to_device_enabled {
538            request.extensions.to_device.since =
539                restored_fields.and_then(|fields| fields.to_device_token);
540        }
541
542        // Apply the transaction id if one was generated.
543        if let Some(txn_id) = txn_id.get() {
544            request.txn_id = Some(txn_id.to_string());
545        }
546
547        Ok((
548            // The request itself.
549            request,
550            // Configure long-polling. We need some time for the long-poll itself,
551            // and extra time for the network delays.
552            RequestConfig::default()
553                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
554                .retry_limit(3),
555            position_guard,
556        ))
557    }
558
559    /// Send a sliding sync request.
560    ///
561    /// This method contains the sending logic. It takes a generic `Request`
562    /// because it can be an MSC4186 or an MSC3575 `Request`.
563    async fn send_sync_request<Request>(
564        &self,
565        request: Request,
566        request_config: RequestConfig,
567        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
568    ) -> Result<UpdateSummary>
569    where
570        Request: OutgoingRequest + Clone + Debug + Send + Sync + 'static,
571        Request::IncomingResponse: Send
572            + Sync
573            +
574            // This is required to get back an MSC4186 `Response` whatever the
575            // `Request` type.
576            Into<http::Response>,
577        HttpError: From<ruma::api::error::FromHttpResponseError<Request::EndpointError>>,
578    {
579        debug!("Sending request");
580
581        // Prepare the request.
582        let request = self
583            .inner
584            .client
585            .send(request)
586            .with_request_config(request_config)
587            .with_homeserver_override(self.inner.version.overriding_url().map(ToString::to_string));
588
589        // Send the request and get a response with end-to-end encryption support.
590        //
591        // Sending the `/sync` request out when end-to-end encryption is enabled means
592        // that we need to also send out any outgoing e2ee related request out
593        // coming from the `OlmMachine::outgoing_requests()` method.
594
595        #[cfg(feature = "e2e-encryption")]
596        let response = {
597            if self.is_e2ee_enabled() {
598                // Here, we need to run 2 things:
599                //
600                // 1. Send the sliding sync request and get a response,
601                // 2. Send the E2EE requests.
602                //
603                // We don't want to use a `join` or `try_join` because we want to fail if and
604                // only if sending the sliding sync request fails. Failing to send the E2EE
605                // requests should just result in a log.
606                //
607                // We also want to give the priority to sliding sync request. E2EE requests are
608                // sent concurrently to the sliding sync request, but the priority is on waiting
609                // a sliding sync response.
610                //
611                // If sending sliding sync request fails, the sending of E2EE requests must be
612                // aborted as soon as possible.
613
614                let client = self.inner.client.clone();
615                let e2ee_uploads = spawn(async move {
616                    if let Err(error) = client.send_outgoing_requests().await {
617                        error!(?error, "Error while sending outgoing E2EE requests");
618                    }
619                })
620                // Ensure that the task is not running in detached mode. It is aborted when it's
621                // dropped.
622                .abort_on_drop();
623
624                // Wait on the sliding sync request success or failure early.
625                let response = request.await?;
626
627                // At this point, if `request` has been resolved successfully, we wait on
628                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
629                // long. Otherwise —if `request` has failed— `e2ee_uploads` has
630                // been dropped, so aborted.
631                e2ee_uploads.await.map_err(|error| Error::JoinError {
632                    task_description: "e2ee_uploads".to_owned(),
633                    error,
634                })?;
635
636                response
637            } else {
638                request.await?
639            }
640        };
641
642        // Send the request and get a response _without_ end-to-end encryption support.
643        #[cfg(not(feature = "e2e-encryption"))]
644        let response = request.await?;
645
646        // The code manipulates `Request` and `Response` from MSC4186 because it's the
647        // future standard. But this function may have received a `Request` from MSC4186
648        // or MSC3575. We need to get back an MSC4186 `Response`.
649        let response = Into::<http::msc4186::Response>::into(response);
650
651        debug!("Received response");
652
653        // At this point, the request has been sent, and a response has been received.
654        //
655        // We must ensure the handling of the response cannot be stopped/
656        // cancelled. It must be done entirely, otherwise we can have
657        // corrupted/incomplete states for Sliding Sync and other parts of
658        // the code.
659        //
660        // That's why we are running the handling of the response in a spawned
661        // future that cannot be cancelled by anything.
662        let this = self.clone();
663
664        // Spawn a new future to ensure that the code inside this future cannot be
665        // cancelled if this method is cancelled.
666        let future = async move {
667            debug!("Start handling response");
668
669            // In case the task running this future is detached, we must
670            // ensure responses are handled one at a time. At this point we still own
671            // `position_guard`, so we're fine.
672
673            // Handle the response.
674            let updates = this.handle_response(response, &mut position_guard).await?;
675
676            this.cache_to_storage(&position_guard).await?;
677
678            // Release the position guard lock.
679            // It means that other responses can be generated and then handled later.
680            drop(position_guard);
681
682            debug!("Done handling response");
683
684            Ok(updates)
685        };
686
687        spawn(future.instrument(Span::current())).await.unwrap()
688    }
689
690    /// Is the e2ee extension enabled for this sliding sync instance?
691    #[cfg(feature = "e2e-encryption")]
692    fn is_e2ee_enabled(&self) -> bool {
693        self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
694    }
695
696    #[cfg(not(feature = "e2e-encryption"))]
697    fn is_e2ee_enabled(&self) -> bool {
698        false
699    }
700
701    /// Should we process the room's subpart of a response?
702    async fn must_process_rooms_response(&self) -> bool {
703        // We consider that we must, if there's any room subscription or there's any
704        // list.
705        !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
706            || !self.inner.lists.read().await.is_empty()
707    }
708
709    #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
710    async fn sync_once(&self) -> Result<UpdateSummary> {
711        let (request, request_config, position_guard) =
712            self.generate_sync_request(&mut LazyTransactionId::new()).await?;
713
714        // The code manipulates `Request` and `Response` from MSC4186 because it's
715        // the future standard (at the time of writing: 2024-09-09). Let's check if
716        // the generated request must be transformed into an MSC3575 `Request`.
717        let summaries = if !self.inner.version.is_native() {
718            self.send_sync_request(
719                Into::<http::msc3575::Request>::into(request),
720                request_config,
721                position_guard,
722            )
723            .await?
724        } else {
725            self.send_sync_request(request, request_config, position_guard).await?
726        };
727
728        // Notify a new sync was received
729        self.inner.client.inner.sync_beat.notify(usize::MAX);
730
731        Ok(summaries)
732    }
733
734    /// Create a _new_ Sliding Sync sync loop.
735    ///
736    /// This method returns a `Stream`, which will send requests and will handle
737    /// responses automatically. Lists and rooms are updated automatically.
738    ///
739    /// This function returns `Ok(…)` if everything went well, otherwise it will
740    /// return `Err(…)`. An `Err` will _always_ lead to the `Stream`
741    /// termination.
742    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
743    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
744    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
745        debug!("Starting sync stream");
746
747        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
748
749        stream! {
750            loop {
751                debug!("Sync stream is running");
752
753                select! {
754                    biased;
755
756                    internal_message = internal_channel_receiver.recv() => {
757                        use SlidingSyncInternalMessage::*;
758
759                        debug!(?internal_message, "Sync stream has received an internal message");
760
761                        match internal_message {
762                            Err(_) | Ok(SyncLoopStop) => {
763                                break;
764                            }
765
766                            Ok(SyncLoopSkipOverCurrentIteration) => {
767                                continue;
768                            }
769                        }
770                    }
771
772                    update_summary = self.sync_once() => {
773                        match update_summary {
774                            Ok(updates) => {
775                                yield Ok(updates);
776                            }
777
778                            // Here, errors we **cannot** ignore, and that must stop the sync loop.
779                            Err(error) => {
780                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
781                                    // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
782                                    self.expire_session().await;
783                                }
784
785                                yield Err(error);
786
787                                // Terminates the loop, and terminates the stream.
788                                break;
789                            }
790                        }
791                    }
792                }
793            }
794
795            debug!("Sync stream has exited.");
796        }
797    }
798
799    /// Force to stop the sync loop ([`Self::sync`]) if it's running.
800    ///
801    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
802    /// enough to “stop” it, but depending of how this `Stream` is used, it
803    /// might not be obvious to drop it immediately (thinking of using this API
804    /// over FFI; the foreign-language might not be able to drop a value
805    /// immediately). Thus, calling this method will ensure that the sync loop
806    /// stops gracefully and as soon as it returns.
807    pub fn stop_sync(&self) -> Result<()> {
808        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
809    }
810
811    /// Expire the current Sliding Sync session on the client-side.
812    ///
813    /// Expiring a Sliding Sync session means: resetting `pos`. It also resets
814    /// sticky parameters.
815    ///
816    /// This should only be used when it's clear that this session was about to
817    /// expire anyways, and should be used only in very specific cases (e.g.
818    /// multiple sliding syncs being run in parallel, and one of them has
819    /// expired).
820    ///
821    /// This method **MUST** be called when the sync loop is stopped.
822    #[doc(hidden)]
823    pub async fn expire_session(&self) {
824        info!("Session expired; resetting `pos` and sticky parameters");
825
826        {
827            let mut position = self.inner.position.lock().await;
828            position.pos = None;
829
830            if let Err(err) = self.cache_to_storage(&position).await {
831                error!(
832                    "couldn't invalidate sliding sync frozen state when expiring session: {err}"
833                );
834            }
835        }
836
837        {
838            let mut sticky = self.inner.sticky.write().unwrap();
839
840            // Clear all room subscriptions: we don't want to resend all room subscriptions
841            // when the session will restart.
842            sticky.data_mut().room_subscriptions.clear();
843        }
844
845        self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());
846    }
847}
848
849impl SlidingSyncInner {
850    /// Send a message over the internal channel.
851    #[instrument]
852    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
853        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
854    }
855
856    /// Send a message over the internal channel if there is a receiver, i.e. if
857    /// the sync loop is running.
858    #[instrument]
859    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
860        // If there is no receiver, the send will fail, but that's OK here.
861        let _ = self.internal_channel.send(message);
862    }
863}
864
/// Messages exchanged over the internal channel to steer the sync loop.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// Ask the sync loop to stop.
    SyncLoopStop,

    /// Ask the sync loop to drop whatever work is left in its current
    /// iteration, and to move on to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}
874
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrite `pos` with `new_pos`.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// The sliding sync version this instance is using.
    pub fn version(&self) -> &Version {
        &self.inner.version
    }

    /// Return a copy of the static extension configuration of this Sliding
    /// Sync.
    ///
    /// Note: this is not the next content of the sticky parameters, but
    /// rightly the static configuration that was set during creation of this
    /// Sliding Sync.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
898
899#[derive(Clone, Debug)]
900pub(super) struct SlidingSyncPositionMarkers {
901    /// An ephemeral position in the current stream, as received from the
902    /// previous `/sync` response, or `None` for the first request.
903    ///
904    /// Should not be persisted.
905    pos: Option<String>,
906}
907
908/// Frozen bits of a Sliding Sync that are stored in the *state* store.
909#[derive(Debug, Serialize, Deserialize)]
910struct FrozenSlidingSync {
911    /// Deprecated: prefer storing in the crypto store.
912    #[serde(skip_serializing_if = "Option::is_none")]
913    to_device_since: Option<String>,
914    #[serde(default, skip_serializing_if = "Vec::is_empty")]
915    rooms: Vec<FrozenSlidingSyncRoom>,
916}
917
918impl FrozenSlidingSync {
919    fn new(rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
920        // The to-device token must be saved in the `FrozenCryptoSlidingSync` now.
921        Self {
922            to_device_since: None,
923            rooms: rooms
924                .iter()
925                .map(|(_room_id, sliding_sync_room)| FrozenSlidingSyncRoom::from(sliding_sync_room))
926                .collect::<Vec<_>>(),
927        }
928    }
929}
930
931#[derive(Serialize, Deserialize)]
932struct FrozenSlidingSyncPos {
933    #[serde(skip_serializing_if = "Option::is_none")]
934    pos: Option<String>,
935}
936
937/// A summary of the updates received after a sync (like in
938/// [`SlidingSync::sync`]).
939#[derive(Debug, Clone)]
940pub struct UpdateSummary {
941    /// The names of the lists that have seen an update.
942    pub lists: Vec<String>,
943    /// The rooms that have seen updates
944    pub rooms: Vec<OwnedRoomId>,
945}
946
/// A very basic bool-ish enum tracking the state of a
/// [`http::request::RoomSubscription`]. Once a `RoomSubscription` has been
/// sent, it should ideally not be sent again, mostly to save bandwidth.
#[derive(Default, Debug)]
enum RoomSubscriptionState {
    /// The `RoomSubscription` has not been sent, or has not been received
    /// correctly by the server; i.e. the `RoomSubscription` —which is part of
    /// the sticky parameters— has not been committed.
    #[default]
    Pending,

    /// The `RoomSubscription` has been sent, and the server has received it
    /// correctly.
    Applied,
}
962
963/// The set of sticky parameters owned by the `SlidingSyncInner` instance, and
964/// sent in the request.
965#[derive(Debug)]
966pub(super) struct SlidingSyncStickyParameters {
967    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
968    /// but one wants to receive updates.
969    room_subscriptions:
970        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
971
972    /// The intended state of the extensions being supplied to sliding /sync
973    /// calls.
974    extensions: http::request::Extensions,
975}
976
977impl SlidingSyncStickyParameters {
978    /// Create a new set of sticky parameters.
979    pub fn new(
980        room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
981        extensions: http::request::Extensions,
982    ) -> Self {
983        Self {
984            room_subscriptions: room_subscriptions
985                .into_iter()
986                .map(|(room_id, room_subscription)| {
987                    (room_id, (RoomSubscriptionState::Pending, room_subscription))
988                })
989                .collect(),
990            extensions,
991        }
992    }
993}
994
995impl StickyData for SlidingSyncStickyParameters {
996    type Request = http::Request;
997
998    fn apply(&self, request: &mut Self::Request) {
999        request.room_subscriptions = self
1000            .room_subscriptions
1001            .iter()
1002            .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
1003            .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
1004            .collect();
1005        request.extensions = self.extensions.clone();
1006    }
1007
1008    fn on_commit(&mut self) {
1009        // All room subscriptions are marked as `Applied`.
1010        for (state, _room_subscription) in self.room_subscriptions.values_mut() {
1011            if matches!(state, RoomSubscriptionState::Pending) {
1012                *state = RoomSubscriptionState::Applied;
1013            }
1014        }
1015    }
1016}
1017
1018/// As of 2023-07-13, the sliding sync proxy doesn't provide us with `limited`
1019/// correctly, so we cheat and "correct" it using heuristics here.
1020/// TODO remove this workaround as soon as support of the `limited` flag is
1021/// properly implemented in the open-source proxy: https://github.com/matrix-org/sliding-sync/issues/197
1022// NOTE: SS proxy workaround.
1023fn compute_limited(
1024    local_rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>,
1025    remote_rooms: &mut BTreeMap<OwnedRoomId, http::response::Room>,
1026) {
1027    for (room_id, remote_room) in remote_rooms {
1028        // Only rooms marked as initially loaded are subject to the fixup.
1029        let initial = remote_room.initial.unwrap_or(false);
1030        if !initial {
1031            continue;
1032        }
1033
1034        if remote_room.limited {
1035            // If the room was already marked as limited, the server knew more than we do.
1036            continue;
1037        }
1038
1039        let remote_events = &remote_room.timeline;
1040        if remote_events.is_empty() {
1041            trace!(?room_id, "no timeline updates in the response => not limited");
1042            continue;
1043        }
1044
1045        let Some(local_room) = local_rooms.get(room_id) else {
1046            trace!(?room_id, "room isn't known locally => not limited");
1047            continue;
1048        };
1049
1050        let local_events = local_room.timeline_queue();
1051
1052        if local_events.is_empty() {
1053            trace!(?room_id, "local timeline had no events => not limited");
1054            continue;
1055        }
1056
1057        // If the local room had some timeline events, consider it's a `limited` if
1058        // there's absolutely no overlap between the known events and the new
1059        // events in the timeline.
1060
1061        // Gather all the known event IDs. Ignore events that don't have an event ID.
1062        let num_local_events = local_events.len();
1063        let local_events_with_ids: HashSet<OwnedEventId> =
1064            HashSet::from_iter(local_events.into_iter().filter_map(|event| event.event_id()));
1065
1066        // There's overlap if, and only if, there's at least one event in the response's
1067        // timeline that matches an event id we've seen before.
1068        let mut num_remote_events_missing_ids = 0;
1069        let overlap = remote_events.iter().any(|remote_event| {
1070            if let Some(remote_event_id) =
1071                remote_event.get_field::<OwnedEventId>("event_id").ok().flatten()
1072            {
1073                local_events_with_ids.contains(&remote_event_id)
1074            } else {
1075                num_remote_events_missing_ids += 1;
1076                false
1077            }
1078        });
1079
1080        remote_room.limited = !overlap;
1081
1082        trace!(
1083            ?room_id,
1084            num_events_response = remote_events.len(),
1085            num_local_events,
1086            num_local_events_with_ids = local_events_with_ids.len(),
1087            num_remote_events_missing_ids,
1088            room_limited = remote_room.limited,
1089            "done"
1090        );
1091    }
1092}
1093
1094#[cfg(all(test, not(target_family = "wasm")))]
1095#[allow(clippy::dbg_macro)]
1096mod tests {
1097    use std::{
1098        collections::BTreeMap,
1099        future::ready,
1100        ops::Not,
1101        sync::{Arc, Mutex},
1102        time::Duration,
1103    };
1104
1105    use assert_matches::assert_matches;
1106    use event_listener::Listener;
1107    use futures_util::{future::join_all, pin_mut, StreamExt};
1108    use matrix_sdk_common::deserialized_responses::TimelineEvent;
1109    use matrix_sdk_test::async_test;
1110    use ruma::{
1111        api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint,
1112        OwnedRoomId, TransactionId,
1113    };
1114    use serde::Deserialize;
1115    use serde_json::json;
1116    use url::Url;
1117    use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
1118
1119    use super::{
1120        compute_limited, http,
1121        sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
1122        FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
1123        SlidingSyncRoom, SlidingSyncStickyParameters, Version,
1124    };
1125    use crate::{
1126        sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result,
1127    };
1128
1129    #[derive(Copy, Clone)]
1130    struct SlidingSyncMatcher;
1131
1132    impl Match for SlidingSyncMatcher {
1133        fn matches(&self, request: &Request) -> bool {
1134            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
1135                && request.method == Method::POST
1136        }
1137    }
1138
1139    async fn new_sliding_sync(
1140        lists: Vec<SlidingSyncListBuilder>,
1141    ) -> Result<(MockServer, SlidingSync)> {
1142        let server = MockServer::start().await;
1143        let client = logged_in_client(Some(server.uri())).await;
1144
1145        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1146
1147        for list in lists {
1148            sliding_sync_builder = sliding_sync_builder.add_list(list);
1149        }
1150
1151        let sliding_sync = sliding_sync_builder.build().await?;
1152
1153        Ok((server, sliding_sync))
1154    }
1155
1156    #[async_test]
1157    async fn test_subscribe_to_rooms() -> Result<()> {
1158        let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1159            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1160        .await?;
1161
1162        let stream = sliding_sync.sync();
1163        pin_mut!(stream);
1164
1165        let room_id_0 = room_id!("!r0:bar.org");
1166        let room_id_1 = room_id!("!r1:bar.org");
1167        let room_id_2 = room_id!("!r2:bar.org");
1168
1169        {
1170            let _mock_guard = Mock::given(SlidingSyncMatcher)
1171                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1172                    "pos": "1",
1173                    "lists": {},
1174                    "rooms": {
1175                        room_id_0: {
1176                            "name": "Room #0",
1177                            "initial": true,
1178                        },
1179                        room_id_1: {
1180                            "name": "Room #1",
1181                            "initial": true,
1182                        },
1183                        room_id_2: {
1184                            "name": "Room #2",
1185                            "initial": true,
1186                        },
1187                    }
1188                })))
1189                .mount_as_scoped(&server)
1190                .await;
1191
1192            let _ = stream.next().await.unwrap()?;
1193        }
1194
1195        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1196
1197        // Members aren't synced.
1198        // We need to make them synced, so that we can test that subscribing to a room
1199        // make members not synced. That's a desired feature.
1200        assert!(room0.are_members_synced().not());
1201
1202        {
1203            struct MemberMatcher(OwnedRoomId);
1204
1205            impl Match for MemberMatcher {
1206                fn matches(&self, request: &Request) -> bool {
1207                    request.url.path()
1208                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1209                        && request.method == Method::GET
1210                }
1211            }
1212
1213            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1214                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1215                    "chunk": [],
1216                })))
1217                .mount_as_scoped(&server)
1218                .await;
1219
1220            assert_matches!(room0.request_members().await, Ok(()));
1221        }
1222
1223        // Members are now synced! We can start subscribing and see how it goes.
1224        assert!(room0.are_members_synced());
1225
1226        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1227
1228        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
1229        // now marked as not synced.
1230        assert!(room0.are_members_synced().not());
1231
1232        {
1233            let sticky = sliding_sync.inner.sticky.read().unwrap();
1234            let room_subscriptions = &sticky.data().room_subscriptions;
1235
1236            assert!(room_subscriptions.contains_key(room_id_0));
1237            assert!(room_subscriptions.contains_key(room_id_1));
1238            assert!(!room_subscriptions.contains_key(room_id_2));
1239        }
1240
1241        // Subscribing to the same room doesn't reset the member sync state.
1242
1243        {
1244            struct MemberMatcher(OwnedRoomId);
1245
1246            impl Match for MemberMatcher {
1247                fn matches(&self, request: &Request) -> bool {
1248                    request.url.path()
1249                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1250                        && request.method == Method::GET
1251                }
1252            }
1253
1254            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1255                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1256                    "chunk": [],
1257                })))
1258                .mount_as_scoped(&server)
1259                .await;
1260
1261            assert_matches!(room0.request_members().await, Ok(()));
1262        }
1263
1264        // Members are synced, good, good.
1265        assert!(room0.are_members_synced());
1266
1267        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1268
1269        // Members are still synced: because we have already subscribed to the
1270        // room, the members aren't marked as unsynced.
1271        assert!(room0.are_members_synced());
1272
1273        Ok(())
1274    }
1275
1276    #[async_test]
1277    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1278        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1279            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1280        .await?;
1281
1282        let room_id_0 = room_id!("!r0:bar.org");
1283        let room_id_1 = room_id!("!r1:bar.org");
1284        let room_id_2 = room_id!("!r2:bar.org");
1285
1286        // Subscribe to two rooms.
1287        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1288
1289        {
1290            let sticky = sliding_sync.inner.sticky.read().unwrap();
1291            let room_subscriptions = &sticky.data().room_subscriptions;
1292
1293            assert!(room_subscriptions.contains_key(room_id_0));
1294            assert!(room_subscriptions.contains_key(room_id_1));
1295            assert!(room_subscriptions.contains_key(room_id_2).not());
1296        }
1297
1298        // Subscribe to one more room.
1299        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1300
1301        {
1302            let sticky = sliding_sync.inner.sticky.read().unwrap();
1303            let room_subscriptions = &sticky.data().room_subscriptions;
1304
1305            assert!(room_subscriptions.contains_key(room_id_0));
1306            assert!(room_subscriptions.contains_key(room_id_1));
1307            assert!(room_subscriptions.contains_key(room_id_2));
1308        }
1309
1310        // Suddenly, the session expires!
1311        sliding_sync.expire_session().await;
1312
1313        {
1314            let sticky = sliding_sync.inner.sticky.read().unwrap();
1315            let room_subscriptions = &sticky.data().room_subscriptions;
1316
1317            assert!(room_subscriptions.is_empty());
1318        }
1319
1320        // Subscribe to one room again.
1321        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1322
1323        {
1324            let sticky = sliding_sync.inner.sticky.read().unwrap();
1325            let room_subscriptions = &sticky.data().room_subscriptions;
1326
1327            assert!(room_subscriptions.contains_key(room_id_0).not());
1328            assert!(room_subscriptions.contains_key(room_id_1).not());
1329            assert!(room_subscriptions.contains_key(room_id_2));
1330        }
1331
1332        Ok(())
1333    }
1334
1335    #[async_test]
1336    async fn test_to_device_token_properly_cached() -> Result<()> {
1337        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1338            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1339        .await?;
1340
1341        // FrozenSlidingSync doesn't contain the to_device_token anymore, as it's saved
1342        // in the crypto store since PR #2323.
1343        let frozen = FrozenSlidingSync::new(&*sliding_sync.inner.rooms.read().await);
1344        assert!(frozen.to_device_since.is_none());
1345
1346        Ok(())
1347    }
1348
1349    #[async_test]
1350    async fn test_add_list() -> Result<()> {
1351        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1352            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1353        .await?;
1354
1355        let _stream = sliding_sync.sync();
1356        pin_mut!(_stream);
1357
1358        sliding_sync
1359            .add_list(
1360                SlidingSyncList::builder("bar")
1361                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1362            )
1363            .await?;
1364
1365        let lists = sliding_sync.inner.lists.read().await;
1366
1367        assert!(lists.contains_key("foo"));
1368        assert!(lists.contains_key("bar"));
1369
1370        // this test also ensures that Tokio is not panicking when calling `add_list`.
1371
1372        Ok(())
1373    }
1374
1375    #[test]
1376    fn test_sticky_parameters_api_invalidated_flow() {
1377        let r0 = room_id!("!r0.matrix.org");
1378        let r1 = room_id!("!r1:matrix.org");
1379
1380        let mut room_subscriptions = BTreeMap::new();
1381        room_subscriptions.insert(r0.to_owned(), Default::default());
1382
1383        // At first it's invalidated.
1384        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1385            room_subscriptions,
1386            Default::default(),
1387        ));
1388        assert!(sticky.is_invalidated());
1389
1390        // Then when we create a request, the sticky parameters are applied.
1391        let txn_id: &TransactionId = "tid123".into();
1392
1393        let mut request = http::Request::default();
1394        request.txn_id = Some(txn_id.to_string());
1395
1396        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1397
1398        assert!(request.txn_id.is_some());
1399        assert_eq!(request.room_subscriptions.len(), 1);
1400        assert!(request.room_subscriptions.contains_key(r0));
1401
1402        let tid = request.txn_id.unwrap();
1403
1404        sticky.maybe_commit(tid.as_str().into());
1405        assert!(!sticky.is_invalidated());
1406
1407        // Applying new parameters will invalidate again.
1408        sticky
1409            .data_mut()
1410            .room_subscriptions
1411            .insert(r1.to_owned(), (Default::default(), Default::default()));
1412        assert!(sticky.is_invalidated());
1413
1414        // Committing with the wrong transaction id will keep it invalidated.
1415        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1416        assert!(sticky.is_invalidated());
1417
1418        // Restarting a request will only remember the last generated transaction id.
1419        let txn_id1: &TransactionId = "tid456".into();
1420        let mut request1 = http::Request::default();
1421        request1.txn_id = Some(txn_id1.to_string());
1422        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1423
1424        assert!(sticky.is_invalidated());
1425        // The first room subscription has been applied to `request`, so it's not
1426        // reapplied here. It's a particular logic of `room_subscriptions`, it's not
1427        // related to the sticky design.
1428        assert_eq!(request1.room_subscriptions.len(), 1);
1429        assert!(request1.room_subscriptions.contains_key(r1));
1430
1431        let txn_id2: &TransactionId = "tid789".into();
1432        let mut request2 = http::Request::default();
1433        request2.txn_id = Some(txn_id2.to_string());
1434
1435        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1436        assert!(sticky.is_invalidated());
1437        // `request2` contains `r1` because the sticky parameters have not been
1438        // committed, so it's still marked as pending.
1439        assert_eq!(request2.room_subscriptions.len(), 1);
1440        assert!(request2.room_subscriptions.contains_key(r1));
1441
1442        // Here we commit with the not most-recent TID, so it keeps the invalidated
1443        // status.
1444        sticky.maybe_commit(txn_id1);
1445        assert!(sticky.is_invalidated());
1446
1447        // But here we use the latest TID, so the commit is effective.
1448        sticky.maybe_commit(txn_id2);
1449        assert!(!sticky.is_invalidated());
1450    }
1451
1452    #[test]
1453    fn test_room_subscriptions_are_sticky() {
1454        let r0 = room_id!("!r0.matrix.org");
1455        let r1 = room_id!("!r1:matrix.org");
1456
1457        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1458            BTreeMap::new(),
1459            Default::default(),
1460        ));
1461
1462        // A room subscription is added, applied, and committed.
1463        {
1464            // Insert `r0`.
1465            sticky
1466                .data_mut()
1467                .room_subscriptions
1468                .insert(r0.to_owned(), (Default::default(), Default::default()));
1469
1470            // Then the sticky parameters are applied.
1471            let txn_id: &TransactionId = "tid0".into();
1472            let mut request = http::Request::default();
1473            request.txn_id = Some(txn_id.to_string());
1474
1475            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1476
1477            assert!(request.txn_id.is_some());
1478            assert_eq!(request.room_subscriptions.len(), 1);
1479            assert!(request.room_subscriptions.contains_key(r0));
1480
1481            // Then the sticky parameters are committed.
1482            let tid = request.txn_id.unwrap();
1483
1484            sticky.maybe_commit(tid.as_str().into());
1485        }
1486
1487        // A room subscription is added, applied, but NOT committed.
1488        {
1489            // Insert `r1`.
1490            sticky
1491                .data_mut()
1492                .room_subscriptions
1493                .insert(r1.to_owned(), (Default::default(), Default::default()));
1494
1495            // Then the sticky parameters are applied.
1496            let txn_id: &TransactionId = "tid1".into();
1497            let mut request = http::Request::default();
1498            request.txn_id = Some(txn_id.to_string());
1499
1500            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1501
1502            assert!(request.txn_id.is_some());
1503            assert_eq!(request.room_subscriptions.len(), 1);
1504            // `r0` is not present, it's only `r1`.
1505            assert!(request.room_subscriptions.contains_key(r1));
1506
1507            // Then the sticky parameters are NOT committed.
1508            // It can happen if the request has failed to be sent for example,
1509            // or if the response didn't match.
1510        }
1511
1512        // A previously added room subscription is re-added, applied, and committed.
1513        {
1514            // Then the sticky parameters are applied.
1515            let txn_id: &TransactionId = "tid2".into();
1516            let mut request = http::Request::default();
1517            request.txn_id = Some(txn_id.to_string());
1518
1519            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1520
1521            assert!(request.txn_id.is_some());
1522            assert_eq!(request.room_subscriptions.len(), 1);
1523            // `r0` is not present, it's only `r1`.
1524            assert!(request.room_subscriptions.contains_key(r1));
1525
1526            // Then the sticky parameters are committed.
1527            let tid = request.txn_id.unwrap();
1528
1529            sticky.maybe_commit(tid.as_str().into());
1530        }
1531
1532        // All room subscriptions have been committed.
1533        {
1534            // Then the sticky parameters are applied.
1535            let txn_id: &TransactionId = "tid3".into();
1536            let mut request = http::Request::default();
1537            request.txn_id = Some(txn_id.to_string());
1538
1539            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1540
1541            assert!(request.txn_id.is_some());
1542            // All room subscriptions have been sent.
1543            assert!(request.room_subscriptions.is_empty());
1544        }
1545    }
1546
1547    #[test]
1548    fn test_extensions_are_sticky() {
1549        let mut extensions = http::request::Extensions::default();
1550        extensions.account_data.enabled = Some(true);
1551
1552        // At first it's invalidated.
1553        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1554            Default::default(),
1555            extensions,
1556        ));
1557
1558        assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1559
1560        // `StickyParameters::new` follows its caller's intent when it comes to e2ee and
1561        // to-device.
1562        let extensions = &sticky.data().extensions;
1563        assert_eq!(extensions.e2ee.enabled, None);
1564        assert_eq!(extensions.to_device.enabled, None);
1565        assert_eq!(extensions.to_device.since, None);
1566
1567        // What the user explicitly enabled is… enabled.
1568        assert_eq!(extensions.account_data.enabled, Some(true));
1569
1570        let txn_id: &TransactionId = "tid123".into();
1571        let mut request = http::Request::default();
1572        request.txn_id = Some(txn_id.to_string());
1573        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1574        assert!(sticky.is_invalidated());
1575        assert_eq!(request.extensions.to_device.enabled, None);
1576        assert_eq!(request.extensions.to_device.since, None);
1577        assert_eq!(request.extensions.e2ee.enabled, None);
1578        assert_eq!(request.extensions.account_data.enabled, Some(true));
1579    }
1580
1581    #[async_test]
1582    async fn test_sticky_extensions_plus_since() -> Result<()> {
1583        let server = MockServer::start().await;
1584        let client = logged_in_client(Some(server.uri())).await;
1585
1586        let sync = client
1587            .sliding_sync("test-slidingsync")?
1588            .add_list(SlidingSyncList::builder("new_list"))
1589            .build()
1590            .await?;
1591
1592        // No extensions have been explicitly enabled here.
1593        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1594        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1595        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1596
1597        // Now enable e2ee and to-device.
1598        let sync = client
1599            .sliding_sync("test-slidingsync")?
1600            .add_list(SlidingSyncList::builder("new_list"))
1601            .with_to_device_extension(
1602                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1603            )
1604            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1605            .build()
1606            .await?;
1607
1608        // Even without a since token, the first request will contain the extensions
1609        // configuration, at least.
1610        let txn_id = TransactionId::new();
1611        let (request, _, _) = sync
1612            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1613            .await?;
1614
1615        assert_eq!(request.extensions.e2ee.enabled, Some(true));
1616        assert_eq!(request.extensions.to_device.enabled, Some(true));
1617        assert!(request.extensions.to_device.since.is_none());
1618
1619        {
1620            // Committing with another transaction id doesn't validate anything.
1621            let mut sticky = sync.inner.sticky.write().unwrap();
1622            assert!(sticky.is_invalidated());
1623            sticky.maybe_commit(
1624                "hopefully the rng won't generate this very specific transaction id".into(),
1625            );
1626            assert!(sticky.is_invalidated());
1627        }
1628
1629        // Regenerating a request will yield the same one.
1630        let txn_id2 = TransactionId::new();
1631        let (request, _, _) = sync
1632            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1633            .await?;
1634
1635        assert_eq!(request.extensions.e2ee.enabled, Some(true));
1636        assert_eq!(request.extensions.to_device.enabled, Some(true));
1637        assert!(request.extensions.to_device.since.is_none());
1638
1639        assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1640
1641        {
1642            // Committing with the expected transaction id will validate it.
1643            let mut sticky = sync.inner.sticky.write().unwrap();
1644            assert!(sticky.is_invalidated());
1645            sticky.maybe_commit(txn_id2.as_str().into());
1646            assert!(!sticky.is_invalidated());
1647        }
1648
1649        // The next request should contain no sticky parameters.
1650        let txn_id = TransactionId::new();
1651        let (request, _, _) = sync
1652            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1653            .await?;
1654        assert!(request.extensions.e2ee.enabled.is_none());
1655        assert!(request.extensions.to_device.enabled.is_none());
1656        assert!(request.extensions.to_device.since.is_none());
1657
1658        // If there's a to-device `since` token, we make sure we put the token
1659        // into the extension config. The rest doesn't need to be re-enabled due to
1660        // stickiness.
1661        let _since_token = "since";
1662
1663        #[cfg(feature = "e2e-encryption")]
1664        {
1665            use matrix_sdk_base::crypto::store::Changes;
1666            if let Some(olm_machine) = &*client.olm_machine().await {
1667                olm_machine
1668                    .store()
1669                    .save_changes(Changes {
1670                        next_batch_token: Some(_since_token.to_owned()),
1671                        ..Default::default()
1672                    })
1673                    .await?;
1674            }
1675        }
1676
1677        let txn_id = TransactionId::new();
1678        let (request, _, _) = sync
1679            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1680            .await?;
1681
1682        assert!(request.extensions.e2ee.enabled.is_none());
1683        assert!(request.extensions.to_device.enabled.is_none());
1684
1685        #[cfg(feature = "e2e-encryption")]
1686        assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1687
1688        Ok(())
1689    }
1690
1691    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
1692    // all the tracked users by the `OlmMachine` must be marked as dirty, i.e.
1693    // `/key/query` requests must be sent. See the code to see the details.
1694    //
1695    // This test is asserting that.
1696    #[async_test]
1697    #[cfg(feature = "e2e-encryption")]
1698    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1699        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1700        use matrix_sdk_test::ruma_response_from_json;
1701        use ruma::user_id;
1702
1703        let server = MockServer::start().await;
1704        let client = logged_in_client(Some(server.uri())).await;
1705
1706        let alice = user_id!("@alice:localhost");
1707        let bob = user_id!("@bob:localhost");
1708        let me = user_id!("@example:localhost");
1709
1710        // Track and mark users are not dirty, so that we can check they are “dirty”
1711        // after that. Dirty here means that a `/key/query` must be sent.
1712        {
1713            let olm_machine = client.olm_machine().await;
1714            let olm_machine = olm_machine.as_ref().unwrap();
1715
1716            olm_machine.update_tracked_users([alice, bob]).await?;
1717
1718            // Assert requests.
1719            let outgoing_requests = olm_machine.outgoing_requests().await?;
1720
1721            assert_eq!(outgoing_requests.len(), 2);
1722            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1723            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1724
1725            // Fake responses.
1726            olm_machine
1727                .mark_request_as_sent(
1728                    outgoing_requests[0].request_id(),
1729                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1730                        "one_time_key_counts": {}
1731                    }))),
1732                )
1733                .await?;
1734
1735            olm_machine
1736                .mark_request_as_sent(
1737                    outgoing_requests[1].request_id(),
1738                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1739                        "device_keys": {
1740                            alice: {},
1741                            bob: {},
1742                        }
1743                    }))),
1744                )
1745                .await?;
1746
1747            // Once more.
1748            let outgoing_requests = olm_machine.outgoing_requests().await?;
1749
1750            assert_eq!(outgoing_requests.len(), 1);
1751            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1752
1753            olm_machine
1754                .mark_request_as_sent(
1755                    outgoing_requests[0].request_id(),
1756                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1757                        "device_keys": {
1758                            me: {},
1759                        }
1760                    }))),
1761                )
1762                .await?;
1763
1764            // No more.
1765            let outgoing_requests = olm_machine.outgoing_requests().await?;
1766
1767            assert!(outgoing_requests.is_empty());
1768        }
1769
1770        let sync = client
1771            .sliding_sync("test-slidingsync")?
1772            .add_list(SlidingSyncList::builder("new_list"))
1773            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1774            .build()
1775            .await?;
1776
1777        // First request: no `pos`.
1778        let txn_id = TransactionId::new();
1779        let (_request, _, _) = sync
1780            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1781            .await?;
1782
1783        // Now, tracked users must be dirty.
1784        {
1785            let olm_machine = client.olm_machine().await;
1786            let olm_machine = olm_machine.as_ref().unwrap();
1787
1788            // Assert requests.
1789            let outgoing_requests = olm_machine.outgoing_requests().await?;
1790
1791            assert_eq!(outgoing_requests.len(), 1);
1792            assert_matches!(
1793                outgoing_requests[0].request(),
1794                AnyOutgoingRequest::KeysQuery(request) => {
1795                    assert!(request.device_keys.contains_key(alice));
1796                    assert!(request.device_keys.contains_key(bob));
1797                    assert!(request.device_keys.contains_key(me));
1798                }
1799            );
1800
1801            // Fake responses.
1802            olm_machine
1803                .mark_request_as_sent(
1804                    outgoing_requests[0].request_id(),
1805                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1806                        "device_keys": {
1807                            alice: {},
1808                            bob: {},
1809                            me: {},
1810                        }
1811                    }))),
1812                )
1813                .await?;
1814        }
1815
1816        // Second request: with a `pos` this time.
1817        sync.set_pos("chocolat".to_owned()).await;
1818
1819        let txn_id = TransactionId::new();
1820        let (_request, _, _) = sync
1821            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1822            .await?;
1823
1824        // Tracked users are not marked as dirty.
1825        {
1826            let olm_machine = client.olm_machine().await;
1827            let olm_machine = olm_machine.as_ref().unwrap();
1828
1829            // Assert requests.
1830            let outgoing_requests = olm_machine.outgoing_requests().await?;
1831
1832            assert!(outgoing_requests.is_empty());
1833        }
1834
1835        Ok(())
1836    }
1837
1838    #[async_test]
1839    async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1840        let server = MockServer::start().await;
1841        let client = logged_in_client(Some(server.uri())).await;
1842
1843        let sliding_sync = client
1844            .sliding_sync("test-slidingsync")?
1845            .with_to_device_extension(
1846                assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1847            )
1848            .build()
1849            .await?;
1850
1851        // First request asks to enable the extension.
1852        let (request, _, _) =
1853            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1854        assert!(request.extensions.to_device.enabled.is_some());
1855
1856        let sync = sliding_sync.sync();
1857        pin_mut!(sync);
1858
1859        // `pos` is `None` to start with.
1860        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1861
1862        #[derive(Deserialize)]
1863        struct PartialRequest {
1864            txn_id: Option<String>,
1865        }
1866
1867        {
1868            let _mock_guard = Mock::given(SlidingSyncMatcher)
1869                .respond_with(|request: &Request| {
1870                    // Repeat the txn_id in the response, if set.
1871                    let request: PartialRequest = request.body_json().unwrap();
1872
1873                    ResponseTemplate::new(200).set_body_json(json!({
1874                        "txn_id": request.txn_id,
1875                        "pos": "0",
1876                    }))
1877                })
1878                .mount_as_scoped(&server)
1879                .await;
1880
1881            let next = sync.next().await;
1882            assert_matches!(next, Some(Ok(_update_summary)));
1883
1884            // `pos` has been updated.
1885            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1886        }
1887
1888        // Next request doesn't ask to enable the extension.
1889        let (request, _, _) =
1890            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1891        assert!(request.extensions.to_device.enabled.is_none());
1892
1893        // Next request is successful.
1894        {
1895            let _mock_guard = Mock::given(SlidingSyncMatcher)
1896                .respond_with(|request: &Request| {
1897                    // Repeat the txn_id in the response, if set.
1898                    let request: PartialRequest = request.body_json().unwrap();
1899
1900                    ResponseTemplate::new(200).set_body_json(json!({
1901                        "txn_id": request.txn_id,
1902                        "pos": "1",
1903                    }))
1904                })
1905                .mount_as_scoped(&server)
1906                .await;
1907
1908            let next = sync.next().await;
1909            assert_matches!(next, Some(Ok(_update_summary)));
1910
1911            // `pos` has been updated.
1912            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1913        }
1914
1915        // Next request is successful despite it receives an already
1916        // received `pos` from the server.
1917        {
1918            let _mock_guard = Mock::given(SlidingSyncMatcher)
1919                .respond_with(|request: &Request| {
1920                    // Repeat the txn_id in the response, if set.
1921                    let request: PartialRequest = request.body_json().unwrap();
1922
1923                    ResponseTemplate::new(200).set_body_json(json!({
1924                        "txn_id": request.txn_id,
1925                        "pos": "0", // <- already received!
1926                    }))
1927                })
1928                .up_to_n_times(1) // run this mock only once.
1929                .mount_as_scoped(&server)
1930                .await;
1931
1932            let next = sync.next().await;
1933            assert_matches!(next, Some(Ok(_update_summary)));
1934
1935            // `pos` has been updated.
1936            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1937        }
1938
1939        // Stop responding with successful requests!
1940        //
1941        // When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
1942        // so they're reset. It also resets the `pos`.
1943        {
1944            let _mock_guard = Mock::given(SlidingSyncMatcher)
1945                .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1946                    "error": "foo",
1947                    "errcode": "M_UNKNOWN_POS",
1948                })))
1949                .mount_as_scoped(&server)
1950                .await;
1951
1952            let next = sync.next().await;
1953
1954            // The expected error is returned.
1955            assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1956
1957            // `pos` has been reset.
1958            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1959
1960            // Next request asks to enable the extension again.
1961            let (request, _, _) =
1962                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1963
1964            assert!(request.extensions.to_device.enabled.is_some());
1965
1966            // `sync` has been stopped.
1967            assert!(sync.next().await.is_none());
1968        }
1969
1970        Ok(())
1971    }
1972
1973    #[cfg(feature = "e2e-encryption")]
1974    #[async_test]
1975    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1976        let server = MockServer::start().await;
1977
1978        #[derive(Deserialize)]
1979        struct PartialRequest {
1980            txn_id: Option<String>,
1981        }
1982
1983        let server_pos = Arc::new(Mutex::new(0));
1984        let _mock_guard = Mock::given(SlidingSyncMatcher)
1985            .respond_with(move |request: &Request| {
1986                // Repeat the txn_id in the response, if set.
1987                let request: PartialRequest = request.body_json().unwrap();
1988                let pos = {
1989                    let mut pos = server_pos.lock().unwrap();
1990                    let prev = *pos;
1991                    *pos += 1;
1992                    prev
1993                };
1994
1995                ResponseTemplate::new(200).set_body_json(json!({
1996                    "txn_id": request.txn_id,
1997                    "pos": pos.to_string(),
1998                }))
1999            })
2000            .mount_as_scoped(&server)
2001            .await;
2002
2003        let client = logged_in_client(Some(server.uri())).await;
2004
2005        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
2006
2007        // `pos` is `None` to start with.
2008        {
2009            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2010
2011            let (request, _, _) =
2012                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2013            assert!(request.pos.is_none());
2014        }
2015
2016        let sync = sliding_sync.sync();
2017        pin_mut!(sync);
2018
2019        // Sync goes well, and then the position is saved both into the internal memory
2020        // and the database.
2021        let next = sync.next().await;
2022        assert_matches!(next, Some(Ok(_update_summary)));
2023
2024        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
2025
2026        let restored_fields = restore_sliding_sync_state(
2027            &client,
2028            &sliding_sync.inner.storage_key,
2029            &*sliding_sync.inner.lists.read().await,
2030        )
2031        .await?
2032        .expect("must have restored fields");
2033
2034        // While it has been saved into the database, it's not necessarily going to be
2035        // used later!
2036        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
2037
2038        // Now, even if we mess with the position stored in the database, the sliding
2039        // sync instance isn't configured to reload the stream position from the
2040        // database, so it won't be changed.
2041        {
2042            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
2043
2044            let mut position_guard = other_sync.inner.position.lock().await;
2045            position_guard.pos = Some("yolo".to_owned());
2046
2047            other_sync.cache_to_storage(&position_guard).await?;
2048        }
2049
2050        // It's still 0, not "yolo".
2051        {
2052            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
2053            let (request, _, _) =
2054                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2055            assert_eq!(request.pos.as_deref(), Some("0"));
2056        }
2057
2058        // Recreating a sliding sync with the same ID doesn't preload the pos, if not
2059        // asked to.
2060        {
2061            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
2062            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2063        }
2064
2065        Ok(())
2066    }
2067
2068    #[cfg(feature = "e2e-encryption")]
2069    #[async_test]
2070    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
2071        let server = MockServer::start().await;
2072
2073        #[derive(Deserialize)]
2074        struct PartialRequest {
2075            txn_id: Option<String>,
2076        }
2077
2078        let server_pos = Arc::new(Mutex::new(0));
2079        let _mock_guard = Mock::given(SlidingSyncMatcher)
2080            .respond_with(move |request: &Request| {
2081                // Repeat the txn_id in the response, if set.
2082                let request: PartialRequest = request.body_json().unwrap();
2083                let pos = {
2084                    let mut pos = server_pos.lock().unwrap();
2085                    let prev = *pos;
2086                    *pos += 1;
2087                    prev
2088                };
2089
2090                ResponseTemplate::new(200).set_body_json(json!({
2091                    "txn_id": request.txn_id,
2092                    "pos": pos.to_string(),
2093                }))
2094            })
2095            .mount_as_scoped(&server)
2096            .await;
2097
2098        let client = logged_in_client(Some(server.uri())).await;
2099
2100        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2101
2102        // `pos` is `None` to start with.
2103        {
2104            let (request, _, _) =
2105                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2106
2107            assert!(request.pos.is_none());
2108            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2109        }
2110
2111        let sync = sliding_sync.sync();
2112        pin_mut!(sync);
2113
2114        // Sync goes well, and then the position is saved both into the internal memory
2115        // and the database.
2116        let next = sync.next().await;
2117        assert_matches!(next, Some(Ok(_update_summary)));
2118
2119        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
2120
2121        let restored_fields = restore_sliding_sync_state(
2122            &client,
2123            &sliding_sync.inner.storage_key,
2124            &*sliding_sync.inner.lists.read().await,
2125        )
2126        .await?
2127        .expect("must have restored fields");
2128
2129        // While it has been saved into the database, it's not necessarily going to be
2130        // used later!
2131        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
2132
2133        // Another process modifies the stream position under our feet...
2134        {
2135            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
2136
2137            let mut position_guard = other_sync.inner.position.lock().await;
2138            position_guard.pos = Some("42".to_owned());
2139
2140            other_sync.cache_to_storage(&position_guard).await?;
2141        }
2142
2143        // It's alright, the next request will load it from the database.
2144        {
2145            let (request, _, _) =
2146                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2147            assert_eq!(request.pos.as_deref(), Some("42"));
2148            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2149        }
2150
2151        // Recreating a sliding sync with the same ID will reload it too.
2152        {
2153            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2154            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2155
2156            let (request, _, _) =
2157                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2158            assert_eq!(request.pos.as_deref(), Some("42"));
2159        }
2160
2161        // Invalidating the session will remove the in-memory value AND the database
2162        // value.
2163        sliding_sync.expire_session().await;
2164
2165        {
2166            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2167
2168            let (request, _, _) =
2169                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2170            assert!(request.pos.is_none());
2171        }
2172
2173        // And new sliding syncs with the same ID won't find it either.
2174        {
2175            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2176            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2177
2178            let (request, _, _) =
2179                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2180            assert!(request.pos.is_none());
2181        }
2182
2183        Ok(())
2184    }
2185
2186    #[async_test]
2187    async fn test_stop_sync_loop() -> Result<()> {
2188        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2189            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2190        .await?;
2191
2192        // Start the sync loop.
2193        let stream = sliding_sync.sync();
2194        pin_mut!(stream);
2195
2196        // The sync loop is actually running.
2197        assert!(stream.next().await.is_some());
2198
2199        // Stop the sync loop.
2200        sliding_sync.stop_sync()?;
2201
2202        // The sync loop is actually stopped.
2203        assert!(stream.next().await.is_none());
2204
2205        // Start a new sync loop.
2206        let stream = sliding_sync.sync();
2207        pin_mut!(stream);
2208
2209        // The sync loop is actually running.
2210        assert!(stream.next().await.is_some());
2211
2212        Ok(())
2213    }
2214
2215    #[async_test]
2216    async fn test_sliding_sync_version() -> Result<()> {
2217        let server = MockServer::start().await;
2218        let client = logged_in_client(Some(server.uri())).await;
2219
2220        // By default, sliding sync inherits its version from the client, which is
2221        // `Native`.
2222        {
2223            let sync = client.sliding_sync("default")?.build().await?;
2224
2225            assert_matches!(sync.version(), Version::Native);
2226        }
2227
2228        // Sliding sync can override the configuration from the client.
2229        {
2230            let url = Url::parse("https://bar.matrix/").unwrap();
2231            let sync = client
2232                .sliding_sync("own-proxy")?
2233                .version(Version::Proxy { url: url.clone() })
2234                .build()
2235                .await?;
2236
2237            assert_matches!(
2238                sync.version(),
2239                Version::Proxy { url: given_url } => {
2240                    assert_eq!(&url, given_url);
2241                }
2242            );
2243        }
2244
2245        // Sliding sync inherits from the client…
2246        let url = Url::parse("https://foo.matrix/").unwrap();
2247        client.set_sliding_sync_version(Version::Proxy { url: url.clone() });
2248
2249        {
2250            // The sliding sync inherits the client's sliding sync proxy URL.
2251            let sync = client.sliding_sync("client-proxy")?.build().await?;
2252
2253            assert_matches!(
2254                sync.version(),
2255                Version::Proxy { url: given_url } => {
2256                    assert_eq!(&url, given_url);
2257                }
2258            );
2259        }
2260
2261        {
2262            // …unless we override it afterwards.
2263            let sync = client.sliding_sync("own-proxy")?.version(Version::Native).build().await?;
2264
2265            assert_matches!(sync.version(), Version::Native);
2266        }
2267
2268        Ok(())
2269    }
2270
2271    #[async_test]
2272    async fn test_limited_flag_computation() {
2273        let make_event = |event_id: &str| -> TimelineEvent {
2274            TimelineEvent::new(
2275                Raw::from_json_string(
2276                    json!({
2277                        "event_id": event_id,
2278                        "sender": "@johnmastodon:example.org",
2279                        "origin_server_ts": 1337424242,
2280                        "type": "m.room.message",
2281                        "room_id": "!meaningless:example.org",
2282                        "content": {
2283                            "body": "Hello, world!",
2284                            "msgtype": "m.text"
2285                        },
2286                    })
2287                    .to_string(),
2288                )
2289                .unwrap(),
2290            )
2291        };
2292
2293        let event_a = make_event("$a");
2294        let event_b = make_event("$b");
2295        let event_c = make_event("$c");
2296        let event_d = make_event("$d");
2297
2298        let not_initial = room_id!("!croissant:example.org");
2299        let no_overlap = room_id!("!omelette:example.org");
2300        let partial_overlap = room_id!("!fromage:example.org");
2301        let complete_overlap = room_id!("!baguette:example.org");
2302        let no_remote_events = room_id!("!pain:example.org");
2303        let no_local_events = room_id!("!crepe:example.org");
2304        let already_limited = room_id!("!paris:example.org");
2305
2306        let response_timeline = vec![event_c.raw().clone(), event_d.raw().clone()];
2307
2308        let local_rooms = BTreeMap::from_iter([
2309            (
2310                // This has no events overlapping with the response timeline, hence limited, but
2311                // it's not marked as initial in the response.
2312                not_initial.to_owned(),
2313                SlidingSyncRoom::new(
2314                    no_overlap.to_owned(),
2315                    None,
2316                    vec![event_a.clone(), event_b.clone()],
2317                ),
2318            ),
2319            (
2320                // This has no events overlapping with the response timeline, hence limited.
2321                no_overlap.to_owned(),
2322                SlidingSyncRoom::new(
2323                    no_overlap.to_owned(),
2324                    None,
2325                    vec![event_a.clone(), event_b.clone()],
2326                ),
2327            ),
2328            (
2329                // This has event_c in common with the response timeline.
2330                partial_overlap.to_owned(),
2331                SlidingSyncRoom::new(
2332                    partial_overlap.to_owned(),
2333                    None,
2334                    vec![event_a.clone(), event_b.clone(), event_c.clone()],
2335                ),
2336            ),
2337            (
2338                // This has all events in common with the response timeline.
2339                complete_overlap.to_owned(),
2340                SlidingSyncRoom::new(
2341                    partial_overlap.to_owned(),
2342                    None,
2343                    vec![event_c.clone(), event_d.clone()],
2344                ),
2345            ),
2346            (
2347                // We locally have events for this room, and receive none in the response: not
2348                // limited.
2349                no_remote_events.to_owned(),
2350                SlidingSyncRoom::new(
2351                    no_remote_events.to_owned(),
2352                    None,
2353                    vec![event_c.clone(), event_d.clone()],
2354                ),
2355            ),
2356            (
2357                // We don't have events for this room locally, and even if the remote room contains
2358                // some events, it's not a limited sync.
2359                no_local_events.to_owned(),
2360                SlidingSyncRoom::new(no_local_events.to_owned(), None, vec![]),
2361            ),
2362            (
2363                // Already limited, but would be marked limited if the flag wasn't ignored (same as
2364                // partial overlap).
2365                already_limited.to_owned(),
2366                SlidingSyncRoom::new(
2367                    already_limited.to_owned(),
2368                    None,
2369                    vec![event_a, event_b, event_c.clone()],
2370                ),
2371            ),
2372        ]);
2373
2374        let mut remote_rooms = BTreeMap::from_iter([
2375            (
2376                not_initial.to_owned(),
2377                assign!(http::response::Room::default(), { timeline: response_timeline }),
2378            ),
2379            (
2380                no_overlap.to_owned(),
2381                assign!(http::response::Room::default(), {
2382                    initial: Some(true),
2383                    timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
2384                }),
2385            ),
2386            (
2387                partial_overlap.to_owned(),
2388                assign!(http::response::Room::default(), {
2389                    initial: Some(true),
2390                    timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
2391                }),
2392            ),
2393            (
2394                complete_overlap.to_owned(),
2395                assign!(http::response::Room::default(), {
2396                    initial: Some(true),
2397                    timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
2398                }),
2399            ),
2400            (
2401                no_remote_events.to_owned(),
2402                assign!(http::response::Room::default(), {
2403                    initial: Some(true),
2404                    timeline: vec![],
2405                }),
2406            ),
2407            (
2408                no_local_events.to_owned(),
2409                assign!(http::response::Room::default(), {
2410                    initial: Some(true),
2411                    timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
2412                }),
2413            ),
2414            (
2415                already_limited.to_owned(),
2416                assign!(http::response::Room::default(), {
2417                    initial: Some(true),
2418                    limited: true,
2419                    timeline: vec![event_c.into_raw(), event_d.into_raw()],
2420                }),
2421            ),
2422        ]);
2423
2424        compute_limited(&local_rooms, &mut remote_rooms);
2425
2426        assert!(!remote_rooms.get(not_initial).unwrap().limited);
2427        assert!(remote_rooms.get(no_overlap).unwrap().limited);
2428        assert!(!remote_rooms.get(partial_overlap).unwrap().limited);
2429        assert!(!remote_rooms.get(complete_overlap).unwrap().limited);
2430        assert!(!remote_rooms.get(no_remote_events).unwrap().limited);
2431        assert!(!remote_rooms.get(no_local_events).unwrap().limited);
2432        assert!(remote_rooms.get(already_limited).unwrap().limited);
2433    }
2434
2435    #[async_test]
2436    async fn test_process_read_receipts() -> Result<()> {
2437        let room = owned_room_id!("!pony:example.org");
2438
2439        let server = MockServer::start().await;
2440        let client = logged_in_client(Some(server.uri())).await;
2441
2442        let sliding_sync = client
2443            .sliding_sync("test")?
2444            .with_receipt_extension(
2445                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2446            )
2447            .add_list(
2448                SlidingSyncList::builder("all")
2449                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2450            )
2451            .build()
2452            .await?;
2453
2454        // Initial state.
2455        {
2456            let server_response = assign!(http::Response::new("0".to_owned()), {
2457                rooms: BTreeMap::from([(
2458                    room.clone(),
2459                    http::response::Room::default(),
2460                )])
2461            });
2462
2463            let _summary = {
2464                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2465                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2466            };
2467        }
2468
2469        let server_response = assign!(http::Response::new("1".to_owned()), {
2470            extensions: assign!(http::response::Extensions::default(), {
2471                receipts: assign!(http::response::Receipts::default(), {
2472                    rooms: BTreeMap::from([
2473                        (
2474                            room.clone(),
2475                            Raw::from_json_string(
2476                                json!({
2477                                    "room_id": room,
2478                                    "type": "m.receipt",
2479                                    "content": {
2480                                        "$event:bar.org": {
2481                                            "m.read": {
2482                                                client.user_id().unwrap(): {
2483                                                    "ts": 1436451550,
2484                                                }
2485                                            }
2486                                        }
2487                                    }
2488                                })
2489                                .to_string(),
2490                            ).unwrap()
2491                        )
2492                    ])
2493                })
2494            })
2495        });
2496
2497        let summary = {
2498            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2499            sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2500        };
2501
2502        assert!(summary.rooms.contains(&room));
2503
2504        Ok(())
2505    }
2506
2507    #[async_test]
2508    async fn test_process_marked_unread_room_account_data() -> Result<()> {
2509        let room_id = owned_room_id!("!unicorn:example.org");
2510
2511        let server = MockServer::start().await;
2512        let client = logged_in_client(Some(server.uri())).await;
2513
2514        // Setup sliding sync with with one room and one list
2515
2516        let sliding_sync = client
2517            .sliding_sync("test")?
2518            .with_account_data_extension(
2519                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2520            )
2521            .add_list(
2522                SlidingSyncList::builder("all")
2523                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2524            )
2525            .build()
2526            .await?;
2527
2528        // Initial state.
2529        {
2530            let server_response = assign!(http::Response::new("0".to_owned()), {
2531                rooms: BTreeMap::from([(
2532                    room_id.clone(),
2533                    http::response::Room::default(),
2534                )])
2535            });
2536
2537            let _summary = {
2538                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2539                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2540            };
2541        }
2542
2543        // Simulate a response that only changes the marked unread state of the room to
2544        // true
2545
2546        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2547
2548        let update_summary = {
2549            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2550            sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2551        };
2552
2553        // Check that the list list and entry received the update
2554
2555        assert!(update_summary.rooms.contains(&room_id));
2556
2557        let room = client.get_room(&room_id).unwrap();
2558
2559        // Check the actual room data, this powers RoomInfo
2560
2561        assert!(room.is_marked_unread());
2562
2563        // Change it back to false and check if it updates
2564
2565        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2566
2567        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2568        sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?;
2569
2570        let room = client.get_room(&room_id).unwrap();
2571
2572        assert!(!room.is_marked_unread());
2573
2574        Ok(())
2575    }
2576
2577    fn make_mark_unread_response(
2578        response_number: &str,
2579        room_id: OwnedRoomId,
2580        unread: bool,
2581        add_rooms_section: bool,
2582    ) -> http::Response {
2583        let rooms = if add_rooms_section {
2584            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2585        } else {
2586            BTreeMap::new()
2587        };
2588
2589        let extensions = assign!(http::response::Extensions::default(), {
2590            account_data: assign!(http::response::AccountData::default(), {
2591                rooms: BTreeMap::from([
2592                    (
2593                        room_id,
2594                        vec![
2595                            Raw::from_json_string(
2596                                json!({
2597                                    "content": {
2598                                        "unread": unread
2599                                    },
2600                                    "type": "com.famedly.marked_unread"
2601                                })
2602                                .to_string(),
2603                            ).unwrap()
2604                        ]
2605                    )
2606                ])
2607            })
2608        });
2609
2610        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2611    }
2612
2613    #[async_test]
2614    async fn test_process_rooms_account_data() -> Result<()> {
2615        let room = owned_room_id!("!pony:example.org");
2616
2617        let server = MockServer::start().await;
2618        let client = logged_in_client(Some(server.uri())).await;
2619
2620        let sliding_sync = client
2621            .sliding_sync("test")?
2622            .with_account_data_extension(
2623                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2624            )
2625            .add_list(
2626                SlidingSyncList::builder("all")
2627                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2628            )
2629            .build()
2630            .await?;
2631
2632        // Initial state.
2633        {
2634            let server_response = assign!(http::Response::new("0".to_owned()), {
2635                rooms: BTreeMap::from([(
2636                    room.clone(),
2637                    http::response::Room::default(),
2638                )])
2639            });
2640
2641            let _summary = {
2642                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2643                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2644            };
2645        }
2646
2647        let server_response = assign!(http::Response::new("1".to_owned()), {
2648            extensions: assign!(http::response::Extensions::default(), {
2649                account_data: assign!(http::response::AccountData::default(), {
2650                    rooms: BTreeMap::from([
2651                        (
2652                            room.clone(),
2653                            vec![
2654                                Raw::from_json_string(
2655                                    json!({
2656                                        "content": {
2657                                            "tags": {
2658                                                "u.work": {
2659                                                    "order": 0.9
2660                                                }
2661                                            }
2662                                        },
2663                                        "type": "m.tag"
2664                                    })
2665                                    .to_string(),
2666                                ).unwrap()
2667                            ]
2668                        )
2669                    ])
2670                })
2671            })
2672        });
2673        let summary = {
2674            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2675            sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2676        };
2677
2678        assert!(summary.rooms.contains(&room));
2679
2680        Ok(())
2681    }
2682
/// Replays one and the same response (a room plus E2EE and to-device data)
/// against three sliding sync configurations, and checks that each of them
/// only processes the parts it was configured for: the encryption extensions
/// only, a room list only, or both.
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_process_only_encryption_events() -> Result<()> {
    use ruma::OneTimeKeyAlgorithm;

    let room = owned_room_id!("!croissant:example.org");

    let server = MockServer::start().await;
    let client = logged_in_client(Some(server.uri())).await;

    let server_response = {
        let mut room_data = http::response::Room::default();
        room_data.name = Some("Croissants lovers".to_owned());
        room_data.timeline = Vec::new();

        let mut e2ee = http::response::E2EE::default();
        e2ee.device_one_time_keys_count =
            BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))]);

        let mut to_device = http::response::ToDevice::default();
        to_device.next_batch = "to-device-token".to_owned();

        let mut extensions = http::response::Extensions::default();
        extensions.e2ee = e2ee;
        extensions.to_device = Some(to_device);

        let mut response = http::Response::new("0".to_owned());
        response.rooms = BTreeMap::from([(room.clone(), room_data)]);
        response.extensions = extensions;
        response
    };

    // First configuration: encryption extensions only. Non-encryption events
    // must not be processed.
    let sliding_sync = client
        .sliding_sync("test")?
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    {
        let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
    }

    // E2EE has been properly handled.
    assert_eq!(client.encryption().uploaded_key_count().await?, 42);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        let next_batch = olm_machine.as_ref().unwrap().store().next_batch_token().await?;
        assert_eq!(next_batch.as_deref(), Some("to-device-token"));
    }

    // Room events haven't.
    assert!(client.get_room(&room).is_none());

    // Second configuration, on a fresh client: a room list only. Conversely,
    // only the room list part of the response must be processed.
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .build()
        .await?;

    {
        let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
    }

    // E2EE response has been ignored.
    assert_eq!(client.encryption().uploaded_key_count().await?, 0);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        let next_batch = olm_machine.as_ref().unwrap().store().next_batch_token().await?;
        assert!(next_batch.is_none());
    }

    // The room is now known.
    assert!(client.get_room(&room).is_some());

    // Third configuration, on a fresh client again: both a room list and the
    // encryption extensions.
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    {
        let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
    }

    // E2EE has been properly handled.
    assert_eq!(client.encryption().uploaded_key_count().await?, 42);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        let next_batch = olm_machine.as_ref().unwrap().store().next_batch_token().await?;
        assert_eq!(next_batch.as_deref(), Some("to-device-token"));
    }

    // The room is now known.
    assert!(client.get_room(&room).is_some());

    Ok(())
}
2812
2813    #[async_test]
2814    async fn test_lock_multiple_requests() -> Result<()> {
2815        let server = MockServer::start().await;
2816        let client = logged_in_client(Some(server.uri())).await;
2817
2818        let pos = Arc::new(Mutex::new(0));
2819        let _mock_guard = Mock::given(SlidingSyncMatcher)
2820            .respond_with(move |_: &Request| {
2821                let mut pos = pos.lock().unwrap();
2822                *pos += 1;
2823                ResponseTemplate::new(200).set_body_json(json!({
2824                    "pos": pos.to_string(),
2825                    "lists": {},
2826                    "rooms": {}
2827                }))
2828            })
2829            .mount_as_scoped(&server)
2830            .await;
2831
2832        let sliding_sync = client
2833            .sliding_sync("test")?
2834            .with_to_device_extension(
2835                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2836            )
2837            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2838            .build()
2839            .await?;
2840
2841        // Spawn two requests in parallel. Before #2430, this lead to a deadlock and the
2842        // test would never terminate.
2843        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2844
2845        for result in requests.await {
2846            result?;
2847        }
2848
2849        Ok(())
2850    }
2851
2852    #[async_test]
2853    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2854        let server = MockServer::start().await;
2855        let client = logged_in_client(Some(server.uri())).await;
2856
2857        let pos = Arc::new(Mutex::new(0));
2858        let _mock_guard = Mock::given(SlidingSyncMatcher)
2859            .respond_with(move |_: &Request| {
2860                let mut pos = pos.lock().unwrap();
2861                *pos += 1;
2862                // Respond slowly enough that we can skip one iteration.
2863                ResponseTemplate::new(200)
2864                    .set_body_json(json!({
2865                        "pos": pos.to_string(),
2866                        "lists": {},
2867                        "rooms": {}
2868                    }))
2869                    .set_delay(Duration::from_secs(2))
2870            })
2871            .mount_as_scoped(&server)
2872            .await;
2873
2874        let sliding_sync =
2875            client
2876                .sliding_sync("test")?
2877                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2878                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2879                ))
2880                .add_list(
2881                    SlidingSyncList::builder("another-list")
2882                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2883                )
2884                .build()
2885                .await?;
2886
2887        let stream = sliding_sync.sync();
2888        pin_mut!(stream);
2889
2890        let cloned_sync = sliding_sync.clone();
2891        tokio::spawn(async move {
2892            tokio::time::sleep(Duration::from_millis(100)).await;
2893
2894            cloned_sync
2895                .on_list("another-list", |list| {
2896                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2897                    ready(())
2898                })
2899                .await;
2900        });
2901
2902        assert_matches!(stream.next().await, Some(Ok(_)));
2903
2904        sliding_sync.stop_sync().unwrap();
2905
2906        assert_matches!(stream.next().await, None);
2907
2908        let mut num_requests = 0;
2909
2910        for request in server.received_requests().await.unwrap() {
2911            if !SlidingSyncMatcher.matches(&request) {
2912                continue;
2913            }
2914
2915            let another_list_ranges = if num_requests == 0 {
2916                // First request
2917                json!([[0, 10]])
2918            } else {
2919                // Second request
2920                json!([[10, 20]])
2921            };
2922
2923            num_requests += 1;
2924            assert!(num_requests <= 2, "more than one request hit the server");
2925
2926            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2927
2928            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2929                &json_value,
2930                &json!({
2931                    "conn_id": "test",
2932                    "lists": {
2933                        "room-list": {
2934                            "ranges": [[0, 9]],
2935                            "required_state": [
2936                                ["m.room.encryption", ""],
2937                                ["m.room.tombstone", ""]
2938                            ],
2939                        },
2940                        "another-list": {
2941                            "ranges": another_list_ranges,
2942                            "required_state": [
2943                                ["m.room.encryption", ""],
2944                                ["m.room.tombstone", ""]
2945                            ],
2946                        },
2947                    }
2948                }),
2949                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2950            ) {
2951                dbg!(json_value);
2952                panic!("json differ: {err}");
2953            }
2954        }
2955
2956        assert_eq!(num_requests, 2);
2957
2958        Ok(())
2959    }
2960
2961    #[async_test]
2962    async fn test_timeout_zero_list() -> Result<()> {
2963        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2964
2965        let (request, _, _) =
2966            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2967
2968        // Zero list means sliding sync is fully loaded, so there is a timeout to wait
2969        // on new update to pop.
2970        assert!(request.timeout.is_some());
2971
2972        Ok(())
2973    }
2974
2975    #[async_test]
2976    async fn test_timeout_one_list() -> Result<()> {
2977        let (_server, sliding_sync) = new_sliding_sync(vec![
2978            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2979        ])
2980        .await?;
2981
2982        let (request, _, _) =
2983            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2984
2985        // The list does not require a timeout.
2986        assert!(request.timeout.is_none());
2987
2988        // Simulate a response.
2989        {
2990            let server_response = assign!(http::Response::new("0".to_owned()), {
2991                lists: BTreeMap::from([(
2992                    "foo".to_owned(),
2993                    assign!(http::response::List::default(), {
2994                        count: uint!(7),
2995                    })
2996                 )])
2997            });
2998
2999            let _summary = {
3000                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
3001                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
3002            };
3003        }
3004
3005        let (request, _, _) =
3006            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
3007
3008        // The list is now fully loaded, so it requires a timeout.
3009        assert!(request.timeout.is_some());
3010
3011        Ok(())
3012    }
3013
3014    #[async_test]
3015    async fn test_timeout_three_lists() -> Result<()> {
3016        let (_server, sliding_sync) = new_sliding_sync(vec![
3017            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
3018            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
3019            SlidingSyncList::builder("baz")
3020                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
3021        ])
3022        .await?;
3023
3024        let (request, _, _) =
3025            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
3026
3027        // Two lists don't require a timeout.
3028        assert!(request.timeout.is_none());
3029
3030        // Simulate a response.
3031        {
3032            let server_response = assign!(http::Response::new("0".to_owned()), {
3033                lists: BTreeMap::from([(
3034                    "foo".to_owned(),
3035                    assign!(http::response::List::default(), {
3036                        count: uint!(7),
3037                    })
3038                 )])
3039            });
3040
3041            let _summary = {
3042                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
3043                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
3044            };
3045        }
3046
3047        let (request, _, _) =
3048            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
3049
3050        // One don't require a timeout.
3051        assert!(request.timeout.is_none());
3052
3053        // Simulate a response.
3054        {
3055            let server_response = assign!(http::Response::new("1".to_owned()), {
3056                lists: BTreeMap::from([(
3057                    "bar".to_owned(),
3058                    assign!(http::response::List::default(), {
3059                        count: uint!(7),
3060                    })
3061                 )])
3062            });
3063
3064            let _summary = {
3065                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
3066                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
3067            };
3068        }
3069
3070        let (request, _, _) =
3071            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
3072
3073        // All lists require a timeout.
3074        assert!(request.timeout.is_some());
3075
3076        Ok(())
3077    }
3078
3079    #[async_test]
3080    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
3081        let server = MockServer::start().await;
3082        let client = logged_in_client(Some(server.uri())).await;
3083
3084        let _mock_guard = Mock::given(SlidingSyncMatcher)
3085            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
3086                "pos": "0",
3087                "lists": {},
3088                "rooms": {}
3089            })))
3090            .mount_as_scoped(&server)
3091            .await;
3092
3093        let sliding_sync = client
3094            .sliding_sync("test")?
3095            .with_to_device_extension(
3096                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
3097            )
3098            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
3099            .build()
3100            .await?;
3101
3102        let sliding_sync = Arc::new(sliding_sync);
3103
3104        // Create the listener and perform a sync request
3105        let sync_beat_listener = client.inner.sync_beat.listen();
3106        sliding_sync.sync_once().await?;
3107
3108        // The sync beat listener should be notified shortly after
3109        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
3110        Ok(())
3111    }
3112
3113    #[async_test]
3114    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
3115        let server = MockServer::start().await;
3116        let client = logged_in_client(Some(server.uri())).await;
3117
3118        let _mock_guard = Mock::given(SlidingSyncMatcher)
3119            .respond_with(ResponseTemplate::new(404))
3120            .mount_as_scoped(&server)
3121            .await;
3122
3123        let sliding_sync = client
3124            .sliding_sync("test")?
3125            .with_to_device_extension(
3126                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
3127            )
3128            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
3129            .build()
3130            .await?;
3131
3132        let sliding_sync = Arc::new(sliding_sync);
3133
3134        // Create the listener and perform a sync request
3135        let sync_beat_listener = client.inner.sync_beat.listen();
3136        let sync_result = sliding_sync.sync_once().await;
3137        assert!(sync_result.is_err());
3138
3139        // The sync beat listener won't be notified in this case
3140        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
3141
3142        Ok(())
3143    }
3144}