matrix_sdk_ui/timeline/controller/
decryption_retry_task.rs1use std::{collections::BTreeSet, sync::Arc};
16
17use futures_core::Stream;
18use futures_util::pin_mut;
19use imbl::Vector;
20use itertools::{Either, Itertools as _};
21use matrix_sdk::{
22 Room,
23 encryption::backups::BackupState,
24 event_cache::RedecryptorReport,
25 executor::{JoinHandle, spawn},
26};
27use tokio_stream::{StreamExt as _, wrappers::errors::BroadcastStreamRecvError};
28
29use crate::timeline::{TimelineController, TimelineItem};
30
31#[derive(Debug)]
34pub(in crate::timeline) struct CryptoDropHandles {
35 redecryption_report_join_handle: JoinHandle<()>,
36 room_key_backup_enabled_join_handle: JoinHandle<()>,
37 encryption_changes_handle: JoinHandle<()>,
38}
39
40impl Drop for CryptoDropHandles {
41 fn drop(&mut self) {
42 self.redecryption_report_join_handle.abort();
43 self.room_key_backup_enabled_join_handle.abort();
44 self.encryption_changes_handle.abort();
45 }
46}
47
48pub(super) fn compute_redecryption_candidates(
54 timeline_items: &Vector<Arc<TimelineItem>>,
55) -> (BTreeSet<String>, BTreeSet<String>) {
56 timeline_items
57 .iter()
58 .filter_map(|event| {
59 event.as_event().and_then(|e| {
60 let session_id = e.encryption_info().and_then(|info| info.session_id());
61
62 let session_id = if let Some(session_id) = session_id {
63 Some(session_id)
64 } else {
65 event.as_event().and_then(|e| {
66 e.content.as_unable_to_decrypt().and_then(|utd| utd.session_id())
67 })
68 };
69
70 session_id.map(|id| id.to_owned()).zip(Some(e))
71 })
72 })
73 .partition_map(|(session_id, event)| {
74 if event.content.is_unable_to_decrypt() {
75 Either::Left(session_id)
76 } else {
77 Either::Right(session_id)
78 }
79 })
80}
81
82async fn redecryption_report_task(timeline_controller: TimelineController) {
83 let client = timeline_controller.room().client();
84 let stream = client.event_cache().subscribe_to_decryption_reports();
85
86 pin_mut!(stream);
87
88 while let Some(report) = stream.next().await {
89 match report {
90 Ok(RedecryptorReport::ResolvedUtds { events, .. }) => {
91 let state = timeline_controller.state.read().await;
92
93 if let Some(utd_hook) = &state.meta.unable_to_decrypt_hook {
94 for event_id in events {
95 utd_hook.on_late_decrypt(&event_id).await;
96 }
97 }
98 }
99 Ok(RedecryptorReport::Lagging) | Err(_) => {
100 timeline_controller.retry_event_decryption(None).await;
103 }
104 }
105 }
106}
107
108async fn backup_states_task<S>(backup_states_stream: S, timeline_controller: TimelineController)
110where
111 S: Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
112{
113 pin_mut!(backup_states_stream);
114
115 while let Some(update) = backup_states_stream.next().await {
116 match update {
117 Ok(BackupState::Enabled) | Err(_) => {
126 timeline_controller.retry_event_decryption(None).await;
127 }
128 Ok(
131 BackupState::Unknown
132 | BackupState::Creating
133 | BackupState::Resuming
134 | BackupState::Disabling
135 | BackupState::Downloading
136 | BackupState::Enabling,
137 ) => (),
138 }
139 }
140}
141
142pub(in crate::timeline) async fn spawn_crypto_tasks(
145 room: Room,
146 controller: TimelineController,
147) -> CryptoDropHandles {
148 let client = room.client();
149
150 let room_key_backup_enabled_join_handle =
151 spawn(backup_states_task(client.encryption().backups().state_stream(), controller.clone()));
152
153 let redecryption_report_join_handle = spawn(redecryption_report_task(controller.clone()));
154
155 CryptoDropHandles {
156 redecryption_report_join_handle,
157 room_key_backup_enabled_join_handle,
158 encryption_changes_handle: spawn(async move {
159 controller.handle_encryption_state_changes().await
160 }),
161 }
162}
163
164#[cfg(test)]
165mod tests {
166 use std::{collections::BTreeMap, sync::Arc, time::SystemTime};
167
168 use imbl::vector;
169 use matrix_sdk::deserialized_responses::{AlgorithmInfo, EncryptionInfo, VerificationState};
170 use matrix_sdk_base::crypto::types::events::UtdCause;
171 use ruma::{
172 MilliSecondsSinceUnixEpoch, OwnedTransactionId,
173 events::room::{
174 encrypted::{
175 EncryptedEventScheme, MegolmV1AesSha2Content, MegolmV1AesSha2ContentInit,
176 RoomEncryptedEventContent,
177 },
178 message::RoomMessageEventContent,
179 },
180 owned_device_id, owned_event_id, owned_user_id,
181 };
182
183 use crate::timeline::{
184 EncryptedMessage, EventSendState, EventTimelineItem, MsgLikeContent,
185 ReactionsByKeyBySender, TimelineDetails, TimelineItem, TimelineItemContent,
186 TimelineItemKind, TimelineUniqueId, VirtualTimelineItem,
187 controller::decryption_retry_task::compute_redecryption_candidates,
188 event_item::{
189 EventTimelineItemKind, LocalEventTimelineItem, RemoteEventOrigin,
190 RemoteEventTimelineItem,
191 },
192 };
193
194 #[test]
195 fn test_non_events_are_not_retried() {
196 let timeline = vector![TimelineItem::read_marker(), date_divider()];
198 let answer = compute_redecryption_candidates(&timeline);
200 assert!(answer.0.is_empty());
202 assert!(answer.1.is_empty());
203 }
204
205 #[test]
206 fn test_non_remote_events_are_not_retried() {
207 let timeline = vector![local_event()];
209 let answer = compute_redecryption_candidates(&timeline);
211 assert!(answer.0.is_empty());
213 assert!(answer.1.is_empty());
214 }
215
216 #[test]
217 fn test_utds_are_retried() {
218 let timeline = vector![utd_event("session1")];
220 let answer = compute_redecryption_candidates(&timeline);
222 assert_eq!(answer.0.first().map(|s| s.as_str()), Some("session1"));
224 assert!(answer.1.is_empty());
225 }
226
227 #[test]
228 fn test_remote_decrypted_info_is_refetched() {
229 let timeline = vector![decrypted_event("session1")];
231 let answer = compute_redecryption_candidates(&timeline);
233 assert!(answer.0.is_empty());
235 assert_eq!(answer.1.first().map(|s| s.as_str()), Some("session1"));
236 }
237
238 #[test]
239 fn test_only_required_sessions_are_retried() {
240 let timeline = vector![
245 TimelineItem::read_marker(),
246 utd_event("session1"),
247 utd_event("session1"),
248 date_divider(),
249 utd_event("session2"),
250 decrypted_event("session1"),
251 decrypted_event("session1"),
252 decrypted_event("session2"),
253 local_event(),
254 ];
255
256 let answer = compute_redecryption_candidates(&timeline);
258
259 assert!(answer.0.contains("session1"));
261 assert!(answer.0.contains("session2"));
262 assert!(answer.1.contains("session1"));
263 assert!(answer.1.contains("session2"));
264 }
265
266 fn date_divider() -> Arc<TimelineItem> {
267 TimelineItem::new(
268 TimelineItemKind::Virtual(VirtualTimelineItem::DateDivider(timestamp())),
269 TimelineUniqueId("datething".to_owned()),
270 )
271 }
272
273 fn local_event() -> Arc<TimelineItem> {
274 let event_kind = EventTimelineItemKind::Local(LocalEventTimelineItem {
275 send_state: EventSendState::NotSentYet { progress: None },
276 transaction_id: OwnedTransactionId::from("trans"),
277 send_handle: None,
278 });
279
280 TimelineItem::new(
281 TimelineItemKind::Event(EventTimelineItem::new(
282 owned_user_id!("@u:s.to"),
283 TimelineDetails::Pending,
284 timestamp(),
285 TimelineItemContent::MsgLike(MsgLikeContent::redacted()),
286 event_kind,
287 true,
288 )),
289 TimelineUniqueId("local".to_owned()),
290 )
291 }
292
293 fn utd_event(session_id: &str) -> Arc<TimelineItem> {
294 let event_kind = EventTimelineItemKind::Remote(RemoteEventTimelineItem {
295 event_id: owned_event_id!("$local"),
296 transaction_id: None,
297 read_receipts: Default::default(),
298 is_own: false,
299 is_highlighted: false,
300 encryption_info: None,
301 original_json: None,
302 latest_edit_json: None,
303 origin: RemoteEventOrigin::Sync,
304 });
305
306 TimelineItem::new(
307 TimelineItemKind::Event(EventTimelineItem::new(
308 owned_user_id!("@u:s.to"),
309 TimelineDetails::Pending,
310 timestamp(),
311 TimelineItemContent::MsgLike(MsgLikeContent::unable_to_decrypt(
312 EncryptedMessage::from_content(
313 RoomEncryptedEventContent::new(
314 EncryptedEventScheme::MegolmV1AesSha2(MegolmV1AesSha2Content::from(
315 MegolmV1AesSha2ContentInit {
316 ciphertext: "cyf".to_owned(),
317 sender_key: "sendk".to_owned(),
318 device_id: owned_device_id!("DEV"),
319 session_id: session_id.to_owned(),
320 },
321 )),
322 None,
323 ),
324 UtdCause::Unknown,
325 ),
326 )),
327 event_kind,
328 true,
329 )),
330 TimelineUniqueId("local".to_owned()),
331 )
332 }
333
334 fn decrypted_event(session_id: &str) -> Arc<TimelineItem> {
335 let event_kind = EventTimelineItemKind::Remote(RemoteEventTimelineItem {
336 event_id: owned_event_id!("$local"),
337 transaction_id: None,
338 read_receipts: Default::default(),
339 is_own: false,
340 is_highlighted: false,
341 encryption_info: Some(Arc::new(EncryptionInfo {
342 sender: owned_user_id!("@u:s.co"),
343 sender_device: None,
344 algorithm_info: AlgorithmInfo::MegolmV1AesSha2 {
345 curve25519_key: "".to_owned(),
346 sender_claimed_keys: BTreeMap::new(),
347 session_id: Some(session_id.to_owned()),
348 },
349 verification_state: VerificationState::Verified,
350 })),
351 original_json: None,
352 latest_edit_json: None,
353 origin: RemoteEventOrigin::Sync,
354 });
355
356 let content = RoomMessageEventContent::text_plain("hi");
357
358 TimelineItem::new(
359 TimelineItemKind::Event(EventTimelineItem::new(
360 owned_user_id!("@u:s.to"),
361 TimelineDetails::Pending,
362 timestamp(),
363 TimelineItemContent::message(
364 content.msgtype,
365 content.mentions,
366 ReactionsByKeyBySender::default(),
367 None,
368 None,
369 None,
370 ),
371 event_kind,
372 true,
373 )),
374 TimelineUniqueId("local".to_owned()),
375 )
376 }
377
378 fn timestamp() -> MilliSecondsSinceUnixEpoch {
379 MilliSecondsSinceUnixEpoch::from_system_time(SystemTime::UNIX_EPOCH).unwrap()
380 }
381}