matrix_sdk_ui/timeline/controller/
decryption_retry_task.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, sync::Arc};
16
17use futures_core::Stream;
18use futures_util::pin_mut;
19use imbl::Vector;
20use itertools::{Either, Itertools as _};
21use matrix_sdk::{
22    Room,
23    encryption::backups::BackupState,
24    event_cache::RedecryptorReport,
25    executor::{JoinHandle, spawn},
26};
27use tokio_stream::{StreamExt as _, wrappers::errors::BroadcastStreamRecvError};
28
29use crate::timeline::{TimelineController, TimelineItem};
30
31/// All the drop handles for the tasks used for crypto, namely message
32/// re-decryption, in the timeline.
33#[derive(Debug)]
34pub(in crate::timeline) struct CryptoDropHandles {
35    redecryption_report_join_handle: JoinHandle<()>,
36    room_key_backup_enabled_join_handle: JoinHandle<()>,
37    encryption_changes_handle: JoinHandle<()>,
38}
39
40impl Drop for CryptoDropHandles {
41    fn drop(&mut self) {
42        self.redecryption_report_join_handle.abort();
43        self.room_key_backup_enabled_join_handle.abort();
44        self.encryption_changes_handle.abort();
45    }
46}
47
48/// Decide which events should be retried, either for re-decryption, or, if they
49/// are already decrypted, for re-checking their encryption info.
50///
51/// Returns two sets of session IDs, one for the UTDs and one for the events
52/// that have an encryption info that might need to be updated.
53pub(super) fn compute_redecryption_candidates(
54    timeline_items: &Vector<Arc<TimelineItem>>,
55) -> (BTreeSet<String>, BTreeSet<String>) {
56    timeline_items
57        .iter()
58        .filter_map(|event| {
59            event.as_event().and_then(|e| {
60                let session_id = e.encryption_info().and_then(|info| info.session_id());
61
62                let session_id = if let Some(session_id) = session_id {
63                    Some(session_id)
64                } else {
65                    event.as_event().and_then(|e| {
66                        e.content.as_unable_to_decrypt().and_then(|utd| utd.session_id())
67                    })
68                };
69
70                session_id.map(|id| id.to_owned()).zip(Some(e))
71            })
72        })
73        .partition_map(|(session_id, event)| {
74            if event.content.is_unable_to_decrypt() {
75                Either::Left(session_id)
76            } else {
77                Either::Right(session_id)
78            }
79        })
80}
81
82async fn redecryption_report_task(timeline_controller: TimelineController) {
83    let client = timeline_controller.room().client();
84    let stream = client.event_cache().subscribe_to_decryption_reports();
85
86    pin_mut!(stream);
87
88    while let Some(report) = stream.next().await {
89        match report {
90            Ok(RedecryptorReport::ResolvedUtds { events, .. }) => {
91                let state = timeline_controller.state.read().await;
92
93                if let Some(utd_hook) = &state.meta.unable_to_decrypt_hook {
94                    for event_id in events {
95                        utd_hook.on_late_decrypt(&event_id).await;
96                    }
97                }
98            }
99            Ok(RedecryptorReport::Lagging) | Err(_) => {
100                // The room key stream lagged or the OlmMachine got regenerated. Let's tell the
101                // redecryptor to attempt redecryption of our timeline items.
102                timeline_controller.retry_event_decryption(None).await;
103            }
104        }
105    }
106}
107
108/// The task that handles the [`BackupState`] updates.
109async fn backup_states_task<S>(backup_states_stream: S, timeline_controller: TimelineController)
110where
111    S: Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
112{
113    pin_mut!(backup_states_stream);
114
115    while let Some(update) = backup_states_stream.next().await {
116        match update {
117            // If the backup got enabled, or we lagged and thus missed that the backup
118            // might be enabled, retry to decrypt all the events. Please note, depending
119            // on the backup download strategy, this might do two things under the
120            // assumption that the backup contains the relevant room keys:
121            //
122            // 1. It will decrypt the events, if `BackupDownloadStrategy` has been set to `OneShot`.
123            // 2. It will fail to decrypt the event, but try to download the room key to decrypt it
124            //    if the `BackupDownloadStrategy` has been set to `AfterDecryptionFailure`.
125            Ok(BackupState::Enabled) | Err(_) => {
126                timeline_controller.retry_event_decryption(None).await;
127            }
128            // The other states aren't interesting since they are either still enabling
129            // the backup or have the backup in the disabled state.
130            Ok(
131                BackupState::Unknown
132                | BackupState::Creating
133                | BackupState::Resuming
134                | BackupState::Disabling
135                | BackupState::Downloading
136                | BackupState::Enabling,
137            ) => (),
138        }
139    }
140}
141
142/// Spawn all the crypto-related tasks that are used to handle re-decryption of
143/// messages.
144pub(in crate::timeline) async fn spawn_crypto_tasks(
145    room: Room,
146    controller: TimelineController,
147) -> CryptoDropHandles {
148    let client = room.client();
149
150    let room_key_backup_enabled_join_handle =
151        spawn(backup_states_task(client.encryption().backups().state_stream(), controller.clone()));
152
153    let redecryption_report_join_handle = spawn(redecryption_report_task(controller.clone()));
154
155    CryptoDropHandles {
156        redecryption_report_join_handle,
157        room_key_backup_enabled_join_handle,
158        encryption_changes_handle: spawn(async move {
159            controller.handle_encryption_state_changes().await
160        }),
161    }
162}
163
#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::SystemTime,
    };

    use imbl::vector;
    use matrix_sdk::deserialized_responses::{AlgorithmInfo, EncryptionInfo, VerificationState};
    use matrix_sdk_base::crypto::types::events::UtdCause;
    use ruma::{
        MilliSecondsSinceUnixEpoch, OwnedTransactionId,
        events::room::{
            encrypted::{
                EncryptedEventScheme, MegolmV1AesSha2Content, MegolmV1AesSha2ContentInit,
                RoomEncryptedEventContent,
            },
            message::RoomMessageEventContent,
        },
        owned_device_id, owned_event_id, owned_user_id,
    };

    use crate::timeline::{
        EncryptedMessage, EventSendState, EventTimelineItem, MsgLikeContent,
        ReactionsByKeyBySender, TimelineDetails, TimelineItem, TimelineItemContent,
        TimelineItemKind, TimelineUniqueId, VirtualTimelineItem,
        controller::decryption_retry_task::compute_redecryption_candidates,
        event_item::{
            EventTimelineItemKind, LocalEventTimelineItem, RemoteEventOrigin,
            RemoteEventTimelineItem,
        },
    };

    #[test]
    fn test_non_events_are_not_retried() {
        // Given a timeline with only non-events
        let timeline = vector![TimelineItem::read_marker(), date_divider()];
        // When we ask what to retry
        let (utds, decrypted) = compute_redecryption_candidates(&timeline);
        // Then we retry nothing
        assert!(utds.is_empty());
        assert!(decrypted.is_empty());
    }

    #[test]
    fn test_non_remote_events_are_not_retried() {
        // Given a timeline with only local events
        let timeline = vector![local_event()];
        // When we ask what to retry
        let (utds, decrypted) = compute_redecryption_candidates(&timeline);
        // Then we retry nothing
        assert!(utds.is_empty());
        assert!(decrypted.is_empty());
    }

    #[test]
    fn test_utds_are_retried() {
        // Given a timeline with a UTD
        let timeline = vector![utd_event("session1")];
        // When we ask what to retry
        let (utds, decrypted) = compute_redecryption_candidates(&timeline);
        // Then we retry decrypting it (and only it), and don't refetch any
        // encryption info
        assert_eq!(session_ids(&utds), ["session1"]);
        assert!(decrypted.is_empty());
    }

    #[test]
    fn test_remote_decrypted_info_is_refetched() {
        // Given a timeline with a decrypted event
        let timeline = vector![decrypted_event("session1")];
        // When we ask what to retry
        let (utds, decrypted) = compute_redecryption_candidates(&timeline);
        // Then we don't need to decrypt anything, but we do refetch the
        // encryption info of that event (and only that one)
        assert!(utds.is_empty());
        assert_eq!(session_ids(&decrypted), ["session1"]);
    }

    #[test]
    fn test_only_required_sessions_are_retried() {
        // Given a timeline containing non-events, local events, and several
        // UTDs and decrypted events spread over two sessions
        let timeline = vector![
            TimelineItem::read_marker(),
            utd_event("session1"),
            utd_event("session1"),
            date_divider(),
            utd_event("session2"),
            decrypted_event("session1"),
            decrypted_event("session1"),
            decrypted_event("session2"),
            local_event(),
        ];

        // When we ask what to retry
        let (utds, decrypted) = compute_redecryption_candidates(&timeline);

        // Then we re-decrypt the UTDs, and refetch the decrypted events' info.
        // Each session is reported exactly once, and nothing else is reported.
        assert_eq!(session_ids(&utds), ["session1", "session2"]);
        assert_eq!(session_ids(&decrypted), ["session1", "session2"]);
    }

    /// The session IDs of `set` as string slices, in the iteration order of
    /// the set, so that they can be compared against an array literal.
    fn session_ids(set: &BTreeSet<String>) -> Vec<&str> {
        set.iter().map(String::as_str).collect()
    }

    /// Build a virtual timeline item: a date divider.
    fn date_divider() -> Arc<TimelineItem> {
        TimelineItem::new(
            TimelineItemKind::Virtual(VirtualTimelineItem::DateDivider(timestamp())),
            TimelineUniqueId("datething".to_owned()),
        )
    }

    /// Build a local event (not sent yet), with a redacted content.
    fn local_event() -> Arc<TimelineItem> {
        let event_kind = EventTimelineItemKind::Local(LocalEventTimelineItem {
            send_state: EventSendState::NotSentYet { progress: None },
            transaction_id: OwnedTransactionId::from("trans"),
            send_handle: None,
        });

        TimelineItem::new(
            TimelineItemKind::Event(EventTimelineItem::new(
                owned_user_id!("@u:s.to"),
                TimelineDetails::Pending,
                timestamp(),
                TimelineItemContent::MsgLike(MsgLikeContent::redacted()),
                event_kind,
                true,
            )),
            TimelineUniqueId("local".to_owned()),
        )
    }

    /// Build a remote event that is unable-to-decrypt, encrypted with the
    /// Megolm session `session_id`. It carries no encryption info.
    fn utd_event(session_id: &str) -> Arc<TimelineItem> {
        let event_kind = EventTimelineItemKind::Remote(RemoteEventTimelineItem {
            event_id: owned_event_id!("$local"),
            transaction_id: None,
            read_receipts: Default::default(),
            is_own: false,
            is_highlighted: false,
            encryption_info: None,
            original_json: None,
            latest_edit_json: None,
            origin: RemoteEventOrigin::Sync,
        });

        TimelineItem::new(
            TimelineItemKind::Event(EventTimelineItem::new(
                owned_user_id!("@u:s.to"),
                TimelineDetails::Pending,
                timestamp(),
                TimelineItemContent::MsgLike(MsgLikeContent::unable_to_decrypt(
                    EncryptedMessage::from_content(
                        RoomEncryptedEventContent::new(
                            EncryptedEventScheme::MegolmV1AesSha2(MegolmV1AesSha2Content::from(
                                MegolmV1AesSha2ContentInit {
                                    ciphertext: "cyf".to_owned(),
                                    sender_key: "sendk".to_owned(),
                                    device_id: owned_device_id!("DEV"),
                                    session_id: session_id.to_owned(),
                                },
                            )),
                            None,
                        ),
                        UtdCause::Unknown,
                    ),
                )),
                event_kind,
                true,
            )),
            TimelineUniqueId("local".to_owned()),
        )
    }

    /// Build a remote, decrypted text message whose encryption info refers to
    /// the Megolm session `session_id`.
    fn decrypted_event(session_id: &str) -> Arc<TimelineItem> {
        let event_kind = EventTimelineItemKind::Remote(RemoteEventTimelineItem {
            event_id: owned_event_id!("$local"),
            transaction_id: None,
            read_receipts: Default::default(),
            is_own: false,
            is_highlighted: false,
            encryption_info: Some(Arc::new(EncryptionInfo {
                sender: owned_user_id!("@u:s.co"),
                sender_device: None,
                algorithm_info: AlgorithmInfo::MegolmV1AesSha2 {
                    curve25519_key: "".to_owned(),
                    sender_claimed_keys: BTreeMap::new(),
                    session_id: Some(session_id.to_owned()),
                },
                verification_state: VerificationState::Verified,
            })),
            original_json: None,
            latest_edit_json: None,
            origin: RemoteEventOrigin::Sync,
        });

        let content = RoomMessageEventContent::text_plain("hi");

        TimelineItem::new(
            TimelineItemKind::Event(EventTimelineItem::new(
                owned_user_id!("@u:s.to"),
                TimelineDetails::Pending,
                timestamp(),
                TimelineItemContent::message(
                    content.msgtype,
                    content.mentions,
                    ReactionsByKeyBySender::default(),
                    None,
                    None,
                    None,
                ),
                event_kind,
                true,
            )),
            TimelineUniqueId("local".to_owned()),
        )
    }

    /// The timestamp shared by all the test items: the Unix epoch.
    fn timestamp() -> MilliSecondsSinceUnixEpoch {
        MilliSecondsSinceUnixEpoch::from_system_time(SystemTime::UNIX_EPOCH).unwrap()
    }
}
381}