1use std::{future, ops::Deref, sync::Arc};
2
3use futures_core::Stream;
4use futures_util::StreamExt;
5use matrix_sdk_common::cross_process_lock::{CrossProcessLock, CrossProcessLockConfig};
6use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId};
7use tokio::sync::{Mutex, broadcast};
8use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
9use tracing::{debug, trace, warn};
10
11use super::{
12 DeviceChanges, IdentityChanges, LockableCryptoStore, caches::SessionStore,
13 types::RoomKeyBundleInfo,
14};
15use crate::{
16 CryptoStoreError, OwnUserIdentityData, Session, UserIdentityData,
17 olm::InboundGroupSession,
18 store,
19 store::{
20 Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo, RoomKeyWithheldInfo,
21 types::SecretsInboxItem,
22 },
23};
24
/// A wrapper around a [`DynCryptoStore`] that adds an in-memory cache of
/// [`Session`]s and a set of broadcast channels used to notify listeners
/// about data that has been persisted through the wrapper.
///
/// The wrapper implements [`Deref`] to the underlying store, so every store
/// method that is not overridden here is reachable directly.
#[derive(Debug)]
pub(crate) struct CryptoStoreWrapper {
    /// The ID of the user this wrapper was created for.
    user_id: OwnedUserId,
    /// The ID of the device this wrapper was created for.
    device_id: OwnedDeviceId,

    /// The underlying store that all data is persisted to.
    store: Arc<DynCryptoStore>,

    /// In-memory cache of sessions; updated by `save_changes` and lazily
    /// filled from the store by `get_sessions`.
    sessions: SessionStore,

    /// Sender for batches of room keys that were saved through
    /// `save_changes` or `save_inbound_group_sessions`.
    room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,

    /// Sender for batches of withheld room key notices that were saved
    /// through `save_changes`.
    room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldInfo>>,

    /// Sender for secrets that were saved through `save_changes`; each secret
    /// is sent as its own message.
    secrets_broadcaster: broadcast::Sender<SecretsInboxItem>,

    /// Sender for identity and device changes, together with our own identity
    /// (if the store has one) as it was read back after the changes were
    /// saved.
    identities_broadcaster:
        broadcast::Sender<(Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)>,

    /// Sender for received room key bundles that were saved through
    /// `save_changes`; each bundle is sent as its own message.
    historic_room_key_bundles_broadcaster: broadcast::Sender<RoomKeyBundleInfo>,
}
60
61impl CryptoStoreWrapper {
62 pub(crate) fn new(user_id: &UserId, device_id: &DeviceId, store: impl IntoCryptoStore) -> Self {
63 let room_keys_received_sender = broadcast::Sender::new(10);
64 let room_keys_withheld_received_sender = broadcast::Sender::new(10);
65 let secrets_broadcaster = broadcast::Sender::new(10);
66 let identities_broadcaster = broadcast::Sender::new(20);
69 let historic_room_key_bundles_broadcaster = broadcast::Sender::new(10);
70
71 Self {
72 user_id: user_id.to_owned(),
73 device_id: device_id.to_owned(),
74 store: store.into_crypto_store(),
75 sessions: SessionStore::new(),
76 room_keys_received_sender,
77 room_keys_withheld_received_sender,
78 secrets_broadcaster,
79 identities_broadcaster,
80 historic_room_key_bundles_broadcaster,
81 }
82 }
83
84 pub async fn save_changes(&self, changes: Changes) -> store::Result<()> {
93 let room_key_updates: Vec<_> =
94 changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();
95
96 let withheld_session_updates: Vec<_> = changes
97 .withheld_session_info
98 .iter()
99 .flat_map(|(room_id, session_map)| {
100 session_map.iter().map(|(session_id, withheld_event)| RoomKeyWithheldInfo {
101 room_id: room_id.to_owned(),
102 session_id: session_id.to_owned(),
103 withheld_event: withheld_event.clone(),
104 })
105 })
106 .collect();
107
108 let own_identity_was_verified_before_change = self
112 .store
113 .get_user_identity(self.user_id.as_ref())
114 .await?
115 .as_ref()
116 .and_then(|i| i.own())
117 .is_some_and(|own| own.is_verified());
118
119 let secrets = changes.secrets.to_owned();
120 let devices = changes.devices.to_owned();
121 let identities = changes.identities.to_owned();
122 let room_key_bundle_updates: Vec<_> =
123 changes.received_room_key_bundles.iter().map(RoomKeyBundleInfo::from).collect();
124
125 if devices
126 .changed
127 .iter()
128 .any(|d| d.user_id() == self.user_id && d.device_id() == self.device_id)
129 {
130 self.sessions.clear().await;
134 } else {
135 for session in &changes.sessions {
137 self.sessions.add(session.clone()).await;
138 }
139 }
140
141 self.store.save_changes(changes).await?;
142
143 if tracing::level_enabled!(tracing::Level::DEBUG) {
145 for updated_identity in
146 identities.new.iter().chain(identities.changed.iter()).filter_map(|id| id.own())
147 {
148 let master_key = updated_identity.master_key().get_first_key();
149 let user_signing_key = updated_identity.user_signing_key().get_first_key();
150 let self_signing_key = updated_identity.self_signing_key().get_first_key();
151
152 debug!(
153 ?master_key,
154 ?user_signing_key,
155 ?self_signing_key,
156 previously_verified = updated_identity.was_previously_verified(),
157 verified = updated_identity.is_verified(),
158 "Stored our own identity"
159 );
160 }
161 }
162
163 if !room_key_updates.is_empty() {
164 let _ = self.room_keys_received_sender.send(room_key_updates);
166 }
167
168 if !withheld_session_updates.is_empty() {
169 let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
170 }
171
172 for secret in secrets {
173 let _ = self.secrets_broadcaster.send(secret);
174 }
175
176 for bundle_info in room_key_bundle_updates {
177 let _ = self.historic_room_key_bundles_broadcaster.send(bundle_info);
178 }
179
180 if !devices.is_empty() || !identities.is_empty() {
181 let maybe_own_identity =
185 self.store.get_user_identity(&self.user_id).await?.and_then(|i| i.into_own());
186
187 if let Some(own_identity_after) = maybe_own_identity.as_ref() {
191 let own_identity_is_verified = own_identity_after.is_verified();
194
195 if !own_identity_was_verified_before_change && own_identity_is_verified {
196 debug!(
197 "Own identity is now verified, check all known identities for verification status changes"
198 );
199 self.check_all_identities_and_update_was_previously_verified_flag_if_needed(
202 own_identity_after,
203 )
204 .await?;
205 } else if own_identity_was_verified_before_change != own_identity_is_verified {
206 debug!(
208 own_identity_is_verified,
209 "The verification state of our own identity has changed",
210 );
211 }
212 }
213
214 let _ = self.identities_broadcaster.send((maybe_own_identity, identities, devices));
215 }
216
217 Ok(())
218 }
219
220 async fn check_all_identities_and_update_was_previously_verified_flag_if_needed(
221 &self,
222 own_identity_after: &OwnUserIdentityData,
223 ) -> Result<(), CryptoStoreError> {
224 let tracked_users = self.store.load_tracked_users().await?;
225 let mut updated_identities: Vec<UserIdentityData> = Default::default();
226 for tracked_user in tracked_users {
227 if let Some(other_identity) = self
228 .store
229 .get_user_identity(tracked_user.user_id.as_ref())
230 .await?
231 .as_ref()
232 .and_then(|i| i.other())
233 && !other_identity.was_previously_verified()
234 && own_identity_after.is_identity_signed(other_identity)
235 {
236 trace!(?tracked_user.user_id, "Marking set verified_latch to true.");
237 other_identity.mark_as_previously_verified();
238 updated_identities.push(other_identity.clone().into());
239 }
240 }
241
242 if !updated_identities.is_empty() {
243 let identity_changes =
244 IdentityChanges { changed: updated_identities, ..Default::default() };
245 self.store
246 .save_changes(Changes {
247 identities: identity_changes.clone(),
248 ..Default::default()
249 })
250 .await?;
251
252 let _ = self.identities_broadcaster.send((
253 Some(own_identity_after.clone()),
254 identity_changes,
255 DeviceChanges::default(),
256 ));
257 }
258
259 Ok(())
260 }
261
262 pub async fn get_sessions(
263 &self,
264 sender_key: &str,
265 ) -> store::Result<Option<Arc<Mutex<Vec<Session>>>>> {
266 let sessions = self.sessions.get(sender_key).await;
267
268 let sessions = if sessions.is_none() {
269 let mut entries = self.sessions.entries.write().await;
270
271 let sessions = entries.get(sender_key);
272
273 if sessions.is_some() {
274 sessions.cloned()
275 } else {
276 let sessions = self.store.get_sessions(sender_key).await?;
277 let sessions = Arc::new(Mutex::new(sessions.unwrap_or_default()));
278
279 entries.insert(sender_key.to_owned(), sessions.clone());
280
281 Some(sessions)
282 }
283 } else {
284 sessions
285 };
286
287 Ok(sessions)
288 }
289
290 pub async fn save_inbound_group_sessions(
301 &self,
302 sessions: Vec<InboundGroupSession>,
303 backed_up_to_version: Option<&str>,
304 ) -> store::Result<()> {
305 let room_key_updates: Vec<_> = sessions.iter().map(RoomKeyInfo::from).collect();
306 self.store.save_inbound_group_sessions(sessions, backed_up_to_version).await?;
307
308 if !room_key_updates.is_empty() {
309 let _ = self.room_keys_received_sender.send(room_key_updates);
311 }
312 Ok(())
313 }
314
315 pub fn room_keys_received_stream(
324 &self,
325 ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
326 BroadcastStream::new(self.room_keys_received_sender.subscribe())
327 }
328
329 pub fn room_keys_withheld_received_stream(
338 &self,
339 ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
340 let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
341 Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
342 }
343
344 pub fn secrets_stream(&self) -> impl Stream<Item = SecretsInboxItem> + use<> {
347 let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());
348 Self::filter_errors_out_of_stream(stream, "secrets_stream")
349 }
350
351 pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
354 let stream = BroadcastStream::new(self.historic_room_key_bundles_broadcaster.subscribe());
355 Self::filter_errors_out_of_stream(stream, "bundle_stream")
356 }
357
358 pub(super) fn identities_stream(
363 &self,
364 ) -> impl Stream<Item = (Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)> + use<>
365 {
366 let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
367 Self::filter_errors_out_of_stream(stream, "identities_stream")
368 }
369
370 fn filter_errors_out_of_stream<ItemType>(
377 stream: BroadcastStream<ItemType>,
378 stream_name: &str,
379 ) -> impl Stream<Item = ItemType> + use<ItemType>
380 where
381 ItemType: 'static + Clone + Send,
382 {
383 let stream_name = stream_name.to_owned();
384 stream.filter_map(move |result| {
385 future::ready(match result {
386 Ok(r) => Some(r),
387 Err(BroadcastStreamRecvError::Lagged(lag)) => {
388 warn!("{stream_name} missed {lag} updates");
389 None
390 }
391 })
392 })
393 }
394
395 pub(crate) fn create_store_lock(
398 &self,
399 lock_key: String,
400 config: CrossProcessLockConfig,
401 ) -> CrossProcessLock<LockableCryptoStore> {
402 CrossProcessLock::new(LockableCryptoStore(self.store.clone()), lock_key, config)
403 }
404}
405
406impl Deref for CryptoStoreWrapper {
407 type Target = DynCryptoStore;
408
409 fn deref(&self) -> &Self::Target {
410 self.store.deref()
411 }
412}
413
#[cfg(test)]
mod test {
    use matrix_sdk_test::async_test;
    use ruma::user_id;

    use super::*;
    use crate::machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper;

    /// Saving a change to our own device must empty the session cache.
    #[async_test]
    async fn test_cache_cleared_after_device_update() {
        // Both machines belong to the same user.
        let alice = user_id!("@alice:example.com");
        let (first, second) =
            get_machine_pair_with_setup_sessions_test_helper(alice, alice, false).await;

        let sender_key = second.identity_keys().curve25519.to_base64();

        // Precondition: the session with the second machine is cached.
        first
            .store()
            .inner
            .store
            .sessions
            .get(&sender_key)
            .await
            .expect("We should have a session in the cache.");

        let own_device = first
            .get_device(alice, first.device_id(), None)
            .await
            .unwrap()
            .expect("We should have access to our own device.")
            .inner;

        // Report our own device as changed.
        let changes = Changes {
            devices: DeviceChanges { changed: vec![own_device], ..Default::default() },
            ..Default::default()
        };
        first.store().save_changes(changes).await.unwrap();

        let cached_after = first.store().inner.store.sessions.get(&sender_key).await;
        assert!(
            cached_after.is_none(),
            "The session should no longer be in the cache after our own device keys changed"
        );
    }
}