matrix_sdk_ui/timeline/
tasks.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Long-lived tasks for the timeline.
16
17use std::collections::BTreeSet;
18
19use futures_core::Stream;
20use futures_util::pin_mut;
21use matrix_sdk::{
22    event_cache::{
23        EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate,
24        ThreadEventCacheUpdate,
25    },
26    send_queue::RoomSendQueueUpdate,
27};
28use ruma::OwnedEventId;
29use tokio::sync::broadcast::{Receiver, error::RecvError};
30use tokio_stream::StreamExt as _;
31use tracing::{error, instrument, trace, warn};
32
33use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin};
34
35/// Long-lived task, in the pinned events focus mode, that updates the timeline
36/// after any changes in the pinned events.
37#[instrument(
38    skip_all,
39    fields(
40        room_id = %timeline_controller.room().room_id(),
41    )
42)]
43pub(in crate::timeline) async fn pinned_events_task<S>(
44    pinned_event_ids_stream: S,
45    timeline_controller: TimelineController,
46) where
47    S: Stream<Item = Vec<OwnedEventId>>,
48{
49    pin_mut!(pinned_event_ids_stream);
50
51    while pinned_event_ids_stream.next().await.is_some() {
52        trace!("received a pinned events update");
53
54        match timeline_controller.reload_pinned_events().await {
55            Ok(Some(events)) => {
56                trace!("successfully reloaded pinned events");
57                timeline_controller
58                    .replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
59                    .await;
60            }
61
62            Ok(None) => {
63                // The list of pinned events hasn't changed since the previous
64                // time.
65            }
66
67            Err(err) => {
68                warn!("Failed to reload pinned events: {err}");
69            }
70        }
71    }
72}
73
74/// For a thread-focused timeline, a long-lived task that will listen to the
75/// underlying thread updates.
76pub(in crate::timeline) async fn thread_updates_task(
77    mut receiver: Receiver<ThreadEventCacheUpdate>,
78    room_event_cache: RoomEventCache,
79    timeline_controller: TimelineController,
80    root: OwnedEventId,
81) {
82    trace!("Spawned the thread event subscriber task.");
83
84    loop {
85        trace!("Waiting for an event.");
86
87        let update = match receiver.recv().await {
88            Ok(up) => up,
89            Err(RecvError::Closed) => break,
90            Err(RecvError::Lagged(num_skipped)) => {
91                warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
92
93                // The updates might have lagged, but the room event cache might
94                // have events, so retrieve them and add them back again to the
95                // timeline, after clearing it.
96                let (initial_events, _) =
97                    match room_event_cache.subscribe_to_thread(root.clone()).await {
98                        Ok(values) => values,
99                        Err(err) => {
100                            error!(?err, "Subscribing to thread failed");
101                            break;
102                        }
103                    };
104
105                timeline_controller
106                    .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
107                    .await;
108
109                continue;
110            }
111        };
112
113        trace!("Received new timeline events diffs");
114
115        let origin = match update.origin {
116            EventsOrigin::Sync => RemoteEventOrigin::Sync,
117            EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
118            EventsOrigin::Cache => RemoteEventOrigin::Cache,
119        };
120
121        let has_diffs = !update.diffs.is_empty();
122
123        timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
124
125        if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
126            timeline_controller.retry_event_decryption(None).await;
127        }
128    }
129
130    trace!("Thread event subscriber task finished.");
131}
132
133/// Long-lived task that forwards the [`RoomEventCacheUpdate`]s (remote echoes)
134/// to the timeline.
135pub(in crate::timeline) async fn room_event_cache_updates_task(
136    room_event_cache: RoomEventCache,
137    timeline_controller: TimelineController,
138    mut room_event_cache_subscriber: RoomEventCacheSubscriber,
139    timeline_focus: TimelineFocus,
140) {
141    trace!("Spawned the event subscriber task.");
142
143    loop {
144        trace!("Waiting for an event.");
145
146        let update = match room_event_cache_subscriber.recv().await {
147            Ok(up) => up,
148            Err(RecvError::Closed) => break,
149            Err(RecvError::Lagged(num_skipped)) => {
150                warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
151
152                // The updates might have lagged, but the room event cache might have
153                // events, so retrieve them and add them back again to the timeline,
154                // after clearing it.
155                let initial_events = match room_event_cache.events().await {
156                    Ok(initial_events) => initial_events,
157                    Err(err) => {
158                        error!(
159                            ?err,
160                            "Failed to replace the initial remote events in the event cache"
161                        );
162                        break;
163                    }
164                };
165
166                timeline_controller
167                    .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
168                    .await;
169
170                continue;
171            }
172        };
173
174        match update {
175            RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
176                trace!(target = %event_id, "Handling fully read marker.");
177                timeline_controller.handle_fully_read_marker(event_id).await;
178            }
179
180            RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
181                trace!("Received new timeline events diffs");
182                let origin = match origin {
183                    EventsOrigin::Sync => RemoteEventOrigin::Sync,
184                    EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
185                    EventsOrigin::Cache => RemoteEventOrigin::Cache,
186                };
187
188                let has_diffs = !diffs.is_empty();
189
190                if matches!(timeline_focus, TimelineFocus::Live { .. }) {
191                    timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
192                } else {
193                    // Only handle the remote aggregation for a non-live timeline.
194                    timeline_controller.handle_remote_aggregations(diffs, origin).await;
195                }
196
197                if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
198                    timeline_controller.retry_event_decryption(None).await;
199                }
200            }
201
202            RoomEventCacheUpdate::AddEphemeralEvents { events } => {
203                trace!("Received new ephemeral events from sync.");
204
205                // TODO: (bnjbvr) ephemeral should be handled by the event cache.
206                timeline_controller.handle_ephemeral_events(events).await;
207            }
208
209            RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
210                if !ambiguity_changes.is_empty() {
211                    let member_ambiguity_changes = ambiguity_changes
212                        .values()
213                        .flat_map(|change| change.user_ids())
214                        .collect::<BTreeSet<_>>();
215                    timeline_controller
216                        .force_update_sender_profiles(&member_ambiguity_changes)
217                        .await;
218                }
219            }
220        }
221    }
222}
223
224/// Long-lived task that forwards [`RoomSendQueueUpdate`]s (local echoes) to the
225/// timeline.
226pub(in crate::timeline) async fn room_send_queue_update_task(
227    mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
228    timeline_controller: TimelineController,
229) {
230    trace!("spawned the local echo task!");
231
232    loop {
233        match send_queue_stream.recv().await {
234            Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,
235
236            Err(RecvError::Lagged(num_missed)) => {
237                warn!("missed {num_missed} local echoes, ignoring those missed");
238            }
239
240            Err(RecvError::Closed) => {
241                trace!("channel closed, exiting the local echo handler");
242                break;
243            }
244        }
245    }
246}