matrix-sdk-ui 0.17.0

GUI-centric utilities on top of matrix-rust-sdk (experimental).
Documentation
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Long-lived tasks for the timeline.

use std::collections::BTreeSet;

use matrix_sdk::{
    event_cache::{
        EventFocusThreadMode, EventsOrigin, RoomEventCache, RoomEventCacheSubscriber,
        RoomEventCacheUpdate, TimelineVectorDiffs,
    },
    send_queue::RoomSendQueueUpdate,
};
use ruma::OwnedEventId;
use tokio::sync::broadcast::{Receiver, error::RecvError};
use tracing::{error, instrument, trace, warn};

use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin};

/// Long-lived task, in the pinned events focus mode, that updates the timeline
/// after any changes in the pinned events.
#[instrument(
    skip_all,
    fields(
        room_id = %timeline_controller.room().room_id(),
    )
)]
pub(in crate::timeline) async fn pinned_events_task(
    room_event_cache: RoomEventCache,
    timeline_controller: TimelineController,
    mut pinned_events_recv: Receiver<TimelineVectorDiffs>,
) {
    loop {
        trace!("Waiting for an event.");

        let update = match pinned_events_recv.recv().await {
            Ok(up) => up,
            Err(RecvError::Closed) => break,
            Err(RecvError::Lagged(num_skipped)) => {
                warn!(num_skipped, "Lagged behind pinned-event cache updates, resetting timeline");

                // The updates might have lagged, but the room event cache might have
                // events, so retrieve them and add them back again to the timeline,
                // after clearing it.
                let (initial_events, _) = match room_event_cache.subscribe_to_pinned_events().await
                {
                    Ok(initial_events) => initial_events,
                    Err(err) => {
                        error!(
                            ?err,
                            "Failed to replace the initial remote events in the event cache"
                        );
                        break;
                    }
                };

                timeline_controller
                    .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
                    .await;

                continue;
            }
        };

        trace!("Received new timeline events diffs");
        let origin = match update.origin {
            EventsOrigin::Sync => RemoteEventOrigin::Sync,
            EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
            EventsOrigin::Cache => RemoteEventOrigin::Cache,
        };
        timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
    }
}

/// Long-lived task, in the event focus mode, that updates the timeline after
/// any changes to the underlying timeline.
#[instrument(
    skip_all,
    fields(
        room_id = %timeline_controller.room().room_id(),
        focused_event_id = %focused_event,
        ?thread_mode
    )
)]
pub(in crate::timeline) async fn event_focused_task(
    focused_event: OwnedEventId,
    thread_mode: EventFocusThreadMode,
    room_event_cache: RoomEventCache,
    timeline_controller: TimelineController,
    mut event_focused_events_recv: Receiver<TimelineVectorDiffs>,
) {
    loop {
        trace!("Waiting for an event.");

        let update = match event_focused_events_recv.recv().await {
            Ok(up) => up,
            Err(RecvError::Closed) => break,
            Err(RecvError::Lagged(num_skipped)) => {
                warn!(num_skipped, "Lagged behind focused-event cache updates, resetting timeline");

                // The updates might have lagged, but the room event cache might have
                // events, so retrieve them and add them back again to the timeline,
                // after clearing it.
                let cache = match room_event_cache
                    .get_event_focused_cache(focused_event.clone(), thread_mode)
                    .await
                {
                    Ok(Some(cache)) => cache,
                    Ok(None) => {
                        error!("Focused event timeline doesn't have an attached cache");
                        break;
                    }
                    Err(err) => {
                        error!(%err, "Failed to get the focused cache for the focused event");
                        break;
                    }
                };

                let (initial_events, _) = cache.subscribe().await;

                timeline_controller
                    .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
                    .await;

                continue;
            }
        };

        trace!("Received new timeline events diffs");
        let origin = match update.origin {
            EventsOrigin::Sync => RemoteEventOrigin::Sync,
            EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
            EventsOrigin::Cache => RemoteEventOrigin::Cache,
        };
        timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
    }
}

/// For a thread-focused timeline, a long-lived task that will listen to the
/// underlying thread updates.
pub(in crate::timeline) async fn thread_updates_task(
    mut receiver: Receiver<TimelineVectorDiffs>,
    room_event_cache: RoomEventCache,
    timeline_controller: TimelineController,
    root: OwnedEventId,
) {
    trace!("Spawned the thread event subscriber task.");

    loop {
        trace!("Waiting for an event.");

        let update = match receiver.recv().await {
            Ok(up) => up,
            Err(RecvError::Closed) => break,
            Err(RecvError::Lagged(num_skipped)) => {
                warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");

                // The updates might have lagged, but the room event cache might
                // have events, so retrieve them and add them back again to the
                // timeline, after clearing it.
                _ = timeline_controller.init_with_thread_root(&root, &room_event_cache).await;

                continue;
            }
        };

        trace!("Received new timeline events diffs");

        let origin = match update.origin {
            EventsOrigin::Sync => RemoteEventOrigin::Sync,
            EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
            EventsOrigin::Cache => RemoteEventOrigin::Cache,
        };

        let has_diffs = !update.diffs.is_empty();

        timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;

        if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
            timeline_controller.retry_event_decryption(None).await;
        }
    }

    trace!("Thread event subscriber task finished.");
}

/// Long-lived task that dispatches each [`RoomEventCacheUpdate`] received from
/// the room event cache to the matching handler of the timeline controller.
///
/// How timeline event diffs are applied depends on `timeline_focus`. The task
/// stops when the subscriber's channel is closed, or when reloading the events
/// from the room event cache fails after the subscriber has lagged.
pub(in crate::timeline) async fn room_event_cache_updates_task(
    room_event_cache: RoomEventCache,
    timeline_controller: TimelineController,
    mut room_event_cache_subscriber: RoomEventCacheSubscriber,
    timeline_focus: TimelineFocus,
) {
    trace!("Spawned the event subscriber task.");

    loop {
        trace!("Waiting for an event.");

        let received = room_event_cache_subscriber.recv().await;

        let cache_update = match received {
            Ok(cache_update) => cache_update,

            // No sender is left: nothing more will ever arrive.
            Err(RecvError::Closed) => break,

            Err(RecvError::Lagged(num_skipped)) => {
                warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");

                // Some updates were missed, so the timeline can't be patched
                // incrementally anymore. Fetch the events currently known to the
                // room event cache and rebuild the timeline from those.
                let reloaded = room_event_cache.events().await;

                let initial_events = match reloaded {
                    Ok(events) => events,
                    Err(err) => {
                        error!(
                            ?err,
                            "Failed to replace the initial remote events in the event cache"
                        );
                        break;
                    }
                };

                timeline_controller
                    .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
                    .await;

                continue;
            }
        };

        match cache_update {
            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, origin }) => {
                trace!("Received new timeline events diffs");

                // Translate the event cache's origin into the timeline's own
                // origin type.
                let origin = match origin {
                    EventsOrigin::Cache => RemoteEventOrigin::Cache,
                    EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
                    EventsOrigin::Sync => RemoteEventOrigin::Sync,
                };

                // Decide this before `diffs` is moved into the controller:
                // decryption is retried only for a non-empty batch that
                // originates from the cache.
                let should_retry_decryption =
                    !diffs.is_empty() && matches!(origin, RemoteEventOrigin::Cache);

                match timeline_focus {
                    // A live timeline applies the diffs in full.
                    TimelineFocus::Live { .. } => {
                        timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
                    }

                    // The pinned events timeline handles remote aggregations on
                    // its own, so there's nothing to apply here.
                    TimelineFocus::PinnedEvents => {}

                    // Any other non-live timeline only handles the remote
                    // aggregations.
                    _ => {
                        timeline_controller.handle_remote_aggregations(diffs, origin).await;
                    }
                }

                if should_retry_decryption {
                    timeline_controller.retry_event_decryption(None).await;
                }
            }

            RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
                trace!(target = %event_id, "Handling fully read marker.");
                timeline_controller.handle_fully_read_marker(event_id).await;
            }

            RoomEventCacheUpdate::AddEphemeralEvents { events } => {
                trace!("Received new ephemeral events from sync.");

                // TODO: ephemeral (read receipts) should be handled by the event cache (#4113).
                timeline_controller.handle_ephemeral_events(events).await;
            }

            RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
                // Without any ambiguity change, no sender profile has to be
                // refreshed.
                if ambiguity_changes.is_empty() {
                    continue;
                }

                // Gather every user id touched by the ambiguity changes.
                let affected_user_ids: BTreeSet<_> =
                    ambiguity_changes.values().flat_map(|change| change.user_ids()).collect();

                timeline_controller.force_update_sender_profiles(&affected_user_ids).await;
            }
        }
    }
}

/// Long-lived task that hands every [`RoomSendQueueUpdate`] (local echo)
/// received on `send_queue_stream` over to the timeline controller.
///
/// Updates that were missed because the receiver lagged are only reported with
/// a warning. The task stops once the channel is closed.
pub(in crate::timeline) async fn room_send_queue_update_task(
    mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
    timeline_controller: TimelineController,
) {
    trace!("spawned the local echo task!");

    loop {
        let received = send_queue_stream.recv().await;

        let update = match received {
            Ok(update) => update,

            Err(RecvError::Closed) => {
                trace!("channel closed, exiting the local echo handler");
                break;
            }

            Err(RecvError::Lagged(num_missed)) => {
                // The missed updates are skipped; keep listening for new ones.
                warn!("missed {num_missed} local echoes, ignoring those missed");
                continue;
            }
        };

        timeline_controller.handle_room_send_queue_update(update).await;
    }
}