matrix_sdk/sliding_sync/
builder.rs1use std::{
2 collections::BTreeMap,
3 fmt::Debug,
4 sync::{Arc, RwLock as StdRwLock},
5 time::Duration,
6};
7
8use matrix_sdk_base::sliding_sync::http;
9use matrix_sdk_common::timer;
10use ruma::OwnedRoomId;
11use tokio::sync::{broadcast::channel, Mutex as AsyncMutex, RwLock as AsyncRwLock};
12
13use super::{
14 cache::{format_storage_key_prefix, restore_sliding_sync_state},
15 sticky_parameters::SlidingSyncStickyManager,
16 Error, SlidingSync, SlidingSyncInner, SlidingSyncListBuilder, SlidingSyncPositionMarkers,
17 Version,
18};
19use crate::{sliding_sync::SlidingSyncStickyParameters, Client, Result};
20
21#[derive(Debug, Clone)]
26pub struct SlidingSyncBuilder {
27 id: String,
28 storage_key: String,
29 version: Option<Version>,
30 client: Client,
31 lists: Vec<SlidingSyncListBuilder>,
32 extensions: Option<http::request::Extensions>,
33 subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
34 poll_timeout: Duration,
35 network_timeout: Duration,
36 #[cfg(feature = "e2e-encryption")]
37 share_pos: bool,
38}
39
40impl SlidingSyncBuilder {
41 pub(super) fn new(id: String, client: Client) -> Result<Self, Error> {
42 if id.len() > 16 {
43 Err(Error::InvalidSlidingSyncIdentifier)
44 } else {
45 let storage_key =
46 format_storage_key_prefix(&id, client.user_id().ok_or(Error::UnauthenticatedUser)?);
47
48 Ok(Self {
49 id,
50 storage_key,
51 version: None,
52 client,
53 lists: Vec::new(),
54 extensions: None,
55 subscriptions: BTreeMap::new(),
56 poll_timeout: Duration::from_secs(30),
57 network_timeout: Duration::from_secs(30),
58 #[cfg(feature = "e2e-encryption")]
59 share_pos: false,
60 })
61 }
62 }
63
64 pub fn version(mut self, version: Version) -> Self {
66 self.version = Some(version);
67 self
68 }
69
70 pub fn add_list(mut self, list_builder: SlidingSyncListBuilder) -> Self {
74 self.lists.push(list_builder);
75 self
76 }
77
78 pub async fn add_cached_list(self, mut list: SlidingSyncListBuilder) -> Result<Self> {
86 let _timer = timer!(format!("restoring (loading+processing) list {}", list.name));
87
88 list.set_cached_and_reload(&self.client, &self.storage_key).await?;
89
90 Ok(self.add_list(list))
91 }
92
93 pub fn with_all_extensions(mut self) -> Self {
99 {
100 let cfg = self.extensions.get_or_insert_with(Default::default);
101 if cfg.to_device.enabled.is_none() {
102 cfg.to_device.enabled = Some(true);
103 }
104
105 if cfg.e2ee.enabled.is_none() {
106 cfg.e2ee.enabled = Some(true);
107 }
108
109 if cfg.account_data.enabled.is_none() {
110 cfg.account_data.enabled = Some(true);
111 }
112
113 if cfg.receipts.enabled.is_none() {
114 cfg.receipts.enabled = Some(true);
115 }
116
117 if cfg.typing.enabled.is_none() {
118 cfg.typing.enabled = Some(true);
119 }
120 }
121 self
122 }
123
124 pub fn with_e2ee_extension(mut self, e2ee: http::request::E2EE) -> Self {
126 self.extensions.get_or_insert_with(Default::default).e2ee = e2ee;
127 self
128 }
129
130 pub fn without_e2ee_extension(mut self) -> Self {
132 self.extensions.get_or_insert_with(Default::default).e2ee = http::request::E2EE::default();
133 self
134 }
135
136 pub fn with_to_device_extension(mut self, to_device: http::request::ToDevice) -> Self {
138 self.extensions.get_or_insert_with(Default::default).to_device = to_device;
139 self
140 }
141
142 pub fn without_to_device_extension(mut self) -> Self {
144 self.extensions.get_or_insert_with(Default::default).to_device =
145 http::request::ToDevice::default();
146 self
147 }
148
149 pub fn with_account_data_extension(mut self, account_data: http::request::AccountData) -> Self {
151 self.extensions.get_or_insert_with(Default::default).account_data = account_data;
152 self
153 }
154
155 pub fn without_account_data_extension(mut self) -> Self {
157 self.extensions.get_or_insert_with(Default::default).account_data =
158 http::request::AccountData::default();
159 self
160 }
161
162 pub fn with_typing_extension(mut self, typing: http::request::Typing) -> Self {
164 self.extensions.get_or_insert_with(Default::default).typing = typing;
165 self
166 }
167
168 pub fn without_typing_extension(mut self) -> Self {
170 self.extensions.get_or_insert_with(Default::default).typing =
171 http::request::Typing::default();
172 self
173 }
174
175 pub fn with_receipt_extension(mut self, receipt: http::request::Receipts) -> Self {
177 self.extensions.get_or_insert_with(Default::default).receipts = receipt;
178 self
179 }
180
181 pub fn without_receipt_extension(mut self) -> Self {
183 self.extensions.get_or_insert_with(Default::default).receipts =
184 http::request::Receipts::default();
185 self
186 }
187
188 pub fn poll_timeout(mut self, timeout: Duration) -> Self {
198 self.poll_timeout = timeout;
199 self
200 }
201
202 pub fn network_timeout(mut self, timeout: Duration) -> Self {
208 self.network_timeout = timeout;
209 self
210 }
211
212 #[cfg(feature = "e2e-encryption")]
221 pub fn share_pos(mut self) -> Self {
222 self.share_pos = true;
223 self
224 }
225
226 pub async fn build(self) -> Result<SlidingSync> {
231 let client = self.client;
232
233 let version = self.version.unwrap_or_else(|| client.sliding_sync_version());
234
235 if matches!(version, Version::None) {
236 return Err(crate::error::Error::SlidingSync(Error::VersionIsMissing));
237 }
238
239 let (internal_channel_sender, _internal_channel_receiver) = channel(8);
240
241 let mut lists = BTreeMap::new();
242
243 for list_builder in self.lists {
244 let list = list_builder.build(internal_channel_sender.clone());
245
246 lists.insert(list.name().to_owned(), list);
247 }
248
249 let restored_fields =
251 restore_sliding_sync_state(&client, &self.storage_key, &lists).await?;
252
253 let (pos, rooms) = if let Some(fields) = restored_fields {
254 #[cfg(feature = "e2e-encryption")]
255 let pos = if self.share_pos { fields.pos } else { None };
256 #[cfg(not(feature = "e2e-encryption"))]
257 let pos = None;
258
259 (pos, fields.rooms)
260 } else {
261 (None, BTreeMap::new())
262 };
263
264 #[cfg(feature = "e2e-encryption")]
265 let share_pos = self.share_pos;
266 #[cfg(not(feature = "e2e-encryption"))]
267 let share_pos = false;
268
269 let rooms = AsyncRwLock::new(rooms);
270 let lists = AsyncRwLock::new(lists);
271
272 Ok(SlidingSync::new(SlidingSyncInner {
273 id: self.id,
274 version,
275
276 client,
277 storage_key: self.storage_key,
278 share_pos,
279
280 lists,
281 rooms,
282
283 position: Arc::new(AsyncMutex::new(SlidingSyncPositionMarkers { pos })),
284
285 sticky: StdRwLock::new(SlidingSyncStickyManager::new(
286 SlidingSyncStickyParameters::new(
287 self.subscriptions,
288 self.extensions.unwrap_or_default(),
289 ),
290 )),
291
292 internal_channel: internal_channel_sender,
293
294 poll_timeout: self.poll_timeout,
295 network_timeout: self.network_timeout,
296 }))
297 }
298}