pub mod pagination;
mod state;
use std::{fmt, sync::Arc};
use matrix_sdk_base::event_cache::{Event, store::EventCacheStoreLock};
use ruma::{EventId, OwnedEventId, OwnedRoomId};
pub(super) use state::LockedThreadEventCacheState;
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::error;
use self::pagination::ThreadPagination;
use super::{
super::Result, EventsOrigin, TimelineVectorDiffs, room::RoomEventCacheLinkedChunkUpdate,
};
use crate::room::WeakRoom;
pub(super) struct ThreadEventCache {
inner: Arc<ThreadEventCacheInner>,
}
struct ThreadEventCacheInner {
thread_id: OwnedEventId,
weak_room: WeakRoom,
state: LockedThreadEventCacheState,
}
impl fmt::Debug for ThreadEventCache {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ThreadEventCache").finish_non_exhaustive()
}
}
impl ThreadEventCache {
pub fn new(
room_id: OwnedRoomId,
thread_id: OwnedEventId,
weak_room: WeakRoom,
store: EventCacheStoreLock,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
) -> Self {
Self {
inner: Arc::new(ThreadEventCacheInner {
thread_id: thread_id.clone(),
weak_room,
state: LockedThreadEventCacheState::new(
room_id,
thread_id,
store,
linked_chunk_update_sender,
),
}),
}
}
pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
let state = self.inner.state.read().await?;
let events =
state.thread_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
let recv = state.state.sender.subscribe();
Ok((events, recv))
}
pub fn pagination(&self) -> ThreadPagination {
ThreadPagination::new(self.inner.clone())
}
pub async fn clear(&mut self) -> Result<()> {
let mut state = self.inner.state.write().await?;
let updates_as_vector_diffs = state.reset().await?;
if !updates_as_vector_diffs.is_empty() {
let _ = state.state.sender.send(TimelineVectorDiffs {
diffs: updates_as_vector_diffs,
origin: EventsOrigin::Cache,
});
}
Ok(())
}
pub async fn add_live_events(&mut self, events: Vec<Event>) -> Result<()> {
if events.is_empty() {
return Ok(());
}
let mut state = self.inner.state.write().await?;
let timeline_event_diffs = state.handle_sync(events).await?;
if !timeline_event_diffs.is_empty() {
let _ = state.state.sender.send(TimelineVectorDiffs {
diffs: timeline_event_diffs,
origin: EventsOrigin::Sync,
});
}
Ok(())
}
pub(super) async fn remove_if_present(&mut self, event_id: &EventId) -> Result<()> {
let mut state = self.inner.state.write().await?;
let Some(position) = state.thread_linked_chunk().events().find_map(|(position, event)| {
(event.event_id().as_deref() == Some(event_id)).then_some(position)
}) else {
return Ok(());
};
if let Err(err) = state.remove_events(vec![(event_id.to_owned(), position)]).await {
error!(%err, "a thread linked chunk position was valid a few lines above, but invalid when deleting");
return Err(err);
}
let timeline_event_diffs = state.thread_linked_chunk_mut().updates_as_vector_diffs();
if !timeline_event_diffs.is_empty() {
let _ = state.state.sender.send(TimelineVectorDiffs {
diffs: timeline_event_diffs,
origin: EventsOrigin::Sync,
});
}
Ok(())
}
pub async fn latest_event_id(&self) -> Result<Option<OwnedEventId>> {
Ok(self
.inner
.state
.read()
.await?
.thread_linked_chunk()
.revents()
.next()
.and_then(|(_position, event)| event.event_id()))
}
}