matrix_sdk_ui/timeline/
tasks.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Long-lived tasks for the timeline.
16
17use std::collections::BTreeSet;
18
19use futures_core::Stream;
20use futures_util::pin_mut;
21use matrix_sdk::{
22    event_cache::{
23        EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate,
24        ThreadEventCacheUpdate,
25    },
26    send_queue::RoomSendQueueUpdate,
27};
28use ruma::OwnedEventId;
29use tokio::sync::broadcast::{Receiver, error::RecvError};
30use tokio_stream::StreamExt as _;
31use tracing::{instrument, trace, warn};
32
33use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin};
34
35/// Long-lived task, in the pinned events focus mode, that updates the timeline
36/// after any changes in the pinned events.
37#[instrument(
38    skip_all,
39    fields(
40        room_id = %timeline_controller.room().room_id(),
41    )
42)]
43pub(in crate::timeline) async fn pinned_events_task<S>(
44    pinned_event_ids_stream: S,
45    timeline_controller: TimelineController,
46) where
47    S: Stream<Item = Vec<OwnedEventId>>,
48{
49    pin_mut!(pinned_event_ids_stream);
50
51    while pinned_event_ids_stream.next().await.is_some() {
52        trace!("received a pinned events update");
53
54        match timeline_controller.reload_pinned_events().await {
55            Ok(Some(events)) => {
56                trace!("successfully reloaded pinned events");
57                timeline_controller
58                    .replace_with_initial_remote_events(
59                        events.into_iter(),
60                        RemoteEventOrigin::Pagination,
61                    )
62                    .await;
63            }
64
65            Ok(None) => {
66                // The list of pinned events hasn't changed since the previous
67                // time.
68            }
69
70            Err(err) => {
71                warn!("Failed to reload pinned events: {err}");
72            }
73        }
74    }
75}
76
77/// For a thread-focused timeline, a long-lived task that will listen to the
78/// underlying thread updates.
79pub(in crate::timeline) async fn thread_updates_task(
80    mut receiver: Receiver<ThreadEventCacheUpdate>,
81    room_event_cache: RoomEventCache,
82    timeline_controller: TimelineController,
83    root: OwnedEventId,
84) {
85    trace!("Spawned the thread event subscriber task.");
86
87    loop {
88        trace!("Waiting for an event.");
89
90        let update = match receiver.recv().await {
91            Ok(up) => up,
92            Err(RecvError::Closed) => break,
93            Err(RecvError::Lagged(num_skipped)) => {
94                warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
95
96                // The updates might have lagged, but the room event cache might
97                // have events, so retrieve them and add them back again to the
98                // timeline, after clearing it.
99                let (initial_events, _) = room_event_cache.subscribe_to_thread(root.clone()).await;
100
101                timeline_controller
102                    .replace_with_initial_remote_events(
103                        initial_events.into_iter(),
104                        RemoteEventOrigin::Cache,
105                    )
106                    .await;
107
108                continue;
109            }
110        };
111
112        trace!("Received new timeline events diffs");
113
114        let origin = match update.origin {
115            EventsOrigin::Sync => RemoteEventOrigin::Sync,
116            EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
117            EventsOrigin::Cache => RemoteEventOrigin::Cache,
118        };
119
120        let has_diffs = !update.diffs.is_empty();
121
122        timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
123
124        if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
125            timeline_controller.retry_event_decryption(None).await;
126        }
127    }
128
129    trace!("Thread event subscriber task finished.");
130}
131
132/// Long-lived task that forwards the [`RoomEventCacheUpdate`]s (remote echoes)
133/// to the timeline.
134pub(in crate::timeline) async fn room_event_cache_updates_task(
135    room_event_cache: RoomEventCache,
136    timeline_controller: TimelineController,
137    mut room_event_cache_subscriber: RoomEventCacheSubscriber,
138    timeline_focus: TimelineFocus,
139) {
140    trace!("Spawned the event subscriber task.");
141
142    loop {
143        trace!("Waiting for an event.");
144
145        let update = match room_event_cache_subscriber.recv().await {
146            Ok(up) => up,
147            Err(RecvError::Closed) => break,
148            Err(RecvError::Lagged(num_skipped)) => {
149                warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
150
151                // The updates might have lagged, but the room event cache might have
152                // events, so retrieve them and add them back again to the timeline,
153                // after clearing it.
154                let initial_events = room_event_cache.events().await;
155
156                timeline_controller
157                    .replace_with_initial_remote_events(
158                        initial_events.into_iter(),
159                        RemoteEventOrigin::Cache,
160                    )
161                    .await;
162
163                continue;
164            }
165        };
166
167        match update {
168            RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
169                trace!(target = %event_id, "Handling fully read marker.");
170                timeline_controller.handle_fully_read_marker(event_id).await;
171            }
172
173            RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
174                trace!("Received new timeline events diffs");
175                let origin = match origin {
176                    EventsOrigin::Sync => RemoteEventOrigin::Sync,
177                    EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
178                    EventsOrigin::Cache => RemoteEventOrigin::Cache,
179                };
180
181                let has_diffs = !diffs.is_empty();
182
183                if matches!(timeline_focus, TimelineFocus::Live { .. }) {
184                    timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
185                } else {
186                    // Only handle the remote aggregation for a non-live timeline.
187                    timeline_controller.handle_remote_aggregations(diffs, origin).await;
188                }
189
190                if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
191                    timeline_controller.retry_event_decryption(None).await;
192                }
193            }
194
195            RoomEventCacheUpdate::AddEphemeralEvents { events } => {
196                trace!("Received new ephemeral events from sync.");
197
198                // TODO: (bnjbvr) ephemeral should be handled by the event cache.
199                timeline_controller.handle_ephemeral_events(events).await;
200            }
201
202            RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
203                if !ambiguity_changes.is_empty() {
204                    let member_ambiguity_changes = ambiguity_changes
205                        .values()
206                        .flat_map(|change| change.user_ids())
207                        .collect::<BTreeSet<_>>();
208                    timeline_controller
209                        .force_update_sender_profiles(&member_ambiguity_changes)
210                        .await;
211                }
212            }
213        }
214    }
215}
216
217/// Long-lived task that forwards [`RoomSendQueueUpdate`]s (local echoes) to the
218/// timeline.
219pub(in crate::timeline) async fn room_send_queue_update_task(
220    mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
221    timeline_controller: TimelineController,
222) {
223    trace!("spawned the local echo task!");
224
225    loop {
226        match send_queue_stream.recv().await {
227            Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,
228
229            Err(RecvError::Lagged(num_missed)) => {
230                warn!("missed {num_missed} local echoes, ignoring those missed");
231            }
232
233            Err(RecvError::Closed) => {
234                trace!("channel closed, exiting the local echo handler");
235                break;
236            }
237        }
238    }
239}