matrix_sdk_ui/timeline/controller/
decryption_retry_task.rs1use std::{collections::BTreeSet, sync::Arc};
16
17use futures_util::pin_mut;
18use imbl::Vector;
19use itertools::{Either, Itertools as _};
20use matrix_sdk::{event_cache::RedecryptorReport, task_monitor::BackgroundTaskHandle};
21use tokio_stream::StreamExt as _;
22
23use crate::timeline::{TimelineController, TimelineItem};
24
25#[derive(Debug)]
28pub(in crate::timeline) struct CryptoDropHandles {
29 _redecryption_report_join_handle: BackgroundTaskHandle,
30 _encryption_changes_handle: BackgroundTaskHandle,
31}
32
33pub(super) fn compute_redecryption_candidates(
39 timeline_items: &Vector<Arc<TimelineItem>>,
40) -> (BTreeSet<String>, BTreeSet<String>) {
41 timeline_items
42 .iter()
43 .filter_map(|event| {
44 event.as_event().and_then(|e| {
45 let session_id = e.encryption_info().and_then(|info| info.session_id());
46
47 let session_id = if let Some(session_id) = session_id {
48 Some(session_id)
49 } else {
50 event.as_event().and_then(|e| {
51 e.content.as_unable_to_decrypt().and_then(|utd| utd.session_id())
52 })
53 };
54
55 session_id.map(|id| id.to_owned()).zip(Some(e))
56 })
57 })
58 .partition_map(|(session_id, event)| {
59 if event.content.is_unable_to_decrypt() {
60 Either::Left(session_id)
61 } else {
62 Either::Right(session_id)
63 }
64 })
65}
66
67async fn redecryption_report_task(timeline_controller: TimelineController) {
68 let client = timeline_controller.room().client();
69 let stream = client.event_cache().subscribe_to_decryption_reports();
70
71 pin_mut!(stream);
72
73 while let Some(report) = stream.next().await {
74 match report {
75 Ok(RedecryptorReport::ResolvedUtds { events, .. }) => {
76 let state = timeline_controller.state.read().await;
77
78 if let Some(utd_hook) = &state.meta.unable_to_decrypt_hook {
79 for event_id in events {
80 utd_hook.on_late_decrypt(&event_id).await;
81 }
82 }
83 }
84 Ok(RedecryptorReport::Lagging | RedecryptorReport::BackupAvailable) | Err(_) => {
85 }
90 }
91 }
92}
93
94pub(in crate::timeline) async fn spawn_crypto_tasks(
97 controller: TimelineController,
98) -> CryptoDropHandles {
99 let client = controller.room().client();
100 let task_monitor = client.task_monitor();
101 let redecryption_report_join_handle = task_monitor
102 .spawn_infinite_task(
103 "timeline::redecryption_report",
104 redecryption_report_task(controller.clone()),
105 )
106 .abort_on_drop();
107
108 CryptoDropHandles {
109 _redecryption_report_join_handle: redecryption_report_join_handle,
110 _encryption_changes_handle: task_monitor
111 .spawn_finite_task("timeline::encryption_state_changes", async move {
112 controller.handle_encryption_state_changes().await
113 })
114 .abort_on_drop(),
115 }
116}
117
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, sync::Arc, time::SystemTime};

    use imbl::vector;
    use matrix_sdk::deserialized_responses::{AlgorithmInfo, EncryptionInfo, VerificationState};
    use matrix_sdk_base::crypto::types::events::UtdCause;
    use ruma::{
        MilliSecondsSinceUnixEpoch, OwnedTransactionId,
        events::room::{
            encrypted::{
                EncryptedEventScheme, MegolmV1AesSha2Content, MegolmV1AesSha2ContentInit,
                RoomEncryptedEventContent,
            },
            message::RoomMessageEventContent,
        },
        owned_device_id, owned_event_id, owned_user_id,
    };

    use crate::timeline::{
        EncryptedMessage, EventSendState, EventTimelineItem, MsgLikeContent,
        ReactionsByKeyBySender, TimelineDetails, TimelineItem, TimelineItemContent,
        TimelineItemKind, TimelineUniqueId, VirtualTimelineItem,
        controller::decryption_retry_task::compute_redecryption_candidates,
        event_item::{
            EventTimelineItemKind, LocalEventTimelineItem, RemoteEventOrigin,
            RemoteEventTimelineItem,
        },
    };

    #[test]
    fn test_non_events_are_not_retried() {
        // Given a timeline made only of virtual items
        let timeline = vector![TimelineItem::read_marker(), date_divider()];

        // When we compute the candidates
        let (utd_sessions, decrypted_sessions) = compute_redecryption_candidates(&timeline);

        // Then neither set contains anything
        assert!(utd_sessions.is_empty());
        assert!(decrypted_sessions.is_empty());
    }

    #[test]
    fn test_non_remote_events_are_not_retried() {
        // Given a timeline holding a single local echo
        let timeline = vector![local_event()];

        // When we compute the candidates
        let (utd_sessions, decrypted_sessions) = compute_redecryption_candidates(&timeline);

        // Then neither set contains anything
        assert!(utd_sessions.is_empty());
        assert!(decrypted_sessions.is_empty());
    }

    #[test]
    fn test_utds_are_retried() {
        // Given a timeline holding a single undecryptable event
        let timeline = vector![utd_event("session1")];

        // When we compute the candidates
        let (utd_sessions, decrypted_sessions) = compute_redecryption_candidates(&timeline);

        // Then its session shows up in the first set only
        assert_eq!(utd_sessions.first().map(String::as_str), Some("session1"));
        assert!(decrypted_sessions.is_empty());
    }

    #[test]
    fn test_remote_decrypted_info_is_refetched() {
        // Given a timeline holding a single decrypted event
        let timeline = vector![decrypted_event("session1")];

        // When we compute the candidates
        let (utd_sessions, decrypted_sessions) = compute_redecryption_candidates(&timeline);

        // Then its session shows up in the second set only
        assert!(utd_sessions.is_empty());
        assert_eq!(decrypted_sessions.first().map(String::as_str), Some("session1"));
    }

    #[test]
    fn test_only_required_sessions_are_retried() {
        // Given a timeline mixing virtual items, local echoes, and both
        // undecryptable and decrypted events spread over two sessions
        let timeline = vector![
            TimelineItem::read_marker(),
            utd_event("session1"),
            utd_event("session1"),
            date_divider(),
            utd_event("session2"),
            decrypted_event("session1"),
            decrypted_event("session1"),
            decrypted_event("session2"),
            local_event(),
        ];

        // When we compute the candidates
        let (utd_sessions, decrypted_sessions) = compute_redecryption_candidates(&timeline);

        // Then both sessions are present in both sets
        assert!(utd_sessions.contains("session1"));
        assert!(utd_sessions.contains("session2"));
        assert!(decrypted_sessions.contains("session1"));
        assert!(decrypted_sessions.contains("session2"));
    }

    /// Builds a virtual date-divider item, i.e. an item that is not an event.
    fn date_divider() -> Arc<TimelineItem> {
        let divider = VirtualTimelineItem::DateDivider(timestamp());

        TimelineItem::new(
            TimelineItemKind::Virtual(divider),
            TimelineUniqueId("datething".to_owned()),
        )
    }

    /// Builds a local (not yet sent) event item with redacted content.
    fn local_event() -> Arc<TimelineItem> {
        let kind = EventTimelineItemKind::Local(LocalEventTimelineItem {
            send_state: EventSendState::NotSentYet { progress: None },
            transaction_id: OwnedTransactionId::from("trans"),
            send_handle: None,
        });

        event_item(TimelineItemContent::MsgLike(MsgLikeContent::redacted()), kind)
    }

    /// Builds a remote event item whose content could not be decrypted and
    /// which refers to the given Megolm session.
    fn utd_event(session_id: &str) -> Arc<TimelineItem> {
        let megolm = MegolmV1AesSha2Content::from(MegolmV1AesSha2ContentInit {
            ciphertext: "cyf".to_owned(),
            sender_key: "sendk".to_owned(),
            device_id: owned_device_id!("DEV"),
            session_id: session_id.to_owned(),
        });
        let encrypted =
            RoomEncryptedEventContent::new(EncryptedEventScheme::MegolmV1AesSha2(megolm), None);
        let utd = EncryptedMessage::from_content(encrypted, UtdCause::Unknown);

        event_item(
            TimelineItemContent::MsgLike(MsgLikeContent::unable_to_decrypt(utd)),
            remote_kind(None),
        )
    }

    /// Builds a remote text message item carrying encryption info for the
    /// given Megolm session.
    fn decrypted_event(session_id: &str) -> Arc<TimelineItem> {
        let encryption_info = EncryptionInfo {
            sender: owned_user_id!("@u:s.co"),
            sender_device: None,
            forwarder: None,
            algorithm_info: AlgorithmInfo::MegolmV1AesSha2 {
                curve25519_key: "".to_owned(),
                sender_claimed_keys: BTreeMap::new(),
                session_id: Some(session_id.to_owned()),
            },
            verification_state: VerificationState::Verified,
        };
        let kind = remote_kind(Some(Arc::new(encryption_info)));

        let text = RoomMessageEventContent::text_plain("hi");
        let content = TimelineItemContent::message(
            text.msgtype,
            text.mentions,
            ReactionsByKeyBySender::default(),
            None,
            None,
            None,
        );

        event_item(content, kind)
    }

    /// Builds the remote event kind shared by the remote fixtures; only the
    /// encryption info differs between them.
    fn remote_kind(encryption_info: Option<Arc<EncryptionInfo>>) -> EventTimelineItemKind {
        EventTimelineItemKind::Remote(RemoteEventTimelineItem {
            event_id: owned_event_id!("$local"),
            transaction_id: None,
            read_receipts: Default::default(),
            is_own: false,
            is_highlighted: false,
            encryption_info,
            original_json: None,
            latest_edit_json: None,
            origin: RemoteEventOrigin::Sync,
        })
    }

    /// Wraps a content and an event kind into a timeline item, using the
    /// sender, timestamp and unique ID common to all event fixtures.
    fn event_item(content: TimelineItemContent, kind: EventTimelineItemKind) -> Arc<TimelineItem> {
        TimelineItem::new(
            TimelineItemKind::Event(EventTimelineItem::new(
                owned_user_id!("@u:s.to"),
                TimelineDetails::Pending,
                None,
                None,
                timestamp(),
                content,
                kind,
                true,
            )),
            TimelineUniqueId("local".to_owned()),
        )
    }

    /// The Unix epoch as a Matrix timestamp.
    fn timestamp() -> MilliSecondsSinceUnixEpoch {
        MilliSecondsSinceUnixEpoch::from_system_time(SystemTime::UNIX_EPOCH).unwrap()
    }
}