Skip to main content

matrix_sdk_ui/timeline/
tasks.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Long-lived tasks for the timeline.
16
17use std::collections::BTreeSet;
18
19use matrix_sdk::{
20    event_cache::{
21        EventFocusThreadMode, EventsOrigin, RoomEventCache, RoomEventCacheSubscriber,
22        RoomEventCacheUpdate, TimelineVectorDiffs,
23    },
24    send_queue::RoomSendQueueUpdate,
25};
26use ruma::OwnedEventId;
27use tokio::sync::broadcast::{Receiver, error::RecvError};
28use tracing::{error, instrument, trace, warn};
29
30use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin};
31
32/// Long-lived task, in the pinned events focus mode, that updates the timeline
33/// after any changes in the pinned events.
34#[instrument(
35    skip_all,
36    fields(
37        room_id = %timeline_controller.room().room_id(),
38    )
39)]
40pub(in crate::timeline) async fn pinned_events_task(
41    room_event_cache: RoomEventCache,
42    timeline_controller: TimelineController,
43    mut pinned_events_recv: Receiver<TimelineVectorDiffs>,
44) {
45    loop {
46        trace!("Waiting for an event.");
47
48        let update = match pinned_events_recv.recv().await {
49            Ok(up) => up,
50            Err(RecvError::Closed) => break,
51            Err(RecvError::Lagged(num_skipped)) => {
52                warn!(num_skipped, "Lagged behind pinned-event cache updates, resetting timeline");
53
54                // The updates might have lagged, but the room event cache might have
55                // events, so retrieve them and add them back again to the timeline,
56                // after clearing it.
57                let (initial_events, _) = match room_event_cache.subscribe_to_pinned_events().await
58                {
59                    Ok(initial_events) => initial_events,
60                    Err(err) => {
61                        error!(
62                            ?err,
63                            "Failed to replace the initial remote events in the event cache"
64                        );
65                        break;
66                    }
67                };
68
69                timeline_controller
70                    .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
71                    .await;
72
73                continue;
74            }
75        };
76
77        trace!("Received new timeline events diffs");
78        let origin = match update.origin {
79            EventsOrigin::Sync => RemoteEventOrigin::Sync,
80            EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
81            EventsOrigin::Cache => RemoteEventOrigin::Cache,
82        };
83        timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
84    }
85}
86
87/// Long-lived task, in the event focus mode, that updates the timeline after
88/// any changes to the underlying timeline.
89#[instrument(
90    skip_all,
91    fields(
92        room_id = %timeline_controller.room().room_id(),
93        focused_event_id = %focused_event,
94        ?thread_mode
95    )
96)]
97pub(in crate::timeline) async fn event_focused_task(
98    focused_event: OwnedEventId,
99    thread_mode: EventFocusThreadMode,
100    room_event_cache: RoomEventCache,
101    timeline_controller: TimelineController,
102    mut event_focused_events_recv: Receiver<TimelineVectorDiffs>,
103) {
104    loop {
105        trace!("Waiting for an event.");
106
107        let update = match event_focused_events_recv.recv().await {
108            Ok(up) => up,
109            Err(RecvError::Closed) => break,
110            Err(RecvError::Lagged(num_skipped)) => {
111                warn!(num_skipped, "Lagged behind focused-event cache updates, resetting timeline");
112
113                // The updates might have lagged, but the room event cache might have
114                // events, so retrieve them and add them back again to the timeline,
115                // after clearing it.
116                let cache = match room_event_cache
117                    .get_event_focused_cache(focused_event.clone(), thread_mode)
118                    .await
119                {
120                    Ok(Some(cache)) => cache,
121                    Ok(None) => {
122                        error!("Focused event timeline doesn't have an attached cache");
123                        break;
124                    }
125                    Err(err) => {
126                        error!(%err, "Failed to get the focused cache for the focused event");
127                        break;
128                    }
129                };
130
131                let (initial_events, _) = cache.subscribe().await;
132
133                timeline_controller
134                    .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
135                    .await;
136
137                continue;
138            }
139        };
140
141        trace!("Received new timeline events diffs");
142        let origin = match update.origin {
143            EventsOrigin::Sync => RemoteEventOrigin::Sync,
144            EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
145            EventsOrigin::Cache => RemoteEventOrigin::Cache,
146        };
147        timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
148    }
149}
150
151/// For a thread-focused timeline, a long-lived task that will listen to the
152/// underlying thread updates.
153pub(in crate::timeline) async fn thread_updates_task(
154    mut receiver: Receiver<TimelineVectorDiffs>,
155    room_event_cache: RoomEventCache,
156    timeline_controller: TimelineController,
157    root: OwnedEventId,
158) {
159    trace!("Spawned the thread event subscriber task.");
160
161    loop {
162        trace!("Waiting for an event.");
163
164        let update = match receiver.recv().await {
165            Ok(up) => up,
166            Err(RecvError::Closed) => break,
167            Err(RecvError::Lagged(num_skipped)) => {
168                warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
169
170                // The updates might have lagged, but the room event cache might
171                // have events, so retrieve them and add them back again to the
172                // timeline, after clearing it.
173                _ = timeline_controller.init_with_thread_root(&root, &room_event_cache).await;
174
175                continue;
176            }
177        };
178
179        trace!("Received new timeline events diffs");
180
181        let origin = match update.origin {
182            EventsOrigin::Sync => RemoteEventOrigin::Sync,
183            EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
184            EventsOrigin::Cache => RemoteEventOrigin::Cache,
185        };
186
187        let has_diffs = !update.diffs.is_empty();
188
189        timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
190
191        if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
192            timeline_controller.retry_event_decryption(None).await;
193        }
194    }
195
196    trace!("Thread event subscriber task finished.");
197}
198
199/// Long-lived task that forwards the [`RoomEventCacheUpdate`]s (remote echoes)
200/// to the timeline.
201pub(in crate::timeline) async fn room_event_cache_updates_task(
202    room_event_cache: RoomEventCache,
203    timeline_controller: TimelineController,
204    mut room_event_cache_subscriber: RoomEventCacheSubscriber,
205    timeline_focus: TimelineFocus,
206) {
207    trace!("Spawned the event subscriber task.");
208
209    loop {
210        trace!("Waiting for an event.");
211
212        let update = match room_event_cache_subscriber.recv().await {
213            Ok(up) => up,
214            Err(RecvError::Closed) => break,
215            Err(RecvError::Lagged(num_skipped)) => {
216                warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
217
218                // The updates might have lagged, but the room event cache might have
219                // events, so retrieve them and add them back again to the timeline,
220                // after clearing it.
221                let initial_events = match room_event_cache.events().await {
222                    Ok(initial_events) => initial_events,
223                    Err(err) => {
224                        error!(
225                            ?err,
226                            "Failed to replace the initial remote events in the event cache"
227                        );
228                        break;
229                    }
230                };
231
232                timeline_controller
233                    .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
234                    .await;
235
236                continue;
237            }
238        };
239
240        match update {
241            RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
242                trace!(target = %event_id, "Handling fully read marker.");
243                timeline_controller.handle_fully_read_marker(event_id).await;
244            }
245
246            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, origin }) => {
247                trace!("Received new timeline events diffs");
248                let origin = match origin {
249                    EventsOrigin::Sync => RemoteEventOrigin::Sync,
250                    EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
251                    EventsOrigin::Cache => RemoteEventOrigin::Cache,
252                };
253
254                let has_diffs = !diffs.is_empty();
255
256                if matches!(timeline_focus, TimelineFocus::Live { .. }) {
257                    timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
258                } else if !matches!(timeline_focus, TimelineFocus::PinnedEvents) {
259                    // Only handle the remote aggregation for a non-live timeline, that's not the
260                    // pinned events one (since the latter handles remote aggregations on its own).
261                    timeline_controller.handle_remote_aggregations(diffs, origin).await;
262                }
263
264                if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
265                    timeline_controller.retry_event_decryption(None).await;
266                }
267            }
268
269            RoomEventCacheUpdate::AddEphemeralEvents { events } => {
270                trace!("Received new ephemeral events from sync.");
271
272                // TODO: ephemeral (read receipts) should be handled by the event cache (#4113).
273                timeline_controller.handle_ephemeral_events(events).await;
274            }
275
276            RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
277                if !ambiguity_changes.is_empty() {
278                    let member_ambiguity_changes = ambiguity_changes
279                        .values()
280                        .flat_map(|change| change.user_ids())
281                        .collect::<BTreeSet<_>>();
282                    timeline_controller
283                        .force_update_sender_profiles(&member_ambiguity_changes)
284                        .await;
285                }
286            }
287        }
288    }
289}
290
291/// Long-lived task that forwards [`RoomSendQueueUpdate`]s (local echoes) to the
292/// timeline.
293pub(in crate::timeline) async fn room_send_queue_update_task(
294    mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
295    timeline_controller: TimelineController,
296) {
297    trace!("spawned the local echo task!");
298
299    loop {
300        match send_queue_stream.recv().await {
301            Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,
302
303            Err(RecvError::Lagged(num_missed)) => {
304                warn!("missed {num_missed} local echoes, ignoring those missed");
305            }
306
307            Err(RecvError::Closed) => {
308                trace!("channel closed, exiting the local echo handler");
309                break;
310            }
311        }
312    }
313}