matrix_sdk_ui/
encryption_sync_service.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! Encryption Sync API.
16//!
17//! The encryption sync API is a high-level helper that is designed to take care
18//! of handling the synchronization of encryption and to-device events (required
19//! for encryption), be they received within the app or within a dedicated
20//! extension process (e.g. the [NSE] process on iOS devices).
21//!
22//! Under the hood, this uses a sliding sync instance configured with no lists,
23//! but that enables the e2ee and to-device extensions, so that it can both
24//! handle encryption and manage encryption keys; that's sufficient to decrypt
25//! messages received in the notification processes.
26//!
27//! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension
28
29use std::{pin::Pin, time::Duration};
30
31use async_stream::stream;
32use futures_core::stream::Stream;
33use futures_util::{pin_mut, StreamExt};
34use matrix_sdk::{sleep::sleep, Client, SlidingSync, LEASE_DURATION_MS};
35use matrix_sdk_base::sliding_sync::http;
36use ruma::assign;
37use tokio::sync::OwnedMutexGuard;
38use tracing::{debug, instrument, trace, Span};
39
40/// Unit type representing a permit to *use* an [`EncryptionSyncService`].
41///
42/// This must be created once in the whole application's lifetime, wrapped in a
43/// mutex. Using an `EncryptionSyncService` must then lock that mutex in an
44/// owned way, so that there's at most a single `EncryptionSyncService` running
45/// at any time in the entire app.
46pub struct EncryptionSyncPermit(());
47
48impl EncryptionSyncPermit {
49    pub(crate) fn new() -> Self {
50        Self(())
51    }
52}
53
54impl EncryptionSyncPermit {
55    /// Test-only.
56    #[doc(hidden)]
57    pub fn new_for_testing() -> Self {
58        Self::new()
59    }
60}
61
62/// Should the `EncryptionSyncService` make use of locking?
63pub enum WithLocking {
64    Yes,
65    No,
66}
67
68impl From<bool> for WithLocking {
69    fn from(value: bool) -> Self {
70        if value {
71            Self::Yes
72        } else {
73            Self::No
74        }
75    }
76}
77
78/// High-level helper for synchronizing encryption events using sliding sync.
79///
80/// See the module's documentation for more details.
81pub struct EncryptionSyncService {
82    client: Client,
83    sliding_sync: SlidingSync,
84    with_locking: bool,
85}
86
87impl EncryptionSyncService {
88    /// Creates a new instance of a `EncryptionSyncService`.
89    ///
90    /// This will create and manage an instance of [`matrix_sdk::SlidingSync`].
91    pub async fn new(
92        client: Client,
93        poll_and_network_timeouts: Option<(Duration, Duration)>,
94        with_locking: WithLocking,
95    ) -> Result<Self, Error> {
96        // Make sure to use the same `conn_id` and caching store identifier, whichever
97        // process is running this sliding sync. There must be at most one
98        // sliding sync instance that enables the e2ee and to-device extensions.
99        let mut builder = client
100            .sliding_sync("encryption")
101            .map_err(Error::SlidingSync)?
102            //.share_pos() // TODO(bnjbvr) This is racy, needs cross-process lock :')
103            .with_to_device_extension(
104                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
105            )
106            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}));
107
108        if let Some((poll_timeout, network_timeout)) = poll_and_network_timeouts {
109            builder = builder.poll_timeout(poll_timeout).network_timeout(network_timeout);
110        }
111
112        let sliding_sync = builder.build().await.map_err(Error::SlidingSync)?;
113
114        let with_locking = matches!(with_locking, WithLocking::Yes);
115
116        if with_locking {
117            // Gently try to enable the cross-process lock on behalf of the user.
118            match client
119                .encryption()
120                .enable_cross_process_store_lock(
121                    client.cross_process_store_locks_holder_name().to_owned(),
122                )
123                .await
124            {
125                Ok(()) | Err(matrix_sdk::Error::BadCryptoStoreState) => {
126                    // Ignore; we've already set the crypto store lock to
127                    // something, and that's sufficient as
128                    // long as it uniquely identifies the process.
129                }
130                Err(err) => {
131                    // Any other error is fatal
132                    return Err(Error::ClientError(err));
133                }
134            };
135        }
136
137        Ok(Self { client, sliding_sync, with_locking })
138    }
139
140    /// Runs an `EncryptionSyncService` loop for a fixed number of iterations.
141    ///
142    /// This runs for the given number of iterations, or less than that, if it
143    /// stops earlier or could not acquire a cross-process lock (if configured
144    /// with it).
145    ///
146    /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
147    /// most one encryption sync running at any time. See its documentation
148    /// for more details.
149    #[instrument(skip_all, fields(store_generation))]
150    pub async fn run_fixed_iterations(
151        self,
152        num_iterations: u8,
153        _permit: OwnedMutexGuard<EncryptionSyncPermit>,
154    ) -> Result<(), Error> {
155        let sync = self.sliding_sync.sync();
156
157        pin_mut!(sync);
158
159        let lock_guard = if self.with_locking {
160            let mut lock_guard =
161                self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?;
162
163            // Try to take the lock at the beginning; if it's busy, that means that another
164            // process already holds onto it, and as such we won't try to run the
165            // encryption sync loop at all (because we expect the other process to
166            // do so).
167
168            if lock_guard.is_none() {
169                // If we can't acquire the cross-process lock on the first attempt,
170                // that means the main process is running, or its lease hasn't expired
171                // yet. In case it's the latter, wait a bit and retry.
172                tracing::debug!(
173                    "Lock was already taken, and we're not the main loop; retrying in {}ms...",
174                    LEASE_DURATION_MS
175                );
176
177                sleep(Duration::from_millis(LEASE_DURATION_MS.into())).await;
178
179                lock_guard = self
180                    .client
181                    .encryption()
182                    .try_lock_store_once()
183                    .await
184                    .map_err(Error::LockError)?;
185
186                if lock_guard.is_none() {
187                    tracing::debug!(
188                        "Second attempt at locking outside the main app failed, aborting."
189                    );
190                    return Ok(());
191                }
192            }
193
194            lock_guard
195        } else {
196            None
197        };
198
199        Span::current().record("store_generation", lock_guard.map(|guard| guard.generation()));
200
201        for _ in 0..num_iterations {
202            match sync.next().await {
203                Some(Ok(update_summary)) => {
204                    // This API is only concerned with the e2ee and to-device extensions.
205                    // Warn if anything weird has been received from the proxy.
206                    if !update_summary.lists.is_empty() {
207                        debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
208                    }
209                    if !update_summary.rooms.is_empty() {
210                        debug!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
211                    }
212
213                    // Cool cool, let's do it again.
214                    trace!("Encryption sync received an update!");
215                }
216
217                Some(Err(err)) => {
218                    trace!("Encryption sync stopped because of an error: {err:#}");
219                    return Err(Error::SlidingSync(err));
220                }
221
222                None => {
223                    trace!("Encryption sync properly terminated.");
224                    break;
225                }
226            }
227        }
228
229        Ok(())
230    }
231
232    /// Start synchronization.
233    ///
234    /// This should be regularly polled.
235    ///
236    /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
237    /// most one encryption sync running at any time. See its documentation
238    /// for more details.
239    #[doc(hidden)] // Only public for testing purposes.
240    pub fn sync(
241        &self,
242        _permit: OwnedMutexGuard<EncryptionSyncPermit>,
243    ) -> impl Stream<Item = Result<(), Error>> + '_ {
244        stream!({
245            let sync = self.sliding_sync.sync();
246
247            pin_mut!(sync);
248
249            loop {
250                match self.next_sync_with_lock(&mut sync).await? {
251                    Some(Ok(update_summary)) => {
252                        // This API is only concerned with the e2ee and to-device extensions.
253                        // Warn if anything weird has been received from the proxy.
254                        if !update_summary.lists.is_empty() {
255                            debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
256                        }
257                        if !update_summary.rooms.is_empty() {
258                            debug!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
259                        }
260
261                        // Cool cool, let's do it again.
262                        trace!("Encryption sync received an update!");
263                        yield Ok(());
264                        continue;
265                    }
266
267                    Some(Err(err)) => {
268                        trace!("Encryption sync stopped because of an error: {err:#}");
269                        yield Err(Error::SlidingSync(err));
270                        break;
271                    }
272
273                    None => {
274                        trace!("Encryption sync properly terminated.");
275                        break;
276                    }
277                }
278            }
279        })
280    }
281
282    /// Helper function for `sync`. Take the cross-process store lock, and call
283    /// `sync.next()`
284    #[instrument(skip_all, fields(store_generation))]
285    async fn next_sync_with_lock<Item>(
286        &self,
287        sync: &mut Pin<&mut impl Stream<Item = Item>>,
288    ) -> Result<Option<Item>, Error> {
289        let guard = if self.with_locking {
290            self.client.encryption().spin_lock_store(Some(60000)).await.map_err(Error::LockError)?
291        } else {
292            None
293        };
294
295        Span::current().record("store_generation", guard.map(|guard| guard.generation()));
296
297        Ok(sync.next().await)
298    }
299
300    /// Requests that the underlying sliding sync be stopped.
301    ///
302    /// This will unlock the cross-process lock, if taken.
303    pub(crate) fn stop_sync(&self) -> Result<(), Error> {
304        // Stopping the sync loop will cause the next `next()` call to return `None`, so
305        // this will also release the cross-process lock automatically.
306        self.sliding_sync.stop_sync().map_err(Error::SlidingSync)?;
307
308        Ok(())
309    }
310
311    pub(crate) async fn expire_sync_session(&self) {
312        self.sliding_sync.expire_session().await;
313    }
314}
315
316/// Errors for the [`EncryptionSyncService`].
317#[derive(Debug, thiserror::Error)]
318pub enum Error {
319    #[error("Something wrong happened in sliding sync: {0:#}")]
320    SlidingSync(matrix_sdk::Error),
321
322    #[error("Locking failed: {0:#}")]
323    LockError(matrix_sdk::Error),
324
325    #[error(transparent)]
326    ClientError(matrix_sdk::Error),
327}