Skip to main content

matrix_sdk/event_cache/
tasks.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    sync::{Arc, Weak},
18};
19
20use eyeball::Subscriber;
21use matrix_sdk_base::{
22    linked_chunk::OwnedLinkedChunkId, serde_helpers::extract_thread_root_from_content,
23    sync::RoomUpdates,
24};
25use ruma::{OwnedEventId, OwnedTransactionId};
26use tokio::{
27    select,
28    sync::{
29        broadcast::{Receiver, Sender, error::RecvError},
30        mpsc,
31    },
32};
33use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};
34
35use super::{
36    AutoShrinkChannelPayload, EventCacheError, EventCacheInner, EventsOrigin,
37    RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, TimelineVectorDiffs,
38};
39use crate::{
40    client::WeakClient,
41    send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
42};
43
44/// Listen to [`RoomUpdates`] to update the Event Cache.
45#[instrument(skip_all)]
46pub(super) async fn room_updates_task(
47    inner: Arc<EventCacheInner>,
48    mut room_updates_feed: Receiver<RoomUpdates>,
49) {
50    trace!("Spawning the listen task");
51    loop {
52        match room_updates_feed.recv().await {
53            Ok(updates) => {
54                trace!("Receiving `RoomUpdates`");
55
56                if let Err(err) = inner.handle_room_updates(updates).await {
57                    match err {
58                        EventCacheError::ClientDropped => {
59                            // The client has dropped, exit the listen task.
60                            info!(
61                                "Closing the event cache global listen task because client dropped"
62                            );
63                            break;
64                        }
65                        err => {
66                            error!("Error when handling room updates: {err}");
67                        }
68                    }
69                }
70            }
71
72            Err(RecvError::Lagged(num_skipped)) => {
73                // Forget everything we know; we could have missed events, and we have
74                // no way to reconcile at the moment!
75                // TODO: implement Smart Matching™,
76                warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
77                if let Err(err) = inner.clear_all_rooms().await {
78                    error!("when clearing storage after lag in listen_task: {err}");
79                }
80            }
81
82            Err(RecvError::Closed) => {
83                // The sender has shut down, exit.
84                info!("Closing the event cache global listen task because receiver closed");
85                break;
86            }
87        }
88    }
89}
90
91/// Listen to _ignore user list update changes_ to clear the rooms when a user
92/// is ignored or unignored.
93#[instrument(skip_all)]
94pub(super) async fn ignore_user_list_update_task(
95    inner: Arc<EventCacheInner>,
96    mut ignore_user_list_stream: Subscriber<Vec<String>>,
97) {
98    let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
99    span.follows_from(Span::current());
100
101    async move {
102        while ignore_user_list_stream.next().await.is_some() {
103            info!("Received an ignore user list change");
104
105            if let Err(err) = inner.clear_all_rooms().await {
106                error!("when clearing room storage after ignore user list change: {err}");
107            }
108        }
109
110        info!("Ignore user list stream has closed");
111    }
112    .instrument(span)
113    .await;
114}
115
/// Spawns the task that will listen to auto-shrink notifications.
///
/// The auto-shrink mechanism works this way:
///
/// - Each time there's a new subscriber to a [`RoomEventCache`], it will
///   increment the active number of subscribers to that room, aka
///   `RoomEventCacheState::subscriber_count`.
/// - When that subscriber is dropped, it will decrement that count; and notify
///   the task below if it reached 0.
/// - The task spawned here, owned by the [`EventCacheInner`], will listen to
///   such notifications that a room may be shrunk. It will attempt an
///   auto-shrink, by letting the inner state decide whether this is a good time
///   to do so (new subscribers might have spawned in the meanwhile).
///
/// [`RoomEventCache`]: super::RoomEventCache
/// [`EventCacheInner`]: super::EventCacheInner
#[instrument(skip_all)]
pub(super) async fn auto_shrink_linked_chunk_task(
    inner: Weak<EventCacheInner>,
    mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
) {
    // `recv()` returns `None` once every sender has been dropped, which ends the
    // task naturally.
    while let Some(room_id) = rx.recv().await {
        trace!(for_room = %room_id, "received notification to shrink");

        // Only a `Weak` is held so this task doesn't keep the event cache alive;
        // if the upgrade fails, the owner is gone and we can exit.
        let Some(inner) = inner.upgrade() else {
            return;
        };

        // Scope the `caches` borrow: we only need a clone of the room cache.
        let room = {
            let caches = match inner.all_caches_for_room(&room_id).await {
                Ok(caches) => caches,
                Err(err) => {
                    warn!(for_room = %room_id, "Failed to get the `Caches`: {err}");
                    continue;
                }
            };

            caches.room.clone()
        };

        trace!("Waiting for state lock…");

        let mut state = match room.state().write().await {
            Ok(state) => state,
            Err(err) => {
                warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}");
                continue;
            }
        };

        // The state decides whether shrinking is appropriate (a new subscriber
        // may have appeared since the notification was sent); `Ok(None)` means
        // it declined to shrink.
        match state.auto_shrink_if_no_subscribers().await {
            Ok(diffs) => {
                if let Some(diffs) = diffs {
                    // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
                    // subscribers, right? RIGHT? Especially because the state is guarded behind
                    // a lock.
                    //
                    // However, better safe than sorry, and it's cheap to send an update here,
                    // so let's do it!
                    if !diffs.is_empty() {
                        room.update_sender().send(
                            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                                diffs,
                                origin: EventsOrigin::Cache,
                            }),
                            None,
                        );
                    }
                } else {
                    debug!("auto-shrinking didn't happen");
                }
            }

            Err(err) => {
                // There's not much we can do here, unfortunately.
                warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
            }
        }
    }

    info!("Auto-shrink linked chunk task has been closed, exiting");
}
198
199/// Handle [`SendQueueUpdate`] and [`RoomEventCacheLinkedChunkUpdate`] to update
200/// the threads, for a thread the user was not subscribed to.
201#[instrument(skip_all)]
202pub(super) async fn thread_subscriber_task(
203    client: WeakClient,
204    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
205    thread_subscriber_sender: Sender<()>,
206) {
207    let mut send_q_rx = if let Some(client) = client.get() {
208        match client.enabled_thread_subscriptions().await {
209            Ok(enabled) => {
210                if !enabled {
211                    trace!(
212                        "Thread subscriptions are not enabled, not spawning thread subscriber task"
213                    );
214                    return;
215                }
216            }
217
218            Err(err) => {
219                warn!(%err, "Failed to get whether thread subscriptions are enabled, not spawning thread subscriber task");
220                return;
221            }
222        }
223
224        client.send_queue().subscribe()
225    } else {
226        trace!("Client is shutting down, not spawning thread subscriber task");
227        return;
228    };
229
230    let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();
231
232    // A mapping of local echoes (events being sent), to their thread root, if
233    // they're in an in-thread reply.
234    //
235    // Entirely managed by `handle_thread_subscriber_send_queue_update`.
236    let mut events_being_sent = HashMap::new();
237
238    loop {
239        select! {
240            res = send_q_rx.recv() => {
241                match res {
242                    Ok(up) => {
243                        if !handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
244                            break;
245                        }
246                    }
247                    Err(RecvError::Closed) => {
248                        debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
249                        break;
250                    }
251                    Err(RecvError::Lagged(num_skipped)) => {
252                        warn!(num_skipped, "Lagged behind linked chunk updates");
253                    }
254                }
255            }
256
257            res = linked_chunk_rx.recv() => {
258                match res {
259                    Ok(up) => {
260                        if !handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
261                            break;
262                        }
263                    }
264                    Err(RecvError::Closed) => {
265                        debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
266                        break;
267                    }
268                    Err(RecvError::Lagged(num_skipped)) => {
269                        warn!(num_skipped, "Lagged behind linked chunk updates");
270                    }
271                }
272            }
273        }
274    }
275}
276
/// React to a given send queue update by subscribing the user to a
/// thread, if needs be (when the user sent an event in a thread they were
/// not subscribed to).
///
/// `events_being_sent` maps a local echo's transaction id to the thread root
/// of the in-thread reply being sent; entries are added when a local event is
/// created or replaced, and removed on cancellation or once the event is sent.
///
/// Returns a boolean indicating whether the task should keep on running or
/// not.
#[instrument(skip(client, thread_subscriber_sender))]
async fn handle_thread_subscriber_send_queue_update(
    client: &WeakClient,
    thread_subscriber_sender: &Sender<()>,
    events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
    up: SendQueueUpdate,
) -> bool {
    let Some(client) = client.get() else {
        // Client shutting down.
        debug!("Client is shutting down, exiting thread subscriber task");
        return false;
    };

    let room_id = up.room_id;
    let Some(room) = client.get_room(&room_id) else {
        warn!(%room_id, "unknown room");
        return true;
    };

    // Most update kinds only maintain `events_being_sent` and return early; only
    // `SentEvent` falls through with a (thread root, event id) pair to act on.
    let (thread_root, subscribe_up_to) = match up.update {
        RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
            match local_echo.content {
                LocalEchoContent::Event { serialized_event, .. } => {
                    // Remember the thread root, if the event is an in-thread reply.
                    if let Some(thread_root) =
                        extract_thread_root_from_content(serialized_event.into_raw().0)
                    {
                        events_being_sent.insert(local_echo.transaction_id, thread_root);
                    }
                }
                LocalEchoContent::React { .. } => {
                    // Nothing to do, reactions don't count as a thread
                    // subscription.
                }

                LocalEchoContent::Redaction { .. } => {
                    // Nothing to do, redactions don't count as a thread
                    // subscription.
                }
            }
            return true;
        }

        RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
            // The local echo won't be sent anymore: forget it.
            events_being_sent.remove(&transaction_id);
            return true;
        }

        RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
            if let Some(thread_root) = extract_thread_root_from_content(new_content.into_raw().0) {
                events_being_sent.insert(transaction_id, thread_root);
            } else {
                // It could be that the event isn't part of a thread anymore; handle that by
                // removing the pending transaction id.
                events_being_sent.remove(&transaction_id);
            }
            return true;
        }

        RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
            if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
                // The in-thread reply has reached the server: subscribe below, up
                // to the freshly-assigned event id.
                (thread_root, event_id)
            } else {
                // We don't know about the event that has been sent, so ignore it.
                trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
                return true;
            }
        }

        RoomSendQueueUpdate::SendError { .. }
        | RoomSendQueueUpdate::RetryEvent { .. }
        | RoomSendQueueUpdate::MediaUpload { .. } => {
            // Nothing to do for these bad boys.
            return true;
        }
    };

    // And if we've found such a mention, subscribe to the thread up to this event.
    trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");

    if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await {
        warn!(%err, "Failed to subscribe to thread");
    } else {
        // Notify observers (used by tests, presumably — TODO confirm) that a
        // subscription happened; a send error only means nobody is listening.
        let _ = thread_subscriber_sender.send(());
    }

    true
}
370
/// React to a given linked chunk update by subscribing the user to a
/// thread, if needs be (when the user got mentioned in a thread reply, for
/// a thread they were not subscribed to).
///
/// Returns a boolean indicating whether the task should keep on running or
/// not.
#[instrument(skip(client, thread_subscriber_sender))]
async fn handle_thread_subscriber_linked_chunk_update(
    client: &WeakClient,
    thread_subscriber_sender: &Sender<()>,
    up: RoomEventCacheLinkedChunkUpdate,
) -> bool {
    let Some(client) = client.get() else {
        // Client shutting down.
        debug!("Client is shutting down, exiting thread subscriber task");
        return false;
    };

    // Only updates to a *thread* linked chunk are relevant here.
    let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
        trace!("received an update for a non-thread linked chunk, ignoring");
        return true;
    };

    let Some(room) = client.get_room(room_id) else {
        warn!(%room_id, "unknown room");
        return true;
    };

    let thread_root = thread_root.clone();

    let mut new_events = up.events().peekable();

    if new_events.peek().is_none() {
        // No new events, nothing to do.
        return true;
    }

    // This `PushContext` is going to be used to compute whether an in-thread event
    // would trigger a mention.
    //
    // Of course, we're not interested in an in-thread event causing a mention,
    // because it's part of a thread we've subscribed to. So the
    // `PushContext` must not include the check for thread subscriptions (otherwise
    // it would be impossible to subscribe to new threads).

    let with_thread_subscriptions = false;

    let Some(push_context) = room
        .push_context_internal(with_thread_subscriptions)
        .await
        .inspect_err(|err| {
            warn!("Failed to get push context for threads: {err}");
        })
        .ok()
        .flatten()
    else {
        warn!("Missing push context for thread subscriptions.");
        return true;
    };

    let mut subscribe_up_to = None;

    // Find if there's an event that would trigger a mention for the current
    // user, iterating from the end of the new events towards the oldest, so we can
    // find the most recent event to subscribe to.
    for ev in new_events.rev() {
        if push_context.for_event(ev.raw()).await.into_iter().any(|action| action.should_notify()) {
            let Some(event_id) = ev.event_id() else {
                // Shouldn't happen.
                continue;
            };
            subscribe_up_to = Some(event_id);
            break;
        }
    }

    // And if we've found such a mention, subscribe to the thread up to this
    // event.
    if let Some(event_id) = subscribe_up_to {
        trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
        if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
            warn!(%err, "Failed to subscribe to thread");
        } else {
            // Best-effort notification; a send error only means nobody is listening.
            let _ = thread_subscriber_sender.send(());
        }
    }

    true
}
460
/// Takes an [`Event`] and passes it to the [`RoomIndex`] of the
/// given room which will add/remove/edit an event in the index based on
/// the event type.
///
/// Runs until the linked chunk update channel closes, or the client is
/// dropped.
///
/// [`Event`]: matrix_sdk_base::event_cache::Event
/// [`RoomIndex`]: matrix_sdk_search::index::RoomIndex
#[cfg(feature = "experimental-search")]
#[instrument(skip_all)]
pub(super) async fn search_indexing_task(
    client: WeakClient,
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
) {
    let mut linked_chunk_update_receiver = linked_chunk_update_sender.subscribe();

    loop {
        match linked_chunk_update_receiver.recv().await {
            Ok(room_ec_lc_update) => {
                // Only room linked chunks are indexed; thread chunks are skipped.
                let OwnedLinkedChunkId::Room(room_id) = room_ec_lc_update.linked_chunk_id.clone()
                else {
                    trace!("Received non-room updates, ignoring.");
                    continue;
                };

                let mut timeline_events = room_ec_lc_update.events().peekable();

                // Nothing to index.
                if timeline_events.peek().is_none() {
                    continue;
                }

                let Some(client) = client.get() else {
                    trace!("Client is shutting down, exiting search task");
                    return;
                };

                let maybe_room_cache = client.event_cache().for_room(&room_id).await;
                let Ok((room_cache, _drop_handles)) = maybe_room_cache else {
                    warn!(for_room = %room_id, "Failed to get RoomEventCache: {maybe_room_cache:?}");
                    continue;
                };

                let maybe_room = client.get_room(&room_id);
                let Some(room) = maybe_room else {
                    warn!(get_room = %room_id, "Failed to get room while indexing: {maybe_room:?}");
                    continue;
                };
                // Redaction rules depend on the room version; needed so the index
                // can apply redactions correctly.
                let redaction_rules = room.clone_info().room_version_rules_or_default().redaction;

                let mut search_index_guard = client.search_index().lock().await;

                if let Err(err) = search_index_guard
                    .bulk_handle_timeline_event(
                        timeline_events,
                        &room_cache,
                        &room_id,
                        &redaction_rules,
                    )
                    .await
                {
                    error!("Failed to handle events for indexing: {err}")
                }
            }
            Err(RecvError::Closed) => {
                // Fix: this is the search indexing task, not the thread subscriber
                // task; the previous message was copy-pasted from that task.
                debug!("Linked chunk update channel has been closed, exiting search indexing task");
                break;
            }
            Err(RecvError::Lagged(num_skipped)) => {
                warn!(num_skipped, "Lagged behind linked chunk updates");
            }
        }
    }
}
533}