matrix_sdk_ui/timeline/
tasks.rs1use std::collections::BTreeSet;
18
19use futures_core::Stream;
20use futures_util::pin_mut;
21use matrix_sdk::{
22 event_cache::{
23 EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate,
24 ThreadEventCacheUpdate,
25 },
26 send_queue::RoomSendQueueUpdate,
27};
28use ruma::OwnedEventId;
29use tokio::sync::broadcast::{Receiver, error::RecvError};
30use tokio_stream::StreamExt as _;
31use tracing::{error, instrument, trace, warn};
32
33use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin};
34
35#[instrument(
38 skip_all,
39 fields(
40 room_id = %timeline_controller.room().room_id(),
41 )
42)]
43pub(in crate::timeline) async fn pinned_events_task<S>(
44 pinned_event_ids_stream: S,
45 timeline_controller: TimelineController,
46) where
47 S: Stream<Item = Vec<OwnedEventId>>,
48{
49 pin_mut!(pinned_event_ids_stream);
50
51 while pinned_event_ids_stream.next().await.is_some() {
52 trace!("received a pinned events update");
53
54 match timeline_controller.reload_pinned_events().await {
55 Ok(Some(events)) => {
56 trace!("successfully reloaded pinned events");
57 timeline_controller
58 .replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
59 .await;
60 }
61
62 Ok(None) => {
63 }
66
67 Err(err) => {
68 warn!("Failed to reload pinned events: {err}");
69 }
70 }
71 }
72}
73
74pub(in crate::timeline) async fn thread_updates_task(
77 mut receiver: Receiver<ThreadEventCacheUpdate>,
78 room_event_cache: RoomEventCache,
79 timeline_controller: TimelineController,
80 root: OwnedEventId,
81) {
82 trace!("Spawned the thread event subscriber task.");
83
84 loop {
85 trace!("Waiting for an event.");
86
87 let update = match receiver.recv().await {
88 Ok(up) => up,
89 Err(RecvError::Closed) => break,
90 Err(RecvError::Lagged(num_skipped)) => {
91 warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
92
93 let (initial_events, _) =
97 match room_event_cache.subscribe_to_thread(root.clone()).await {
98 Ok(values) => values,
99 Err(err) => {
100 error!(?err, "Subscribing to thread failed");
101 break;
102 }
103 };
104
105 timeline_controller
106 .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
107 .await;
108
109 continue;
110 }
111 };
112
113 trace!("Received new timeline events diffs");
114
115 let origin = match update.origin {
116 EventsOrigin::Sync => RemoteEventOrigin::Sync,
117 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
118 EventsOrigin::Cache => RemoteEventOrigin::Cache,
119 };
120
121 let has_diffs = !update.diffs.is_empty();
122
123 timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
124
125 if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
126 timeline_controller.retry_event_decryption(None).await;
127 }
128 }
129
130 trace!("Thread event subscriber task finished.");
131}
132
133pub(in crate::timeline) async fn room_event_cache_updates_task(
136 room_event_cache: RoomEventCache,
137 timeline_controller: TimelineController,
138 mut room_event_cache_subscriber: RoomEventCacheSubscriber,
139 timeline_focus: TimelineFocus,
140) {
141 trace!("Spawned the event subscriber task.");
142
143 loop {
144 trace!("Waiting for an event.");
145
146 let update = match room_event_cache_subscriber.recv().await {
147 Ok(up) => up,
148 Err(RecvError::Closed) => break,
149 Err(RecvError::Lagged(num_skipped)) => {
150 warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
151
152 let initial_events = match room_event_cache.events().await {
156 Ok(initial_events) => initial_events,
157 Err(err) => {
158 error!(
159 ?err,
160 "Failed to replace the initial remote events in the event cache"
161 );
162 break;
163 }
164 };
165
166 timeline_controller
167 .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
168 .await;
169
170 continue;
171 }
172 };
173
174 match update {
175 RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
176 trace!(target = %event_id, "Handling fully read marker.");
177 timeline_controller.handle_fully_read_marker(event_id).await;
178 }
179
180 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
181 trace!("Received new timeline events diffs");
182 let origin = match origin {
183 EventsOrigin::Sync => RemoteEventOrigin::Sync,
184 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
185 EventsOrigin::Cache => RemoteEventOrigin::Cache,
186 };
187
188 let has_diffs = !diffs.is_empty();
189
190 if matches!(timeline_focus, TimelineFocus::Live { .. }) {
191 timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
192 } else {
193 timeline_controller.handle_remote_aggregations(diffs, origin).await;
195 }
196
197 if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
198 timeline_controller.retry_event_decryption(None).await;
199 }
200 }
201
202 RoomEventCacheUpdate::AddEphemeralEvents { events } => {
203 trace!("Received new ephemeral events from sync.");
204
205 timeline_controller.handle_ephemeral_events(events).await;
207 }
208
209 RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
210 if !ambiguity_changes.is_empty() {
211 let member_ambiguity_changes = ambiguity_changes
212 .values()
213 .flat_map(|change| change.user_ids())
214 .collect::<BTreeSet<_>>();
215 timeline_controller
216 .force_update_sender_profiles(&member_ambiguity_changes)
217 .await;
218 }
219 }
220 }
221 }
222}
223
224pub(in crate::timeline) async fn room_send_queue_update_task(
227 mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
228 timeline_controller: TimelineController,
229) {
230 trace!("spawned the local echo task!");
231
232 loop {
233 match send_queue_stream.recv().await {
234 Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,
235
236 Err(RecvError::Lagged(num_missed)) => {
237 warn!("missed {num_missed} local echoes, ignoring those missed");
238 }
239
240 Err(RecvError::Closed) => {
241 trace!("channel closed, exiting the local echo handler");
242 break;
243 }
244 }
245 }
246}