1use std::{future, ops::Deref, sync::Arc};
2
3use futures_core::Stream;
4use futures_util::StreamExt;
5use matrix_sdk_common::store_locks::CrossProcessStoreLock;
6use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId};
7use tokio::sync::{broadcast, Mutex};
8use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
9use tracing::{debug, trace, warn};
10
11use super::{
12 caches::SessionStore, types::RoomKeyBundleInfo, DeviceChanges, IdentityChanges,
13 LockableCryptoStore,
14};
15use crate::{
16 olm::InboundGroupSession,
17 store,
18 store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo, RoomKeyWithheldInfo},
19 CryptoStoreError, GossippedSecret, OwnUserIdentityData, Session, UserIdentityData,
20};
21
22#[derive(Debug)]
27pub(crate) struct CryptoStoreWrapper {
28 user_id: OwnedUserId,
29 device_id: OwnedDeviceId,
30
31 store: Arc<DynCryptoStore>,
32
33 sessions: SessionStore,
35
36 room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,
39
40 room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldInfo>>,
43
44 secrets_broadcaster: broadcast::Sender<GossippedSecret>,
47
48 identities_broadcaster:
51 broadcast::Sender<(Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)>,
52
53 historic_room_key_bundles_broadcaster: broadcast::Sender<RoomKeyBundleInfo>,
56}
57
58impl CryptoStoreWrapper {
59 pub(crate) fn new(user_id: &UserId, device_id: &DeviceId, store: impl IntoCryptoStore) -> Self {
60 let room_keys_received_sender = broadcast::Sender::new(10);
61 let room_keys_withheld_received_sender = broadcast::Sender::new(10);
62 let secrets_broadcaster = broadcast::Sender::new(10);
63 let identities_broadcaster = broadcast::Sender::new(20);
66 let historic_room_key_bundles_broadcaster = broadcast::Sender::new(10);
67
68 Self {
69 user_id: user_id.to_owned(),
70 device_id: device_id.to_owned(),
71 store: store.into_crypto_store(),
72 sessions: SessionStore::new(),
73 room_keys_received_sender,
74 room_keys_withheld_received_sender,
75 secrets_broadcaster,
76 identities_broadcaster,
77 historic_room_key_bundles_broadcaster,
78 }
79 }
80
81 pub async fn save_changes(&self, changes: Changes) -> store::Result<()> {
90 let room_key_updates: Vec<_> =
91 changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();
92
93 let withheld_session_updates: Vec<_> = changes
94 .withheld_session_info
95 .iter()
96 .flat_map(|(room_id, session_map)| {
97 session_map.iter().map(|(session_id, withheld_event)| RoomKeyWithheldInfo {
98 room_id: room_id.to_owned(),
99 session_id: session_id.to_owned(),
100 withheld_event: withheld_event.clone(),
101 })
102 })
103 .collect();
104
105 let own_identity_was_verified_before_change = self
109 .store
110 .get_user_identity(self.user_id.as_ref())
111 .await?
112 .as_ref()
113 .and_then(|i| i.own())
114 .is_some_and(|own| own.is_verified());
115
116 let secrets = changes.secrets.to_owned();
117 let devices = changes.devices.to_owned();
118 let identities = changes.identities.to_owned();
119 let room_key_bundle_updates: Vec<_> =
120 changes.received_room_key_bundles.iter().map(RoomKeyBundleInfo::from).collect();
121
122 if devices
123 .changed
124 .iter()
125 .any(|d| d.user_id() == self.user_id && d.device_id() == self.device_id)
126 {
127 self.sessions.clear().await;
131 } else {
132 for session in &changes.sessions {
134 self.sessions.add(session.clone()).await;
135 }
136 }
137
138 self.store.save_changes(changes).await?;
139
140 if tracing::level_enabled!(tracing::Level::DEBUG) {
142 for updated_identity in
143 identities.new.iter().chain(identities.changed.iter()).filter_map(|id| id.own())
144 {
145 let master_key = updated_identity.master_key().get_first_key();
146 let user_signing_key = updated_identity.user_signing_key().get_first_key();
147 let self_signing_key = updated_identity.self_signing_key().get_first_key();
148
149 debug!(
150 ?master_key,
151 ?user_signing_key,
152 ?self_signing_key,
153 previously_verified = updated_identity.was_previously_verified(),
154 verified = updated_identity.is_verified(),
155 "Stored our own identity"
156 );
157 }
158 }
159
160 if !room_key_updates.is_empty() {
161 let _ = self.room_keys_received_sender.send(room_key_updates);
163 }
164
165 if !withheld_session_updates.is_empty() {
166 let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
167 }
168
169 for secret in secrets {
170 let _ = self.secrets_broadcaster.send(secret);
171 }
172
173 for bundle_info in room_key_bundle_updates {
174 let _ = self.historic_room_key_bundles_broadcaster.send(bundle_info);
175 }
176
177 if !devices.is_empty() || !identities.is_empty() {
178 let maybe_own_identity =
182 self.store.get_user_identity(&self.user_id).await?.and_then(|i| i.into_own());
183
184 if let Some(own_identity_after) = maybe_own_identity.as_ref() {
188 let own_identity_is_verified = own_identity_after.is_verified();
191
192 if !own_identity_was_verified_before_change && own_identity_is_verified {
193 debug!(
194 "Own identity is now verified, check all known identities for verification status changes"
195 );
196 self.check_all_identities_and_update_was_previously_verified_flag_if_needed(
199 own_identity_after,
200 )
201 .await?;
202 } else if own_identity_was_verified_before_change != own_identity_is_verified {
203 debug!(
205 own_identity_is_verified,
206 "The verification state of our own identity has changed",
207 );
208 }
209 }
210
211 let _ = self.identities_broadcaster.send((maybe_own_identity, identities, devices));
212 }
213
214 Ok(())
215 }
216
217 async fn check_all_identities_and_update_was_previously_verified_flag_if_needed(
218 &self,
219 own_identity_after: &OwnUserIdentityData,
220 ) -> Result<(), CryptoStoreError> {
221 let tracked_users = self.store.load_tracked_users().await?;
222 let mut updated_identities: Vec<UserIdentityData> = Default::default();
223 for tracked_user in tracked_users {
224 if let Some(other_identity) = self
225 .store
226 .get_user_identity(tracked_user.user_id.as_ref())
227 .await?
228 .as_ref()
229 .and_then(|i| i.other())
230 {
231 if !other_identity.was_previously_verified()
232 && own_identity_after.is_identity_signed(other_identity)
233 {
234 trace!(?tracked_user.user_id, "Marking set verified_latch to true.");
235 other_identity.mark_as_previously_verified();
236 updated_identities.push(other_identity.clone().into());
237 }
238 }
239 }
240
241 if !updated_identities.is_empty() {
242 let identity_changes =
243 IdentityChanges { changed: updated_identities, ..Default::default() };
244 self.store
245 .save_changes(Changes {
246 identities: identity_changes.clone(),
247 ..Default::default()
248 })
249 .await?;
250
251 let _ = self.identities_broadcaster.send((
252 Some(own_identity_after.clone()),
253 identity_changes,
254 DeviceChanges::default(),
255 ));
256 }
257
258 Ok(())
259 }
260
261 pub async fn get_sessions(
262 &self,
263 sender_key: &str,
264 ) -> store::Result<Option<Arc<Mutex<Vec<Session>>>>> {
265 let sessions = self.sessions.get(sender_key).await;
266
267 let sessions = if sessions.is_none() {
268 let mut entries = self.sessions.entries.write().await;
269
270 let sessions = entries.get(sender_key);
271
272 if sessions.is_some() {
273 sessions.cloned()
274 } else {
275 let sessions = self.store.get_sessions(sender_key).await?;
276 let sessions = Arc::new(Mutex::new(sessions.unwrap_or_default()));
277
278 entries.insert(sender_key.to_owned(), sessions.clone());
279
280 Some(sessions)
281 }
282 } else {
283 sessions
284 };
285
286 Ok(sessions)
287 }
288
289 pub async fn save_inbound_group_sessions(
300 &self,
301 sessions: Vec<InboundGroupSession>,
302 backed_up_to_version: Option<&str>,
303 ) -> store::Result<()> {
304 let room_key_updates: Vec<_> = sessions.iter().map(RoomKeyInfo::from).collect();
305 self.store.save_inbound_group_sessions(sessions, backed_up_to_version).await?;
306
307 if !room_key_updates.is_empty() {
308 let _ = self.room_keys_received_sender.send(room_key_updates);
310 }
311 Ok(())
312 }
313
314 pub fn room_keys_received_stream(
323 &self,
324 ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> {
325 BroadcastStream::new(self.room_keys_received_sender.subscribe())
326 }
327
328 pub fn room_keys_withheld_received_stream(
337 &self,
338 ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> {
339 let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
340 Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
341 }
342
343 pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
346 let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());
347 Self::filter_errors_out_of_stream(stream, "secrets_stream")
348 }
349
350 pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> {
353 let stream = BroadcastStream::new(self.historic_room_key_bundles_broadcaster.subscribe());
354 Self::filter_errors_out_of_stream(stream, "bundle_stream")
355 }
356
357 pub(super) fn identities_stream(
362 &self,
363 ) -> impl Stream<Item = (Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)> {
364 let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
365 Self::filter_errors_out_of_stream(stream, "identities_stream")
366 }
367
368 fn filter_errors_out_of_stream<ItemType>(
375 stream: BroadcastStream<ItemType>,
376 stream_name: &str,
377 ) -> impl Stream<Item = ItemType>
378 where
379 ItemType: 'static + Clone + Send,
380 {
381 let stream_name = stream_name.to_owned();
382 stream.filter_map(move |result| {
383 future::ready(match result {
384 Ok(r) => Some(r),
385 Err(BroadcastStreamRecvError::Lagged(lag)) => {
386 warn!("{stream_name} missed {lag} updates");
387 None
388 }
389 })
390 })
391 }
392
393 pub(crate) fn create_store_lock(
396 &self,
397 lock_key: String,
398 lock_value: String,
399 ) -> CrossProcessStoreLock<LockableCryptoStore> {
400 CrossProcessStoreLock::new(LockableCryptoStore(self.store.clone()), lock_key, lock_value)
401 }
402}
403
404impl Deref for CryptoStoreWrapper {
405 type Target = DynCryptoStore;
406
407 fn deref(&self) -> &Self::Target {
408 self.store.deref()
409 }
410}
411
#[cfg(test)]
mod test {
    use matrix_sdk_test::async_test;
    use ruma::user_id;

    use super::*;
    use crate::machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper;

    /// Saving a change to our own device must remove the sessions from the
    /// session cache.
    #[async_test]
    async fn test_cache_cleared_after_device_update() {
        let user_id = user_id!("@alice:example.com");
        let (first, second) =
            get_machine_pair_with_setup_sessions_test_helper(user_id, user_id, false).await;

        let sender_key = second.identity_keys().curve25519.to_base64();

        // Precondition: the session with the second machine is cached.
        first
            .store()
            .inner
            .store
            .sessions
            .get(&sender_key)
            .await
            .expect("We should have a session in the cache.");

        let own_device = first
            .get_device(user_id, first.device_id(), None)
            .await
            .unwrap()
            .expect("We should have access to our own device.")
            .inner;

        // Pretend that our own device changed.
        let changes = Changes {
            devices: DeviceChanges { changed: vec![own_device], ..Default::default() },
            ..Default::default()
        };
        first.store().save_changes(changes).await.unwrap();

        let cached_after_update = first.store().inner.store.sessions.get(&sender_key).await;
        assert!(
            cached_after_update.is_none(),
            "The session should no longer be in the cache after our own device keys changed"
        );
    }
}