use std::collections::BTreeSet;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
event_cache::{Event, Gap},
linked_chunk::{ChunkContent, OwnedLinkedChunkId, Position},
};
use ruma::{EventId, OwnedEventId, OwnedRoomId};
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::{error, trace};
use crate::event_cache::{
BackPaginationOutcome, EventsOrigin, RoomEventCacheLinkedChunkUpdate,
deduplicator::DeduplicationOutcome,
room::{LoadMoreEventsBackwardsOutcome, events::EventLinkedChunk},
};
/// An update about events in a thread's event cache, as broadcast to
/// subscribers of [`ThreadEventCache::subscribe`].
#[derive(Clone, Debug)]
pub struct ThreadEventCacheUpdate {
    /// The list of diffs to apply to the thread's in-memory list of events.
    pub diffs: Vec<VectorDiff<Event>>,
    /// Where the updates came from (e.g. sync, pagination, cache).
    pub origin: EventsOrigin,
}
/// The event cache for a single thread, identified by its room and the
/// thread's root event ID.
pub(crate) struct ThreadEventCache {
    /// The ID of the room this thread belongs to.
    room_id: OwnedRoomId,
    /// The event ID of the thread's root event.
    thread_root: OwnedEventId,
    /// The linked chunk holding this thread's events in memory.
    chunk: EventLinkedChunk,
    /// Broadcast sender for updates to subscribers of this thread
    /// (see [`Self::subscribe`]).
    sender: Sender<ThreadEventCacheUpdate>,
    /// Sender used to forward this thread's linked-chunk store updates.
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}
impl ThreadEventCache {
pub fn new(
room_id: OwnedRoomId,
thread_root: OwnedEventId,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
) -> Self {
Self {
chunk: EventLinkedChunk::new(),
sender: Sender::new(32),
room_id,
thread_root,
linked_chunk_update_sender,
}
}
pub fn subscribe(&self) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
let events = self.chunk.events().map(|(_position, item)| item.clone()).collect();
let recv = self.sender.subscribe();
(events, recv)
}
pub fn clear(&mut self) {
self.chunk.reset();
let diffs = self.chunk.updates_as_vector_diffs();
if !diffs.is_empty() {
let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Cache });
}
}
fn propagate_changes(&mut self) {
let updates = self.chunk.store_updates().take();
let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
updates,
linked_chunk_id: OwnedLinkedChunkId::Thread(
self.room_id.clone(),
self.thread_root.clone(),
),
});
}
pub fn add_live_events(&mut self, events: Vec<Event>) {
if events.is_empty() {
return;
}
let deduplication = self.filter_duplicate_events(events);
if deduplication.non_empty_all_duplicates {
return;
}
self.remove_in_memory_duplicated_events(deduplication.in_memory_duplicated_event_ids);
assert!(
deduplication.in_store_duplicated_event_ids.is_empty(),
"persistent storage for threads is not implemented yet"
);
let events = deduplication.all_events;
self.chunk.push_live_events(None, &events);
self.propagate_changes();
let diffs = self.chunk.updates_as_vector_diffs();
if !diffs.is_empty() {
let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Sync });
}
}
pub(crate) fn remove_if_present(&mut self, event_id: &EventId) {
let Some(pos) = self.chunk.events().find_map(|(pos, event)| {
(event.event_id().as_deref() == Some(event_id)).then_some(pos)
}) else {
return;
};
if let Err(err) = self.chunk.remove_events_by_position(vec![pos]) {
error!(%err, "a thread linked chunk position was valid a few lines above, but invalid when deleting");
return;
}
self.propagate_changes();
let diffs = self.chunk.updates_as_vector_diffs();
if !diffs.is_empty() {
let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Sync });
}
}
pub fn load_more_events_backwards(&self) -> LoadMoreEventsBackwardsOutcome {
if let Some(prev_token) = self.chunk.rgap().map(|gap| gap.prev_token) {
trace!(%prev_token, "thread chunk has at least a gap");
return LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) };
}
if let Some((_pos, event)) = self.chunk.events().next() {
let first_event_id =
event.event_id().expect("a linked chunk only stores events with IDs");
if first_event_id == self.thread_root {
trace!("thread chunk is fully loaded and non-empty: reached_start=true");
return LoadMoreEventsBackwardsOutcome::StartOfTimeline;
}
}
LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
}
fn filter_duplicate_events(&self, mut new_events: Vec<Event>) -> DeduplicationOutcome {
let mut new_event_ids = BTreeSet::new();
new_events.retain(|event| {
event.event_id().is_some_and(|event_id| new_event_ids.insert(event_id))
});
let in_memory_duplicated_event_ids: Vec<_> = self
.chunk
.events()
.filter_map(|(position, event)| {
let event_id = event.event_id()?;
new_event_ids.contains(&event_id).then_some((event_id, position))
})
.collect();
let in_store_duplicated_event_ids = Vec::new();
let at_least_one_event = !new_events.is_empty();
let all_duplicates = (in_memory_duplicated_event_ids.len()
+ in_store_duplicated_event_ids.len())
== new_events.len();
let non_empty_all_duplicates = at_least_one_event && all_duplicates;
DeduplicationOutcome {
all_events: new_events,
in_memory_duplicated_event_ids,
in_store_duplicated_event_ids,
non_empty_all_duplicates,
}
}
fn remove_in_memory_duplicated_events(
&mut self,
in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
) {
self.chunk
.remove_events_by_position(
in_memory_duplicated_event_ids
.iter()
.map(|(_event_id, position)| *position)
.collect(),
)
.expect("we collected the position of the events to remove just before");
}
pub fn finish_network_pagination(
&mut self,
prev_token: Option<String>,
new_token: Option<String>,
events: Vec<Event>,
) -> Option<BackPaginationOutcome> {
let prev_gap_id = if let Some(token) = prev_token {
let gap_id = self.chunk.chunk_identifier(|chunk| {
matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
})?;
Some(gap_id)
} else {
None
};
let topo_ordered_events = events.iter().cloned().rev().collect::<Vec<_>>();
let new_gap = new_token.map(|token| Gap { prev_token: token });
let deduplication = self.filter_duplicate_events(topo_ordered_events);
let (events, new_gap) = if deduplication.non_empty_all_duplicates {
(Vec::new(), None)
} else {
assert!(
deduplication.in_store_duplicated_event_ids.is_empty(),
"persistent storage for threads is not implemented yet"
);
self.remove_in_memory_duplicated_events(deduplication.in_memory_duplicated_event_ids);
(deduplication.all_events, new_gap)
};
let reached_start = self.chunk.finish_back_pagination(prev_gap_id, new_gap, &events);
self.propagate_changes();
let updates = self.chunk.updates_as_vector_diffs();
if !updates.is_empty() {
let _ = self
.sender
.send(ThreadEventCacheUpdate { diffs: updates, origin: EventsOrigin::Pagination });
}
Some(BackPaginationOutcome { reached_start, events })
}
pub fn latest_event_id(&self) -> Option<OwnedEventId> {
self.chunk.revents().next().and_then(|(_position, event)| event.event_id())
}
}