Skip to main content

matrix_sdk_ui/timeline/controller/
decryption_retry_task.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, sync::Arc};
16
17use futures_util::pin_mut;
18use imbl::Vector;
19use itertools::{Either, Itertools as _};
20use matrix_sdk::{event_cache::RedecryptorReport, task_monitor::BackgroundTaskHandle};
21use tokio_stream::StreamExt as _;
22
23use crate::timeline::{TimelineController, TimelineItem};
24
25/// All the drop handles for the tasks used for crypto, namely message
26/// re-decryption, in the timeline.
27#[derive(Debug)]
28pub(in crate::timeline) struct CryptoDropHandles {
29    _redecryption_report_join_handle: BackgroundTaskHandle,
30    _encryption_changes_handle: BackgroundTaskHandle,
31}
32
33/// Decide which events should be retried, either for re-decryption, or, if they
34/// are already decrypted, for re-checking their encryption info.
35///
36/// Returns two sets of session IDs, one for the UTDs and one for the events
37/// that have an encryption info that might need to be updated.
38pub(super) fn compute_redecryption_candidates(
39    timeline_items: &Vector<Arc<TimelineItem>>,
40) -> (BTreeSet<String>, BTreeSet<String>) {
41    timeline_items
42        .iter()
43        .filter_map(|event| {
44            event.as_event().and_then(|e| {
45                let session_id = e.encryption_info().and_then(|info| info.session_id());
46
47                let session_id = if let Some(session_id) = session_id {
48                    Some(session_id)
49                } else {
50                    event.as_event().and_then(|e| {
51                        e.content.as_unable_to_decrypt().and_then(|utd| utd.session_id())
52                    })
53                };
54
55                session_id.map(|id| id.to_owned()).zip(Some(e))
56            })
57        })
58        .partition_map(|(session_id, event)| {
59            if event.content.is_unable_to_decrypt() {
60                Either::Left(session_id)
61            } else {
62                Either::Right(session_id)
63            }
64        })
65}
66
67async fn redecryption_report_task(timeline_controller: TimelineController) {
68    let client = timeline_controller.room().client();
69    let stream = client.event_cache().subscribe_to_decryption_reports();
70
71    pin_mut!(stream);
72
73    while let Some(report) = stream.next().await {
74        match report {
75            Ok(RedecryptorReport::ResolvedUtds { events, .. }) => {
76                let state = timeline_controller.state.read().await;
77
78                if let Some(utd_hook) = &state.meta.unable_to_decrypt_hook {
79                    for event_id in events {
80                        utd_hook.on_late_decrypt(&event_id).await;
81                    }
82                }
83            }
84            Ok(RedecryptorReport::Lagging | RedecryptorReport::BackupAvailable) | Err(_) => {
85                // Since the event cache keeps all the events we are keeping
86                // cached in the timeline in memory as well,
87                // R2D2 will handle the redecryption of these events when any of
88                // those reports come in.
89            }
90        }
91    }
92}
93
94/// Spawn all the crypto-related tasks that are used to handle re-decryption of
95/// messages.
96pub(in crate::timeline) async fn spawn_crypto_tasks(
97    controller: TimelineController,
98) -> CryptoDropHandles {
99    let client = controller.room().client();
100    let task_monitor = client.task_monitor();
101    let redecryption_report_join_handle = task_monitor
102        .spawn_infinite_task(
103            "timeline::redecryption_report",
104            redecryption_report_task(controller.clone()),
105        )
106        .abort_on_drop();
107
108    CryptoDropHandles {
109        _redecryption_report_join_handle: redecryption_report_join_handle,
110        _encryption_changes_handle: task_monitor
111            .spawn_finite_task("timeline::encryption_state_changes", async move {
112                controller.handle_encryption_state_changes().await
113            })
114            .abort_on_drop(),
115    }
116}
117
#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::SystemTime,
    };

    use imbl::vector;
    use matrix_sdk::deserialized_responses::{AlgorithmInfo, EncryptionInfo, VerificationState};
    use matrix_sdk_base::crypto::types::events::UtdCause;
    use ruma::{
        MilliSecondsSinceUnixEpoch, OwnedTransactionId,
        events::room::{
            encrypted::{
                EncryptedEventScheme, MegolmV1AesSha2Content, MegolmV1AesSha2ContentInit,
                RoomEncryptedEventContent,
            },
            message::RoomMessageEventContent,
        },
        owned_device_id, owned_event_id, owned_user_id,
    };

    use crate::timeline::{
        EncryptedMessage, EventSendState, EventTimelineItem, MsgLikeContent,
        ReactionsByKeyBySender, TimelineDetails, TimelineItem, TimelineItemContent,
        TimelineItemKind, TimelineUniqueId, VirtualTimelineItem,
        controller::decryption_retry_task::compute_redecryption_candidates,
        event_item::{
            EventTimelineItemKind, LocalEventTimelineItem, RemoteEventOrigin,
            RemoteEventTimelineItem,
        },
    };

    #[test]
    fn test_non_events_are_not_retried() {
        // Given a timeline with only non-events
        let timeline = vector![TimelineItem::read_marker(), date_divider()];
        // When we ask what to retry
        let (utds, decrypted) = compute_redecryption_candidates(&timeline);
        // Then we retry nothing
        assert!(utds.is_empty());
        assert!(decrypted.is_empty());
    }

    #[test]
    fn test_non_remote_events_are_not_retried() {
        // Given a timeline with only local events
        let timeline = vector![local_event()];
        // When we ask what to retry
        let (utds, decrypted) = compute_redecryption_candidates(&timeline);
        // Then we retry nothing
        assert!(utds.is_empty());
        assert!(decrypted.is_empty());
    }

    #[test]
    fn test_utds_are_retried() {
        // Given a timeline with a UTD
        let timeline = vector![utd_event("session1")];
        // When we ask what to retry
        let (utds, decrypted) = compute_redecryption_candidates(&timeline);
        // Then we retry decrypting exactly that session, and don't refetch any
        // encryption info
        assert_eq!(session_ids(&utds), ["session1"]);
        assert!(decrypted.is_empty());
    }

    #[test]
    fn test_remote_decrypted_info_is_refetched() {
        // Given a timeline with a decrypted event
        let timeline = vector![decrypted_event("session1")];
        // When we ask what to retry
        let (utds, decrypted) = compute_redecryption_candidates(&timeline);
        // Then we don't need to decrypt anything, but we do refetch the
        // encryption info of exactly that session
        assert!(utds.is_empty());
        assert_eq!(session_ids(&decrypted), ["session1"]);
    }

    #[test]
    fn test_only_required_sessions_are_retried() {
        // Given a timeline containing non-events, local events, and UTDs and
        // decrypted events spread over two sessions (with duplicates)
        let timeline = vector![
            TimelineItem::read_marker(),
            utd_event("session1"),
            utd_event("session1"),
            date_divider(),
            utd_event("session2"),
            decrypted_event("session1"),
            decrypted_event("session1"),
            decrypted_event("session2"),
            local_event(),
        ];

        // When we ask what to retry
        let (utds, decrypted) = compute_redecryption_candidates(&timeline);

        // Then we re-decrypt the UTDs' sessions and refetch the decrypted
        // events' info, each session being reported once and nothing else
        // being reported
        assert_eq!(session_ids(&utds), ["session1", "session2"]);
        assert_eq!(session_ids(&decrypted), ["session1", "session2"]);
    }

    /// View a candidate set as a list of `&str`, in the set's sorted iteration
    /// order, so it can be compared against a literal.
    fn session_ids(sessions: &BTreeSet<String>) -> Vec<&str> {
        sessions.iter().map(String::as_str).collect()
    }

    /// A virtual (non-event) timeline item: a date divider at the Unix epoch.
    fn date_divider() -> Arc<TimelineItem> {
        TimelineItem::new(
            TimelineItemKind::Virtual(VirtualTimelineItem::DateDivider(timestamp())),
            TimelineUniqueId("datething".to_owned()),
        )
    }

    /// A local, not-yet-sent event item with redacted message-like content.
    fn local_event() -> Arc<TimelineItem> {
        let event_kind = EventTimelineItemKind::Local(LocalEventTimelineItem {
            send_state: EventSendState::NotSentYet { progress: None },
            transaction_id: OwnedTransactionId::from("trans"),
            send_handle: None,
        });

        TimelineItem::new(
            TimelineItemKind::Event(EventTimelineItem::new(
                owned_user_id!("@u:s.to"),
                TimelineDetails::Pending,
                None,
                None,
                timestamp(),
                TimelineItemContent::MsgLike(MsgLikeContent::redacted()),
                event_kind,
                true,
            )),
            TimelineUniqueId("local".to_owned()),
        )
    }

    /// A remote event item without encryption info, whose content is an
    /// unable-to-decrypt Megolm message sent in the session `session_id`.
    fn utd_event(session_id: &str) -> Arc<TimelineItem> {
        let event_kind = EventTimelineItemKind::Remote(RemoteEventTimelineItem {
            event_id: owned_event_id!("$local"),
            transaction_id: None,
            read_receipts: Default::default(),
            is_own: false,
            is_highlighted: false,
            encryption_info: None,
            original_json: None,
            latest_edit_json: None,
            origin: RemoteEventOrigin::Sync,
        });

        TimelineItem::new(
            TimelineItemKind::Event(EventTimelineItem::new(
                owned_user_id!("@u:s.to"),
                TimelineDetails::Pending,
                None,
                None,
                timestamp(),
                TimelineItemContent::MsgLike(MsgLikeContent::unable_to_decrypt(
                    EncryptedMessage::from_content(
                        RoomEncryptedEventContent::new(
                            EncryptedEventScheme::MegolmV1AesSha2(MegolmV1AesSha2Content::from(
                                MegolmV1AesSha2ContentInit {
                                    ciphertext: "cyf".to_owned(),
                                    sender_key: "sendk".to_owned(),
                                    device_id: owned_device_id!("DEV"),
                                    session_id: session_id.to_owned(),
                                },
                            )),
                            None,
                        ),
                        UtdCause::Unknown,
                    ),
                )),
                event_kind,
                true,
            )),
            TimelineUniqueId("local".to_owned()),
        )
    }

    /// A remote event item holding a plain-text message, whose encryption info
    /// says it was sent in the Megolm session `session_id`.
    fn decrypted_event(session_id: &str) -> Arc<TimelineItem> {
        let event_kind = EventTimelineItemKind::Remote(RemoteEventTimelineItem {
            event_id: owned_event_id!("$local"),
            transaction_id: None,
            read_receipts: Default::default(),
            is_own: false,
            is_highlighted: false,
            encryption_info: Some(Arc::new(EncryptionInfo {
                sender: owned_user_id!("@u:s.co"),
                sender_device: None,
                forwarder: None,
                algorithm_info: AlgorithmInfo::MegolmV1AesSha2 {
                    curve25519_key: "".to_owned(),
                    sender_claimed_keys: BTreeMap::new(),
                    session_id: Some(session_id.to_owned()),
                },
                verification_state: VerificationState::Verified,
            })),
            original_json: None,
            latest_edit_json: None,
            origin: RemoteEventOrigin::Sync,
        });

        let content = RoomMessageEventContent::text_plain("hi");

        TimelineItem::new(
            TimelineItemKind::Event(EventTimelineItem::new(
                owned_user_id!("@u:s.to"),
                TimelineDetails::Pending,
                None,
                None,
                timestamp(),
                TimelineItemContent::message(
                    content.msgtype,
                    content.mentions,
                    ReactionsByKeyBySender::default(),
                    None,
                    None,
                    None,
                ),
                event_kind,
                true,
            )),
            TimelineUniqueId("local".to_owned()),
        )
    }

    /// The timestamp shared by all the test items: the Unix epoch.
    fn timestamp() -> MilliSecondsSinceUnixEpoch {
        MilliSecondsSinceUnixEpoch::from_system_time(SystemTime::UNIX_EPOCH).unwrap()
    }
}