matrix_sdk_ui/timeline/
tasks.rs1use std::collections::BTreeSet;
18
19use futures_core::Stream;
20use futures_util::pin_mut;
21use matrix_sdk::{
22 event_cache::{
23 EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate,
24 ThreadEventCacheUpdate,
25 },
26 send_queue::RoomSendQueueUpdate,
27};
28use ruma::OwnedEventId;
29use tokio::sync::broadcast::{Receiver, error::RecvError};
30use tokio_stream::StreamExt as _;
31use tracing::{instrument, trace, warn};
32
33use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin};
34
35#[instrument(
38 skip_all,
39 fields(
40 room_id = %timeline_controller.room().room_id(),
41 )
42)]
43pub(in crate::timeline) async fn pinned_events_task<S>(
44 pinned_event_ids_stream: S,
45 timeline_controller: TimelineController,
46) where
47 S: Stream<Item = Vec<OwnedEventId>>,
48{
49 pin_mut!(pinned_event_ids_stream);
50
51 while pinned_event_ids_stream.next().await.is_some() {
52 trace!("received a pinned events update");
53
54 match timeline_controller.reload_pinned_events().await {
55 Ok(Some(events)) => {
56 trace!("successfully reloaded pinned events");
57 timeline_controller
58 .replace_with_initial_remote_events(
59 events.into_iter(),
60 RemoteEventOrigin::Pagination,
61 )
62 .await;
63 }
64
65 Ok(None) => {
66 }
69
70 Err(err) => {
71 warn!("Failed to reload pinned events: {err}");
72 }
73 }
74 }
75}
76
77pub(in crate::timeline) async fn thread_updates_task(
80 mut receiver: Receiver<ThreadEventCacheUpdate>,
81 room_event_cache: RoomEventCache,
82 timeline_controller: TimelineController,
83 root: OwnedEventId,
84) {
85 trace!("Spawned the thread event subscriber task.");
86
87 loop {
88 trace!("Waiting for an event.");
89
90 let update = match receiver.recv().await {
91 Ok(up) => up,
92 Err(RecvError::Closed) => break,
93 Err(RecvError::Lagged(num_skipped)) => {
94 warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
95
96 let (initial_events, _) = room_event_cache.subscribe_to_thread(root.clone()).await;
100
101 timeline_controller
102 .replace_with_initial_remote_events(
103 initial_events.into_iter(),
104 RemoteEventOrigin::Cache,
105 )
106 .await;
107
108 continue;
109 }
110 };
111
112 trace!("Received new timeline events diffs");
113
114 let origin = match update.origin {
115 EventsOrigin::Sync => RemoteEventOrigin::Sync,
116 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
117 EventsOrigin::Cache => RemoteEventOrigin::Cache,
118 };
119
120 let has_diffs = !update.diffs.is_empty();
121
122 timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
123
124 if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
125 timeline_controller.retry_event_decryption(None).await;
126 }
127 }
128
129 trace!("Thread event subscriber task finished.");
130}
131
132pub(in crate::timeline) async fn room_event_cache_updates_task(
135 room_event_cache: RoomEventCache,
136 timeline_controller: TimelineController,
137 mut room_event_cache_subscriber: RoomEventCacheSubscriber,
138 timeline_focus: TimelineFocus,
139) {
140 trace!("Spawned the event subscriber task.");
141
142 loop {
143 trace!("Waiting for an event.");
144
145 let update = match room_event_cache_subscriber.recv().await {
146 Ok(up) => up,
147 Err(RecvError::Closed) => break,
148 Err(RecvError::Lagged(num_skipped)) => {
149 warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
150
151 let initial_events = room_event_cache.events().await;
155
156 timeline_controller
157 .replace_with_initial_remote_events(
158 initial_events.into_iter(),
159 RemoteEventOrigin::Cache,
160 )
161 .await;
162
163 continue;
164 }
165 };
166
167 match update {
168 RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
169 trace!(target = %event_id, "Handling fully read marker.");
170 timeline_controller.handle_fully_read_marker(event_id).await;
171 }
172
173 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
174 trace!("Received new timeline events diffs");
175 let origin = match origin {
176 EventsOrigin::Sync => RemoteEventOrigin::Sync,
177 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
178 EventsOrigin::Cache => RemoteEventOrigin::Cache,
179 };
180
181 let has_diffs = !diffs.is_empty();
182
183 if matches!(timeline_focus, TimelineFocus::Live { .. }) {
184 timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
185 } else {
186 timeline_controller.handle_remote_aggregations(diffs, origin).await;
188 }
189
190 if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
191 timeline_controller.retry_event_decryption(None).await;
192 }
193 }
194
195 RoomEventCacheUpdate::AddEphemeralEvents { events } => {
196 trace!("Received new ephemeral events from sync.");
197
198 timeline_controller.handle_ephemeral_events(events).await;
200 }
201
202 RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
203 if !ambiguity_changes.is_empty() {
204 let member_ambiguity_changes = ambiguity_changes
205 .values()
206 .flat_map(|change| change.user_ids())
207 .collect::<BTreeSet<_>>();
208 timeline_controller
209 .force_update_sender_profiles(&member_ambiguity_changes)
210 .await;
211 }
212 }
213 }
214 }
215}
216
217pub(in crate::timeline) async fn room_send_queue_update_task(
220 mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
221 timeline_controller: TimelineController,
222) {
223 trace!("spawned the local echo task!");
224
225 loop {
226 match send_queue_stream.recv().await {
227 Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,
228
229 Err(RecvError::Lagged(num_missed)) => {
230 warn!("missed {num_missed} local echoes, ignoring those missed");
231 }
232
233 Err(RecvError::Closed) => {
234 trace!("channel closed, exiting the local echo handler");
235 break;
236 }
237 }
238 }
239}