use std::collections::BTreeSet;
use matrix_sdk::{
event_cache::{
EventFocusThreadMode, EventsOrigin, RoomEventCache, RoomEventCacheSubscriber,
RoomEventCacheUpdate, TimelineVectorDiffs,
},
send_queue::RoomSendQueueUpdate,
};
use ruma::OwnedEventId;
use tokio::sync::broadcast::{Receiver, error::RecvError};
use tracing::{error, instrument, trace, warn};
use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin};
#[instrument(
skip_all,
fields(
room_id = %timeline_controller.room().room_id(),
)
)]
pub(in crate::timeline) async fn pinned_events_task(
room_event_cache: RoomEventCache,
timeline_controller: TimelineController,
mut pinned_events_recv: Receiver<TimelineVectorDiffs>,
) {
loop {
trace!("Waiting for an event.");
let update = match pinned_events_recv.recv().await {
Ok(up) => up,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind pinned-event cache updates, resetting timeline");
let (initial_events, _) = match room_event_cache.subscribe_to_pinned_events().await
{
Ok(initial_events) => initial_events,
Err(err) => {
error!(
?err,
"Failed to replace the initial remote events in the event cache"
);
break;
}
};
timeline_controller
.replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
.await;
continue;
}
};
trace!("Received new timeline events diffs");
let origin = match update.origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
EventsOrigin::Cache => RemoteEventOrigin::Cache,
};
timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
}
}
#[instrument(
skip_all,
fields(
room_id = %timeline_controller.room().room_id(),
focused_event_id = %focused_event,
?thread_mode
)
)]
pub(in crate::timeline) async fn event_focused_task(
focused_event: OwnedEventId,
thread_mode: EventFocusThreadMode,
room_event_cache: RoomEventCache,
timeline_controller: TimelineController,
mut event_focused_events_recv: Receiver<TimelineVectorDiffs>,
) {
loop {
trace!("Waiting for an event.");
let update = match event_focused_events_recv.recv().await {
Ok(up) => up,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind focused-event cache updates, resetting timeline");
let cache = match room_event_cache
.get_event_focused_cache(focused_event.clone(), thread_mode)
.await
{
Ok(Some(cache)) => cache,
Ok(None) => {
error!("Focused event timeline doesn't have an attached cache");
break;
}
Err(err) => {
error!(%err, "Failed to get the focused cache for the focused event");
break;
}
};
let (initial_events, _) = cache.subscribe().await;
timeline_controller
.replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
.await;
continue;
}
};
trace!("Received new timeline events diffs");
let origin = match update.origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
EventsOrigin::Cache => RemoteEventOrigin::Cache,
};
timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
}
}
pub(in crate::timeline) async fn thread_updates_task(
mut receiver: Receiver<TimelineVectorDiffs>,
room_event_cache: RoomEventCache,
timeline_controller: TimelineController,
root: OwnedEventId,
) {
trace!("Spawned the thread event subscriber task.");
loop {
trace!("Waiting for an event.");
let update = match receiver.recv().await {
Ok(up) => up,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
_ = timeline_controller.init_with_thread_root(&root, &room_event_cache).await;
continue;
}
};
trace!("Received new timeline events diffs");
let origin = match update.origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
EventsOrigin::Cache => RemoteEventOrigin::Cache,
};
let has_diffs = !update.diffs.is_empty();
timeline_controller.handle_remote_events_with_diffs(update.diffs, origin).await;
if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
timeline_controller.retry_event_decryption(None).await;
}
}
trace!("Thread event subscriber task finished.");
}
pub(in crate::timeline) async fn room_event_cache_updates_task(
room_event_cache: RoomEventCache,
timeline_controller: TimelineController,
mut room_event_cache_subscriber: RoomEventCacheSubscriber,
timeline_focus: TimelineFocus,
) {
trace!("Spawned the event subscriber task.");
loop {
trace!("Waiting for an event.");
let update = match room_event_cache_subscriber.recv().await {
Ok(up) => up,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind event cache updates, resetting timeline");
let initial_events = match room_event_cache.events().await {
Ok(initial_events) => initial_events,
Err(err) => {
error!(
?err,
"Failed to replace the initial remote events in the event cache"
);
break;
}
};
timeline_controller
.replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache)
.await;
continue;
}
};
match update {
RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
trace!(target = %event_id, "Handling fully read marker.");
timeline_controller.handle_fully_read_marker(event_id).await;
}
RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, origin }) => {
trace!("Received new timeline events diffs");
let origin = match origin {
EventsOrigin::Sync => RemoteEventOrigin::Sync,
EventsOrigin::Pagination => RemoteEventOrigin::Pagination,
EventsOrigin::Cache => RemoteEventOrigin::Cache,
};
let has_diffs = !diffs.is_empty();
if matches!(timeline_focus, TimelineFocus::Live { .. }) {
timeline_controller.handle_remote_events_with_diffs(diffs, origin).await;
} else if !matches!(timeline_focus, TimelineFocus::PinnedEvents) {
timeline_controller.handle_remote_aggregations(diffs, origin).await;
}
if has_diffs && matches!(origin, RemoteEventOrigin::Cache) {
timeline_controller.retry_event_decryption(None).await;
}
}
RoomEventCacheUpdate::AddEphemeralEvents { events } => {
trace!("Received new ephemeral events from sync.");
timeline_controller.handle_ephemeral_events(events).await;
}
RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => {
if !ambiguity_changes.is_empty() {
let member_ambiguity_changes = ambiguity_changes
.values()
.flat_map(|change| change.user_ids())
.collect::<BTreeSet<_>>();
timeline_controller
.force_update_sender_profiles(&member_ambiguity_changes)
.await;
}
}
}
}
}
pub(in crate::timeline) async fn room_send_queue_update_task(
mut send_queue_stream: Receiver<RoomSendQueueUpdate>,
timeline_controller: TimelineController,
) {
trace!("spawned the local echo task!");
loop {
match send_queue_stream.recv().await {
Ok(update) => timeline_controller.handle_room_send_queue_update(update).await,
Err(RecvError::Lagged(num_missed)) => {
warn!("missed {num_missed} local echoes, ignoring those missed");
}
Err(RecvError::Closed) => {
trace!("channel closed, exiting the local echo handler");
break;
}
}
}
}