use std::collections::BTreeSet;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
event_cache::{Event, Gap, store::EventCacheStoreLock},
linked_chunk::{OwnedLinkedChunkId, Position, Update},
};
use matrix_sdk_common::executor::spawn;
use ruma::{OwnedEventId, OwnedRoomId};
use tokio::sync::broadcast::Sender;
use tracing::instrument;
use super::super::{
super::{EventsOrigin, Result, deduplicator::DeduplicationOutcome},
TimelineVectorDiffs,
event_linked_chunk::EventLinkedChunk,
lock,
room::RoomEventCacheLinkedChunkUpdate,
};
/// In-memory state for the event cache of a single thread in a room.
///
/// Wrapped in a [`lock::StateLock`] (see [`LockedThreadEventCacheState`]);
/// all mutation goes through the read/write guard impls below.
pub struct ThreadEventCacheState {
    // NOTE(review): this allow looks stale — `room_id` is read by
    // `save_events` and `send_updates_to_store`; confirm before removing.
    #[allow(dead_code)]
    room_id: OwnedRoomId,
    // Event id of the thread root, identifying this thread within the room.
    thread_id: OwnedEventId,
    // Handle to the (cross-process-locked) event cache store.
    store: EventCacheStoreLock,
    // The in-memory linked chunk holding this thread's events.
    thread_linked_chunk: EventLinkedChunk,
    // Broadcasts timeline vector diffs to observers of this thread.
    pub sender: Sender<TimelineVectorDiffs>,
    // Forwards raw linked-chunk updates (for persistence) to a listener.
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
    // Whether we have already waited for the initial prev token for this
    // thread (reset to `false` when the linked chunk is reset).
    waited_for_initial_prev_token: bool,
}
// Gives the generic lock machinery access to this state's store handle.
impl lock::Store for ThreadEventCacheState {
    fn store(&self) -> &EventCacheStoreLock {
        &self.store
    }
}
/// A [`ThreadEventCacheState`] protected by the generic state lock.
pub type LockedThreadEventCacheState = lock::StateLock<ThreadEventCacheState>;
impl LockedThreadEventCacheState {
    /// Builds the locked cache state for the thread rooted at `thread_id`
    /// inside `room_id`.
    ///
    /// The in-memory linked chunk starts empty, the timeline-diff sender is
    /// created with a capacity of 32 pending messages, and the "waited for
    /// the initial prev token" flag starts unset.
    pub fn new(
        room_id: OwnedRoomId,
        thread_id: OwnedEventId,
        store: EventCacheStoreLock,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
    ) -> Self {
        let initial_state = ThreadEventCacheState {
            thread_linked_chunk: EventLinkedChunk::new(),
            sender: Sender::new(32),
            waited_for_initial_prev_token: false,
            room_id,
            thread_id,
            store,
            linked_chunk_update_sender,
        };
        Self::new_inner(initial_state)
    }
}
/// Read guard over a [`ThreadEventCacheState`].
pub type ThreadEventCacheStateLockReadGuard<'a> =
    lock::StateLockReadGuard<'a, ThreadEventCacheState>;
/// Write guard over a [`ThreadEventCacheState`].
pub type ThreadEventCacheStateLockWriteGuard<'a> =
    lock::StateLockWriteGuard<'a, ThreadEventCacheState>;
impl<'a> lock::Reload for ThreadEventCacheStateLockWriteGuard<'a> {
    /// Resets the in-memory linked chunk and, when the reset produced any
    /// diffs, broadcasts them to observers with a cache origin.
    async fn reload(&mut self) -> Result<()> {
        self.state.thread_linked_chunk.reset();

        let diffs = self.state.thread_linked_chunk.updates_as_vector_diffs();
        if diffs.is_empty() {
            return Ok(());
        }

        // A send error only means nobody is listening; that's fine.
        let _ =
            self.state.sender.send(TimelineVectorDiffs { diffs, origin: EventsOrigin::Cache });
        Ok(())
    }
}
impl<'a> ThreadEventCacheStateLockReadGuard<'a> {
    /// Returns a shared reference to the thread's in-memory linked chunk.
    pub fn thread_linked_chunk(&self) -> &EventLinkedChunk {
        &self.state.thread_linked_chunk
    }
    /// Whether we already waited for this thread's initial prev token.
    pub fn waited_for_initial_prev_token(&self) -> bool {
        self.state.waited_for_initial_prev_token
    }
}
impl<'a> ThreadEventCacheStateLockWriteGuard<'a> {
    /// Returns a shared reference to the thread's in-memory linked chunk.
    pub fn thread_linked_chunk(&self) -> &EventLinkedChunk {
        &self.state.thread_linked_chunk
    }
    /// Returns a mutable reference to the thread's in-memory linked chunk.
    pub fn thread_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk {
        &mut self.state.thread_linked_chunk
    }
    /// Mutable access to the "waited for the initial prev token" flag.
    pub fn waited_for_initial_prev_token_mut(&mut self) -> &mut bool {
        &mut self.state.waited_for_initial_prev_token
    }
    /// Handles a batch of `events` received via sync for this thread.
    ///
    /// In-memory duplicates are removed first, then the deduplicated batch
    /// is appended at the live position of the linked chunk. Returns the
    /// vector diffs describing the resulting timeline changes; returns an
    /// empty vector when a non-empty batch contains only known events.
    pub async fn handle_sync(&mut self, events: Vec<Event>) -> Result<Vec<VectorDiff<Event>>> {
        let deduplication = self.filter_duplicate_events(events);
        // A non-empty batch made only of already-known events carries no
        // new information: skip any update entirely.
        if deduplication.non_empty_all_duplicates {
            return Ok(Vec::new());
        }
        // Remove the stale in-memory copies so the incoming versions can
        // be (re-)inserted at the live position below.
        self.remove_events(deduplication.in_memory_duplicated_event_ids).await?;
        // `filter_duplicate_events` always leaves this empty for threads.
        assert!(
            deduplication.in_store_duplicated_event_ids.is_empty(),
            "persistent storage for threads is not implemented yet"
        );
        let events = deduplication.all_events;
        self.state.thread_linked_chunk.push_live_events(None, &events);
        self.propagate_changes().await?;
        let timeline_event_diffs = self.state.thread_linked_chunk.updates_as_vector_diffs();
        Ok(timeline_event_diffs)
    }
    /// Saves the given events to the event cache store, one by one,
    /// without attaching them to the linked chunk.
    ///
    /// NOTE(review): the writes run in a spawned task and are then joined
    /// — presumably to shield them from caller cancellation; confirm.
    /// Panics if the spawned task itself fails to join.
    pub async fn save_events(&mut self, events: impl IntoIterator<Item = Event>) -> Result<()> {
        let store = self.store.clone();
        let room_id = self.state.room_id.clone();
        let events = events.into_iter().collect::<Vec<_>>();
        spawn(async move {
            for event in events {
                store.save_event(&room_id, event).await?;
            }
            Result::Ok(())
        })
        .await
        .expect("joining failed")?;
        Ok(())
    }
    /// Clears the thread's linked chunk and returns the diffs to forward
    /// to observers — expected to be a single [`VectorDiff::Clear`].
    pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>> {
        self.reset_internal().await?;
        let diff_updates = self.state.thread_linked_chunk.updates_as_vector_diffs();
        // A reset must boil down to exactly one `Clear` diff.
        debug_assert_eq!(diff_updates.len(), 1);
        debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
        Ok(diff_updates)
    }
    /// Resets the linked chunk, propagates the resulting updates, and
    /// clears the "waited for the initial prev token" flag.
    async fn reset_internal(&mut self) -> Result<()> {
        self.state.thread_linked_chunk.reset();
        self.propagate_changes().await?;
        // A reset means we'll have to wait for the initial prev token
        // again, should it happen.
        self.state.waited_for_initial_prev_token = false;
        Ok(())
    }
    /// Removes the given events, identified by their in-memory positions,
    /// from the linked chunk, then propagates the resulting updates.
    ///
    /// No-op when `in_memory_events` is empty. Panics (via `expect`) if a
    /// position is invalid for the current chunk.
    #[instrument(skip_all)]
    pub async fn remove_events(
        &mut self,
        in_memory_events: Vec<(OwnedEventId, Position)>,
    ) -> Result<()> {
        if in_memory_events.is_empty() {
            return Ok(());
        }
        self.state
            .thread_linked_chunk
            .remove_events_by_position(
                in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
            )
            .expect("failed to remove an event");
        self.propagate_changes().await
    }
    /// Drains the linked chunk's pending store updates and forwards them
    /// to the store update channel.
    pub async fn propagate_changes(&mut self) -> Result<()> {
        let updates = self.state.thread_linked_chunk.store_updates().take();
        self.send_updates_to_store(updates).await
    }
    /// Sends a batch of linked-chunk `updates`, tagged with this thread's
    /// linked chunk id, over the update channel.
    ///
    /// NOTE(review): async despite containing no awaits (hence the clippy
    /// allow) — presumably to match a shared call shape; confirm.
    #[allow(clippy::unused_async)]
    async fn send_updates_to_store(&mut self, updates: Vec<Update<Event, Gap>>) -> Result<()> {
        let linked_chunk_id =
            OwnedLinkedChunkId::Thread(self.state.room_id.clone(), self.state.thread_id.clone());
        // A send error only means nobody is listening; that's fine.
        let _ = self
            .state
            .linked_chunk_update_sender
            .send(RoomEventCacheLinkedChunkUpdate { linked_chunk_id, updates });
        Ok(())
    }
    /// Deduplicates `new_events` against themselves and against the
    /// in-memory linked chunk.
    ///
    /// Events without an event id, and repeated ids within the batch, are
    /// dropped from `all_events`. The outcome records the (id, position)
    /// pairs of batch events already present in memory; the in-store list
    /// is always empty since thread persistence isn't implemented yet.
    pub fn filter_duplicate_events(&self, mut new_events: Vec<Event>) -> DeduplicationOutcome {
        let mut new_event_ids = BTreeSet::new();
        // Keep only the first occurrence of each event id; drop id-less
        // events outright.
        new_events.retain(|event| {
            event.event_id().is_some_and(|event_id| new_event_ids.insert(event_id))
        });
        // Find which of the incoming events are already in the in-memory
        // linked chunk, and where.
        let in_memory_duplicated_event_ids: Vec<_> = self
            .state
            .thread_linked_chunk
            .events()
            .filter_map(|(position, event)| {
                let event_id = event.event_id()?;
                new_event_ids.contains(&event_id).then_some((event_id, position))
            })
            .collect();
        let in_store_duplicated_event_ids = Vec::new();
        let at_least_one_event = !new_events.is_empty();
        let all_duplicates = (in_memory_duplicated_event_ids.len()
            + in_store_duplicated_event_ids.len())
            == new_events.len();
        let non_empty_all_duplicates = at_least_one_event && all_duplicates;
        DeduplicationOutcome {
            all_events: new_events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates,
        }
    }
}