use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
ThreadingSupport,
event_cache::{Event, store::EventCacheStoreLock},
linked_chunk::Position,
sync::{JoinedRoomUpdate, LeftRoomUpdate},
};
use ruma::{OwnedRoomId, RoomId};
use tokio::sync::{broadcast::Sender, mpsc};
use super::{EventCacheError, EventsOrigin, Result};
use crate::{
client::WeakClient, event_cache::automatic_pagination::AutomaticPagination, room::WeakRoom,
};
// Submodules. `lock` is visible to the parent module only and `read_receipts`
// is private to this module; every other submodule is declared `pub`.
pub mod event_focused;
pub mod event_linked_chunk;
pub(super) mod lock;
pub mod pagination;
pub mod pinned_events;
mod read_receipts;
pub mod room;
pub mod thread;
/// The set of event caches built for a single room by [`Caches::new`].
///
/// At the moment it only wraps the room-level cache.
#[derive(Debug)]
pub(super) struct Caches {
/// The event cache for the room itself.
pub room: room::RoomEventCache,
}
impl Caches {
/// Builds the caches for the room identified by `room_id`.
///
/// The room state is created from `store`; if the linked chunk it exposes
/// already holds at least one event, a [`room::RoomEventCacheGenericUpdate`]
/// is pushed on `generic_update_sender` before returning.
///
/// # Errors
///
/// - [`EventCacheError::ClientDropped`] if `weak_client` can no longer be
///   upgraded to a client.
/// - [`EventCacheError::RoomNotFound`] if the client does not know `room_id`.
/// - Any error raised while creating or read-locking the room state.
///
/// # Panics
///
/// Panics if the client has no user ID, i.e. it is not logged in.
pub async fn new(
    weak_client: &WeakClient,
    room_id: &RoomId,
    generic_update_sender: Sender<room::RoomEventCacheGenericUpdate>,
    linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
    auto_shrink_sender: mpsc::Sender<OwnedRoomId>,
    store: EventCacheStoreLock,
    automatic_pagination: Option<AutomaticPagination>,
) -> Result<Self> {
    let client = weak_client.get().ok_or(EventCacheError::ClientDropped)?;

    let weak_room = WeakRoom::new(weak_client.clone(), room_id.to_owned());

    let Some(client_room) = client.get_room(room_id) else {
        return Err(EventCacheError::RoomNotFound { room_id: room_id.to_owned() });
    };
    let room_version_rules = client_room.clone_info().room_version_rules_or_default();

    // A fresh cache starts idle and has not reached the start of the timeline.
    let initial_status = pagination::SharedPaginationStatus::Idle { hit_timeline_start: false };
    let pagination_status = SharedObservable::new(initial_status);

    let threads_enabled =
        matches!(client.base_client().threading_support, ThreadingSupport::Enabled { .. });

    let update_sender = room::RoomEventCacheUpdateSender::new(generic_update_sender.clone());

    let own_user_id =
        client.user_id().expect("the user must be logged in, at this point").to_owned();

    let state = room::LockedRoomEventCacheState::new(
        own_user_id,
        room_id.to_owned(),
        weak_room.clone(),
        room_version_rules,
        threads_enabled,
        update_sender.clone(),
        linked_chunk_update_sender,
        store,
        pagination_status.clone(),
        automatic_pagination,
    )
    .await?;

    // The read guard is a temporary: it is released at the end of this
    // statement, before the state is moved into the room cache below.
    let has_cached_events =
        state.read().await?.room_linked_chunk().revents().next().is_some();

    let room_cache = room::RoomEventCache::new(
        room_id.to_owned(),
        weak_room,
        state,
        pagination_status,
        auto_shrink_sender,
        update_sender,
    );

    if has_cached_events {
        // Best effort: a broadcast `send` fails when nobody is subscribed,
        // which is not an error for us.
        let _ = generic_update_sender
            .send(room::RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
    }

    Ok(Self { room: room_cache })
}
/// Hands a [`JoinedRoomUpdate`] over to every cache of this set.
///
/// # Errors
///
/// Propagates the error returned by the room cache, if any.
pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
    // Exhaustive destructuring: once `Caches` gains another field, this
    // pattern stops compiling until the new cache is handled here too.
    let Self { room: room_cache } = self;

    room_cache.handle_joined_room_update(updates).await?;

    Ok(())
}
/// Hands a [`LeftRoomUpdate`] over to every cache of this set.
///
/// # Errors
///
/// Propagates the error returned by the room cache, if any.
pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
    // Exhaustive destructuring: once `Caches` gains another field, this
    // pattern stops compiling until the new cache is handled here too.
    let Self { room: room_cache } = self;

    room_cache.handle_left_room_update(updates).await?;

    Ok(())
}
/// First step of a reset: builds a [`ResetCaches`] handle over these caches.
///
/// The handle borrows `self` mutably for as long as it lives. Call
/// [`ResetCaches::reset_all`] on it to actually perform the reset.
///
/// # Errors
///
/// Propagates the error returned by [`ResetCaches::new`], i.e. a failure to
/// acquire the write lock over the room state.
pub async fn prepare_to_reset(&mut self) -> Result<ResetCaches<'_>> {
    let reset_handle = ResetCaches::new(self).await?;

    Ok(reset_handle)
}
/// Returns an iterator over the events reported by the room cache.
///
/// Only compiled when the `e2e-encryption` feature is enabled.
///
/// # Errors
///
/// Propagates the error returned when fetching the events from the room
/// cache.
#[cfg(feature = "e2e-encryption")]
pub async fn all_events(&self) -> Result<impl Iterator<Item = Event>> {
    Ok(self.room.events().await?.into_iter())
}
}
/// Handle used to reset a [`Caches`], created by [`Caches::prepare_to_reset`].
///
/// It keeps the write guard over the room state for its whole lifetime;
/// [`ResetCaches::reset_all`] consumes the handle and performs the reset.
pub(super) struct ResetCaches<'c> {
/// The room event cache, along with the write guard over its state.
room_lock: (&'c room::RoomEventCache, room::RoomEventCacheStateLockWriteGuard<'c>),
}
impl<'c> ResetCaches<'c> {
async fn new(Caches { room }: &'c mut Caches) -> Result<Self> {
Ok(Self { room_lock: (room, room.state().write().await?) })
}
/// Resets the room state and notifies the observers of the room cache.
///
/// The diffs produced by the reset are sent as an `UpdateTimelineEvents`
/// update with [`EventsOrigin::Cache`] as origin, together with a generic
/// update for the room. The write guard is held until this method returns.
///
/// # Errors
///
/// Propagates the error returned by the state reset; nothing is sent in
/// that case.
pub async fn reset_all(self) -> Result<()> {
    let Self { room_lock: (room_cache, mut state_guard) } = self;

    let diffs = state_guard.reset().await?;
    let timeline_update = room::RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
        diffs,
        origin: EventsOrigin::Cache,
    });

    room_cache.update_sender().send(
        timeline_update,
        Some(room::RoomEventCacheGenericUpdate { room_id: room_cache.room_id().to_owned() }),
    );

    Ok(())
}
}
/// A batch of [`VectorDiff`]s over timeline [`Event`]s, tagged with the
/// origin of the change.
///
/// This is the payload carried by `RoomEventCacheUpdate::UpdateTimelineEvents`.
#[derive(Clone, Debug)]
pub struct TimelineVectorDiffs {
/// The diffs, presumably meant to be applied in order by the receiver —
/// TODO confirm against the consumers of this type.
pub diffs: Vec<VectorDiff<Event>>,
/// Where the change described by `diffs` comes from.
pub origin: EventsOrigin,
}
/// Where an event is located.
pub(super) enum EventLocation {
/// The event is in memory, at the given linked chunk [`Position`].
Memory(Position),
/// NOTE(review): presumably the event is only present in the event cache
/// store and not loaded in memory — confirm against the users of this enum.
Store,
}