1use std::collections::BTreeSet;
18
19use matrix_sdk::{
20 event_cache::{
21 EventFocusThreadMode, EventsOrigin, RoomEventCache, RoomEventCacheSubscriber,
22 RoomEventCacheUpdate, TimelineVectorDiffs,
23 },
24 send_queue::RoomSendQueueUpdate,
25};
26use ruma::OwnedEventId;
27use tokio::sync::broadcast::{Receiver, error::RecvError};
28use tracing::{error, instrument, trace, warn};
29
30use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin};
31
32#[instrument(
35 skip_all,
36 fields(
37 room_id = %timeline_controller.room().room_id(),
38 )
39)]
40pub(in crate::timeline) async fn pinned_events_task(
41 room_event_cache: RoomEventCache,
42 timeline_controller: TimelineController,
43 mut pinned_events_recv: Receiver<TimelineVectorDiffs>,
44) {
45 loop {
46 trace!("Waiting for an event.");
47
48 let update = match pinned_events_recv.recv().await {
49 Ok(up) => up,
50 Err(RecvError::Closed) => break,
51 Err(RecvError::Lagged(num_skipped)) => {
52 warn!(num_skipped, "Lagged behind pinned-event cache updates, resetting timeline");
53
54 let (initial_events, _) = match room_event_cache.subscribe_to_pinned_events().await
58 {
59 Ok(initial_events) => initial_events,
60 Err(err) => {
61 error!(
62 ?err,
63 "Failed to replace the initial remote events in the event cache"
64 );
65 break;
66 }
67 };
68
69 timeline_controller
70 .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
71 .await;
72
73 continue;
74 }
75 };
76
77 trace!("Received new timeline events diffs");
78 let origin = match update.origin {
79 EventsOrigin::Sync => RemoteEventOrigin::Sync,
80 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
81 EventsOrigin::Cache => RemoteEventOrigin::Cache,
82 };
83 timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
84 }
85}
86
87#[instrument(
90 skip_all,
91 fields(
92 room_id = %timeline_controller.room().room_id(),
93 focused_event_id = %focused_event,
94 ?thread_mode
95 )
96)]
97pub(in crate::timeline) async fn event_focused_task(
98 focused_event: OwnedEventId,
99 thread_mode: EventFocusThreadMode,
100 room_event_cache: RoomEventCache,
101 timeline_controller: TimelineController,
102 mut event_focused_events_recv: Receiver<TimelineVectorDiffs>,
103) {
104 loop {
105 trace!("Waiting for an event.");
106
107 let update = match event_focused_events_recv.recv().await {
108 Ok(up) => up,
109 Err(RecvError::Closed) => break,
110 Err(RecvError::Lagged(num_skipped)) => {
111 warn!(num_skipped, "Lagged behind focused-event cache updates, resetting timeline");
112
113 let cache = match room_event_cache
117 .get_event_focused_cache(focused_event.clone(), thread_mode)
118 .await
119 {
120 Ok(Some(cache)) => cache,
121 Ok(None) => {
122 error!("Focused event timeline doesn't have an attached cache");
123 break;
124 }
125 Err(err) => {
126 error!(%err, "Failed to get the focused cache for the focused event");
127 break;
128 }
129 };
130
131 let (initial_events, _) = cache.subscribe().await;
132
133 timeline_controller
134 .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
135 .await;
136
137 continue;
138 }
139 };
140
141 trace!("Received new timeline events diffs");
142 let origin = match update.origin {
143 EventsOrigin::Sync => RemoteEventOrigin::Sync,
144 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
145 EventsOrigin::Cache => RemoteEventOrigin::Cache,
146 };
147 timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
148 }
149}
150
151pub(in crate::timeline) async fn thread_updates_task(
154 mut receiver: Receiver<TimelineVectorDiffs>,
155 room_event_cache: RoomEventCache,
156 timeline_controller: TimelineController,
157 root: OwnedEventId,
158) {
159 trace!("Spawned the thread event subscriber task.");
160
161 loop {
162 trace!("Waiting for an event.");
163
164 let update = match receiver.recv().await {
165 Ok(up) => up,
166 Err(RecvError::Closed) => break,
167 Err(RecvError::Lagged(num_skipped)) => {
168 warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
169
170 _ = timeline_controller.init_with_thread_root(&root, &room_event_cache).await;
174
175 continue;
176 }
177 };
178
179 trace!("Received new timeline events diffs");
180
181 let origin = match update.origin {
182 EventsOrigin::Sync => RemoteEventOrigin::Sync,
183 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
184 EventsOrigin::Cache => RemoteEventOrigin::Cache,
185 };
186
187 let has_diffs = !update.diffs.is_empty();
188
189 timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
190
191 if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
192 timeline_controller.retry_event_decryption(None).await;
193 }
194 }
195
196 trace!("Thread event subscriber task finished.");
197}
198
199pub(in crate::timeline) async fn room_event_cache_updates_task(
202 room_event_cache: RoomEventCache,
203 timeline_controller: TimelineController,
204 mut room_event_cache_subscriber: RoomEventCacheSubscriber,
205 timeline_focus: TimelineFocus,
206) {
207 trace!("Spawned the event subscriber task.");
208
209 loop {
210 trace!("Waiting for an event.");
211
212 let update = match room_event_cache_subscriber.recv().await {
213 Ok(up) => up,
214 Err(RecvError::Closed) => break,
215 Err(RecvError::Lagged(num_skipped)) => {
216 warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
217
218 let initial_events = match room_event_cache.events().await {
222 Ok(initial_events) => initial_events,
223 Err(err) => {
224 error!(
225 ?err,
226 "Failed to replace the initial remote events in the event cache"
227 );
228 break;
229 }
230 };
231
232 timeline_controller
233 .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
234 .await;
235
236 continue;
237 }
238 };
239
240 match update {
241 RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
242 trace!(target = %event_id, "Handling fully read marker.");
243 timeline_controller.handle_fully_read_marker(event_id).await;
244 }
245
246 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, origin }) => {
247 trace!("Received new timeline events diffs");
248 let origin = match origin {
249 EventsOrigin::Sync => RemoteEventOrigin::Sync,
250 EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
251 EventsOrigin::Cache => RemoteEventOrigin::Cache,
252 };
253
254 let has_diffs = !diffs.is_empty();
255
256 if matches!(timeline_focus, TimelineFocus::Live { .. }) {
257 timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
258 } else if !matches!(timeline_focus, TimelineFocus::PinnedEvents) {
259 timeline_controller.handle_remote_aggregations(diffs, origin).await;
262 }
263
264 if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
265 timeline_controller.retry_event_decryption(None).await;
266 }
267 }
268
269 RoomEventCacheUpdate::AddEphemeralEvents { events } => {
270 trace!("Received new ephemeral events from sync.");
271
272 timeline_controller.handle_ephemeral_events(events).await;
274 }
275
276 RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
277 if !ambiguity_changes.is_empty() {
278 let member_ambiguity_changes = ambiguity_changes
279 .values()
280 .flat_map(|change| change.user_ids())
281 .collect::<BTreeSet<_>>();
282 timeline_controller
283 .force_update_sender_profiles(&member_ambiguity_changes)
284 .await;
285 }
286 }
287 }
288 }
289}
290
291pub(in crate::timeline) async fn room_send_queue_update_task(
294 mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
295 timeline_controller: TimelineController,
296) {
297 trace!("spawned the local echo task!");
298
299 loop {
300 match send_queue_stream.recv().await {
301 Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,
302
303 Err(RecvError::Lagged(num_missed)) => {
304 warn!("missed {num_missed} local echoes, ignoring those missed");
305 }
306
307 Err(RecvError::Closed) => {
308 trace!("channel closed, exiting the local echo handler");
309 break;
310 }
311 }
312 }
313}