1use std::{
16 collections::HashMap,
17 sync::{Arc, Weak},
18};
19
20use eyeball::Subscriber;
21use matrix_sdk_base::{
22 linked_chunk::OwnedLinkedChunkId, serde_helpers::extract_thread_root_from_content,
23 sync::RoomUpdates,
24};
25use ruma::{OwnedEventId, OwnedTransactionId};
26use tokio::{
27 select,
28 sync::{
29 broadcast::{Receiver, Sender, error::RecvError},
30 mpsc,
31 },
32};
33use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};
34
35use super::{
36 AutoShrinkChannelPayload, EventCacheError, EventCacheInner, EventsOrigin,
37 RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, TimelineVectorDiffs,
38};
39use crate::{
40 client::WeakClient,
41 send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
42};
43
44#[instrument(skip_all)]
46pub(super) async fn room_updates_task(
47 inner: Arc<EventCacheInner>,
48 mut room_updates_feed: Receiver<RoomUpdates>,
49) {
50 trace!("Spawning the listen task");
51 loop {
52 match room_updates_feed.recv().await {
53 Ok(updates) => {
54 trace!("Receiving `RoomUpdates`");
55
56 if let Err(err) = inner.handle_room_updates(updates).await {
57 match err {
58 EventCacheError::ClientDropped => {
59 info!(
61 "Closing the event cache global listen task because client dropped"
62 );
63 break;
64 }
65 err => {
66 error!("Error when handling room updates: {err}");
67 }
68 }
69 }
70 }
71
72 Err(RecvError::Lagged(num_skipped)) => {
73 warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
77 if let Err(err) = inner.clear_all_rooms().await {
78 error!("when clearing storage after lag in listen_task: {err}");
79 }
80 }
81
82 Err(RecvError::Closed) => {
83 info!("Closing the event cache global listen task because receiver closed");
85 break;
86 }
87 }
88 }
89}
90
91#[instrument(skip_all)]
94pub(super) async fn ignore_user_list_update_task(
95 inner: Arc<EventCacheInner>,
96 mut ignore_user_list_stream: Subscriber<Vec<String>>,
97) {
98 let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
99 span.follows_from(Span::current());
100
101 async move {
102 while ignore_user_list_stream.next().await.is_some() {
103 info!("Received an ignore user list change");
104
105 if let Err(err) = inner.clear_all_rooms().await {
106 error!("when clearing room storage after ignore user list change: {err}");
107 }
108 }
109
110 info!("Ignore user list stream has closed");
111 }
112 .instrument(span)
113 .await;
114}
115
116#[instrument(skip_all)]
133pub(super) async fn auto_shrink_linked_chunk_task(
134 inner: Weak<EventCacheInner>,
135 mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
136) {
137 while let Some(room_id) = rx.recv().await {
138 trace!(for_room = %room_id, "received notification to shrink");
139
140 let Some(inner) = inner.upgrade() else {
141 return;
142 };
143
144 let room = {
145 let caches = match inner.all_caches_for_room(&room_id).await {
146 Ok(caches) => caches,
147 Err(err) => {
148 warn!(for_room = %room_id, "Failed to get the `Caches`: {err}");
149 continue;
150 }
151 };
152
153 caches.room.clone()
154 };
155
156 trace!("Waiting for state lock…");
157
158 let mut state = match room.state().write().await {
159 Ok(state) => state,
160 Err(err) => {
161 warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}");
162 continue;
163 }
164 };
165
166 match state.auto_shrink_if_no_subscribers().await {
167 Ok(diffs) => {
168 if let Some(diffs) = diffs {
169 if !diffs.is_empty() {
176 room.update_sender().send(
177 RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
178 diffs,
179 origin: EventsOrigin::Cache,
180 }),
181 None,
182 );
183 }
184 } else {
185 debug!("auto-shrinking didn't happen");
186 }
187 }
188
189 Err(err) => {
190 warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
192 }
193 }
194 }
195
196 info!("Auto-shrink linked chunk task has been closed, exiting");
197}
198
199#[instrument(skip_all)]
202pub(super) async fn thread_subscriber_task(
203 client: WeakClient,
204 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
205 thread_subscriber_sender: Sender<()>,
206) {
207 let mut send_q_rx = if let Some(client) = client.get() {
208 match client.enabled_thread_subscriptions().await {
209 Ok(enabled) => {
210 if !enabled {
211 trace!(
212 "Thread subscriptions are not enabled, not spawning thread subscriber task"
213 );
214 return;
215 }
216 }
217
218 Err(err) => {
219 warn!(%err, "Failed to get whether thread subscriptions are enabled, not spawning thread subscriber task");
220 return;
221 }
222 }
223
224 client.send_queue().subscribe()
225 } else {
226 trace!("Client is shutting down, not spawning thread subscriber task");
227 return;
228 };
229
230 let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();
231
232 let mut events_being_sent = HashMap::new();
237
238 loop {
239 select! {
240 res = send_q_rx.recv() => {
241 match res {
242 Ok(up) => {
243 if !handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
244 break;
245 }
246 }
247 Err(RecvError::Closed) => {
248 debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
249 break;
250 }
251 Err(RecvError::Lagged(num_skipped)) => {
252 warn!(num_skipped, "Lagged behind linked chunk updates");
253 }
254 }
255 }
256
257 res = linked_chunk_rx.recv() => {
258 match res {
259 Ok(up) => {
260 if !handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
261 break;
262 }
263 }
264 Err(RecvError::Closed) => {
265 debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
266 break;
267 }
268 Err(RecvError::Lagged(num_skipped)) => {
269 warn!(num_skipped, "Lagged behind linked chunk updates");
270 }
271 }
272 }
273 }
274 }
275}
276
277#[instrument(skip(client, thread_subscriber_sender))]
284async fn handle_thread_subscriber_send_queue_update(
285 client: &WeakClient,
286 thread_subscriber_sender: &Sender<()>,
287 events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
288 up: SendQueueUpdate,
289) -> bool {
290 let Some(client) = client.get() else {
291 debug!("Client is shutting down, exiting thread subscriber task");
293 return false;
294 };
295
296 let room_id = up.room_id;
297 let Some(room) = client.get_room(&room_id) else {
298 warn!(%room_id, "unknown room");
299 return true;
300 };
301
302 let (thread_root, subscribe_up_to) = match up.update {
303 RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
304 match local_echo.content {
305 LocalEchoContent::Event { serialized_event, .. } => {
306 if let Some(thread_root) =
307 extract_thread_root_from_content(serialized_event.into_raw().0)
308 {
309 events_being_sent.insert(local_echo.transaction_id, thread_root);
310 }
311 }
312 LocalEchoContent::React { .. } => {
313 }
316
317 LocalEchoContent::Redaction { .. } => {
318 }
321 }
322 return true;
323 }
324
325 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
326 events_being_sent.remove(&transaction_id);
327 return true;
328 }
329
330 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
331 if let Some(thread_root) = extract_thread_root_from_content(new_content.into_raw().0) {
332 events_being_sent.insert(transaction_id, thread_root);
333 } else {
334 events_being_sent.remove(&transaction_id);
337 }
338 return true;
339 }
340
341 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
342 if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
343 (thread_root, event_id)
344 } else {
345 trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
347 return true;
348 }
349 }
350
351 RoomSendQueueUpdate::SendError { .. }
352 | RoomSendQueueUpdate::RetryEvent { .. }
353 | RoomSendQueueUpdate::MediaUpload { .. } => {
354 return true;
356 }
357 };
358
359 trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
361
362 if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await {
363 warn!(%err, "Failed to subscribe to thread");
364 } else {
365 let _ = thread_subscriber_sender.send(());
366 }
367
368 true
369}
370
/// Processes one linked chunk update for the thread subscriber task.
///
/// When a thread linked chunk receives new events, scans them from newest to
/// oldest and, if one would trigger a notification, subscribes to that thread
/// up to that event.
///
/// Returns `false` when the task should stop (client shutting down),
/// `true` otherwise.
#[instrument(skip(client, thread_subscriber_sender))]
async fn handle_thread_subscriber_linked_chunk_update(
    client: &WeakClient,
    thread_subscriber_sender: &Sender<()>,
    up: RoomEventCacheLinkedChunkUpdate,
) -> bool {
    let Some(client) = client.get() else {
        debug!("Client is shutting down, exiting thread subscriber task");
        return false;
    };

    // Only thread linked chunks are relevant here; room chunks are ignored.
    let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
        trace!("received an update for a non-thread linked chunk, ignoring");
        return true;
    };

    let Some(room) = client.get_room(room_id) else {
        warn!(%room_id, "unknown room");
        return true;
    };

    let thread_root = thread_root.clone();

    let mut new_events = up.events().peekable();

    // Nothing to do if the update carries no new events.
    if new_events.peek().is_none() {
        return true;
    }

    // NOTE(review): deliberately `false`, presumably so push actions are
    // computed without taking existing thread subscriptions into account —
    // TODO confirm against `push_context_internal`'s contract.
    let with_thread_subscriptions = false;

    let Some(push_context) = room
        .push_context_internal(with_thread_subscriptions)
        .await
        .inspect_err(|err| {
            warn!("Failed to get push context for threads: {err}");
        })
        .ok()
        .flatten()
    else {
        warn!("Missing push context for thread subscriptions.");
        return true;
    };

    let mut subscribe_up_to = None;

    // Scan from most recent to oldest, stopping at the first (i.e. latest)
    // event whose push actions would notify the user.
    for ev in new_events.rev() {
        if push_context.for_event(ev.raw()).await.into_iter().any(|action| action.should_notify()) {
            // Events without an id can't anchor a subscription; keep looking.
            let Some(event_id) = ev.event_id() else {
                continue;
            };
            subscribe_up_to = Some(event_id);
            break;
        }
    }

    if let Some(event_id) = subscribe_up_to {
        trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
            warn!(%err, "Failed to subscribe to thread");
        } else {
            // Notify listeners that a subscription happened.
            let _ = thread_subscriber_sender.send(());
        }
    }

    true
}
460
/// Long-running task that feeds newly cached room events into the client's
/// search index, driven by the linked chunk updates channel.
///
/// Exits when the client is shutting down or the updates channel closes.
#[cfg(feature = "experimental-search")]
#[instrument(skip_all)]
pub(super) async fn search_indexing_task(
    client: WeakClient,
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
) {
    let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();

    loop {
        match linked_chunk_update_receiver.recv().await {
            Ok(room_ec_lc_update) => {
                // Only room linked chunks are indexed; thread chunks are ignored.
                let OwnedLinkedChunkId::Room(room_id) = room_ec_lc_update.linked_chunk_id.clone()
                else {
                    trace!("Received non-room updates, ignoring.");
                    continue;
                };

                let mut timeline_events = room_ec_lc_update.events().peekable();

                // Skip updates that carry no new events.
                if timeline_events.peek().is_none() {
                    continue;
                }

                let Some(client) = client.get() else {
                    trace!("Client is shutting down, exiting search task");
                    return;
                };

                // Fixed logging: log the actual error instead of
                // debug-formatting the already-consumed `Result`.
                let (room_cache, _drop_handles) =
                    match client.event_cache().for_room(&room_id).await {
                        Ok(found) => found,
                        Err(err) => {
                            warn!(for_room = %room_id, "Failed to get RoomEventCache: {err}");
                            continue;
                        }
                    };

                let Some(room) = client.get_room(&room_id) else {
                    // Fixed logging: the value is always `None` here, so
                    // formatting it added no information.
                    warn!(get_room = %room_id, "Failed to get room while indexing");
                    continue;
                };

                // Redactions are applied according to the room version's rules.
                let redaction_rules = room.clone_info().room_version_rules_or_default().redaction;

                let mut search_index_guard = client.search_index().lock().await;

                if let Err(err) = search_index_guard
                    .bulk_handle_timeline_event(
                        timeline_events,
                        &room_cache,
                        &room_id,
                        &redaction_rules,
                    )
                    .await
                {
                    error!("Failed to handle events for indexing: {err}")
                }
            }

            Err(RecvError::Closed) => {
                // Fixed copy-pasted message: this is the search indexing
                // task, not the thread subscriber task.
                debug!(
                    "Linked chunk update channel has been closed, exiting search indexing task"
                );
                break;
            }

            Err(RecvError::Lagged(num_skipped)) => {
                warn!(num_skipped, "Lagged behind linked chunk updates");
            }
        }
    }
}