matrix_sdk/sliding_sync/
builder.rs

1use std::{
2    collections::BTreeMap,
3    fmt::Debug,
4    sync::{Arc, RwLock as StdRwLock},
5    time::Duration,
6};
7
8use matrix_sdk_base::sliding_sync::http;
9use matrix_sdk_common::timer;
10use ruma::OwnedRoomId;
11use tokio::sync::{broadcast::channel, Mutex as AsyncMutex, RwLock as AsyncRwLock};
12
13use super::{
14    cache::{format_storage_key_prefix, restore_sliding_sync_state},
15    sticky_parameters::SlidingSyncStickyManager,
16    Error, SlidingSync, SlidingSyncInner, SlidingSyncListBuilder, SlidingSyncPositionMarkers,
17    Version,
18};
19use crate::{sliding_sync::SlidingSyncStickyParameters, Client, Result};
20
21/// Configuration for a Sliding Sync instance.
22///
23/// Get a new builder with methods like [`crate::Client::sliding_sync`], or
24/// [`crate::SlidingSync::builder`].
25#[derive(Debug, Clone)]
26pub struct SlidingSyncBuilder {
27    id: String,
28    storage_key: String,
29    version: Option<Version>,
30    client: Client,
31    lists: Vec<SlidingSyncListBuilder>,
32    extensions: Option<http::request::Extensions>,
33    subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
34    poll_timeout: Duration,
35    network_timeout: Duration,
36    #[cfg(feature = "e2e-encryption")]
37    share_pos: bool,
38}
39
40impl SlidingSyncBuilder {
41    pub(super) fn new(id: String, client: Client) -> Result<Self, Error> {
42        if id.len() > 16 {
43            Err(Error::InvalidSlidingSyncIdentifier)
44        } else {
45            let storage_key =
46                format_storage_key_prefix(&id, client.user_id().ok_or(Error::UnauthenticatedUser)?);
47
48            Ok(Self {
49                id,
50                storage_key,
51                version: None,
52                client,
53                lists: Vec::new(),
54                extensions: None,
55                subscriptions: BTreeMap::new(),
56                poll_timeout: Duration::from_secs(30),
57                network_timeout: Duration::from_secs(30),
58                #[cfg(feature = "e2e-encryption")]
59                share_pos: false,
60            })
61        }
62    }
63
64    /// Set a specific version that will override the one from the [`Client`].
65    pub fn version(mut self, version: Version) -> Self {
66        self.version = Some(version);
67        self
68    }
69
70    /// Add the given list to the lists.
71    ///
72    /// Replace any list with the same name.
73    pub fn add_list(mut self, list_builder: SlidingSyncListBuilder) -> Self {
74        self.lists.push(list_builder);
75        self
76    }
77
78    /// Enroll the list in caching, reloads it from the cache if possible, and
79    /// adds it to the list of lists.
80    ///
81    /// This will raise an error if there was a I/O error reading from the
82    /// cache.
83    ///
84    /// Replace any list with the same name.
85    pub async fn add_cached_list(self, mut list: SlidingSyncListBuilder) -> Result<Self> {
86        let _timer = timer!(format!("restoring (loading+processing) list {}", list.name));
87
88        list.set_cached_and_reload(&self.client, &self.storage_key).await?;
89
90        Ok(self.add_list(list))
91    }
92
93    /// Activate e2ee, to-device-message, account data, typing and receipt
94    /// extensions if not yet configured.
95    ///
96    /// Will leave any extension configuration found untouched, so the order
97    /// does not matter.
98    pub fn with_all_extensions(mut self) -> Self {
99        {
100            let cfg = self.extensions.get_or_insert_with(Default::default);
101            if cfg.to_device.enabled.is_none() {
102                cfg.to_device.enabled = Some(true);
103            }
104
105            if cfg.e2ee.enabled.is_none() {
106                cfg.e2ee.enabled = Some(true);
107            }
108
109            if cfg.account_data.enabled.is_none() {
110                cfg.account_data.enabled = Some(true);
111            }
112
113            if cfg.receipts.enabled.is_none() {
114                cfg.receipts.enabled = Some(true);
115            }
116
117            if cfg.typing.enabled.is_none() {
118                cfg.typing.enabled = Some(true);
119            }
120        }
121        self
122    }
123
124    /// Set the E2EE extension configuration.
125    pub fn with_e2ee_extension(mut self, e2ee: http::request::E2EE) -> Self {
126        self.extensions.get_or_insert_with(Default::default).e2ee = e2ee;
127        self
128    }
129
130    /// Unset the E2EE extension configuration.
131    pub fn without_e2ee_extension(mut self) -> Self {
132        self.extensions.get_or_insert_with(Default::default).e2ee = http::request::E2EE::default();
133        self
134    }
135
136    /// Set the ToDevice extension configuration.
137    pub fn with_to_device_extension(mut self, to_device: http::request::ToDevice) -> Self {
138        self.extensions.get_or_insert_with(Default::default).to_device = to_device;
139        self
140    }
141
142    /// Unset the ToDevice extension configuration.
143    pub fn without_to_device_extension(mut self) -> Self {
144        self.extensions.get_or_insert_with(Default::default).to_device =
145            http::request::ToDevice::default();
146        self
147    }
148
149    /// Set the account data extension configuration.
150    pub fn with_account_data_extension(mut self, account_data: http::request::AccountData) -> Self {
151        self.extensions.get_or_insert_with(Default::default).account_data = account_data;
152        self
153    }
154
155    /// Unset the account data extension configuration.
156    pub fn without_account_data_extension(mut self) -> Self {
157        self.extensions.get_or_insert_with(Default::default).account_data =
158            http::request::AccountData::default();
159        self
160    }
161
162    /// Set the Typing extension configuration.
163    pub fn with_typing_extension(mut self, typing: http::request::Typing) -> Self {
164        self.extensions.get_or_insert_with(Default::default).typing = typing;
165        self
166    }
167
168    /// Unset the Typing extension configuration.
169    pub fn without_typing_extension(mut self) -> Self {
170        self.extensions.get_or_insert_with(Default::default).typing =
171            http::request::Typing::default();
172        self
173    }
174
175    /// Set the Receipt extension configuration.
176    pub fn with_receipt_extension(mut self, receipt: http::request::Receipts) -> Self {
177        self.extensions.get_or_insert_with(Default::default).receipts = receipt;
178        self
179    }
180
181    /// Unset the Receipt extension configuration.
182    pub fn without_receipt_extension(mut self) -> Self {
183        self.extensions.get_or_insert_with(Default::default).receipts =
184            http::request::Receipts::default();
185        self
186    }
187
188    /// Sets a custom timeout duration for the sliding sync polling endpoint.
189    ///
190    /// This is the maximum time to wait before the sliding sync server returns
191    /// the long-polling request. If no events (or other data) become
192    /// available before this time elapses, the server will a return a
193    /// response with empty fields.
194    ///
195    /// There's an additional network timeout on top of that that can be
196    /// configured with [`Self::network_timeout`].
197    pub fn poll_timeout(mut self, timeout: Duration) -> Self {
198        self.poll_timeout = timeout;
199        self
200    }
201
202    /// Sets a custom network timeout for the sliding sync polling.
203    ///
204    /// This is not the polling timeout that can be configured with
205    /// [`Self::poll_timeout`], but an additional timeout that will be
206    /// added to the former.
207    pub fn network_timeout(mut self, timeout: Duration) -> Self {
208        self.network_timeout = timeout;
209        self
210    }
211
212    /// Should the sliding sync instance share its sync position through
213    /// storage?
214    ///
215    /// In general, sliding sync instances will cache the sync position (`pos`
216    /// field in the request) in internal memory. It can be useful, in
217    /// multi-process scenarios, to save it into some shared storage so that one
218    /// sliding sync instance running across two different processes can
219    /// continue with the same sync position it had before being stopped.
220    #[cfg(feature = "e2e-encryption")]
221    pub fn share_pos(mut self) -> Self {
222        self.share_pos = true;
223        self
224    }
225
226    /// Build the Sliding Sync.
227    ///
228    /// If `self.storage_key` is `Some(_)`, load the cached data from cold
229    /// storage.
230    pub async fn build(self) -> Result<SlidingSync> {
231        let client = self.client;
232
233        let version = self.version.unwrap_or_else(|| client.sliding_sync_version());
234
235        if matches!(version, Version::None) {
236            return Err(crate::error::Error::SlidingSync(Error::VersionIsMissing));
237        }
238
239        let (internal_channel_sender, _internal_channel_receiver) = channel(8);
240
241        let mut lists = BTreeMap::new();
242
243        for list_builder in self.lists {
244            let list = list_builder.build(internal_channel_sender.clone());
245
246            lists.insert(list.name().to_owned(), list);
247        }
248
249        // Reload existing state from the cache.
250        let restored_fields =
251            restore_sliding_sync_state(&client, &self.storage_key, &lists).await?;
252
253        let (pos, rooms) = if let Some(fields) = restored_fields {
254            #[cfg(feature = "e2e-encryption")]
255            let pos = if self.share_pos { fields.pos } else { None };
256            #[cfg(not(feature = "e2e-encryption"))]
257            let pos = None;
258
259            (pos, fields.rooms)
260        } else {
261            (None, BTreeMap::new())
262        };
263
264        #[cfg(feature = "e2e-encryption")]
265        let share_pos = self.share_pos;
266        #[cfg(not(feature = "e2e-encryption"))]
267        let share_pos = false;
268
269        let rooms = AsyncRwLock::new(rooms);
270        let lists = AsyncRwLock::new(lists);
271
272        Ok(SlidingSync::new(SlidingSyncInner {
273            id: self.id,
274            version,
275
276            client,
277            storage_key: self.storage_key,
278            share_pos,
279
280            lists,
281            rooms,
282
283            position: Arc::new(AsyncMutex::new(SlidingSyncPositionMarkers { pos })),
284
285            sticky: StdRwLock::new(SlidingSyncStickyManager::new(
286                SlidingSyncStickyParameters::new(
287                    self.subscriptions,
288                    self.extensions.unwrap_or_default(),
289                ),
290            )),
291
292            internal_channel: internal_channel_sender,
293
294            poll_timeout: self.poll_timeout,
295            network_timeout: self.network_timeout,
296        }))
297    }
298}