use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use eyeball::{SharedObservable, Subscriber};
use eyeball_im::VectorDiff;
use futures_core::{Stream, ready};
use matrix_sdk_base::{
event_cache::{Event, Gap},
linked_chunk::{ChunkContent, LinkedChunkId, Update},
};
use pin_project_lite::pin_project;
use ruma::api::Direction;
use tracing::{error, trace};
pub use super::super::pagination::PaginationStatus;
use super::{
super::{
super::{
EventCacheError, EventsOrigin, Result, RoomEventCacheGenericUpdate,
deduplicator::{DeduplicationOutcome, filter_duplicate_events},
},
TimelineVectorDiffs,
pagination::{
BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
},
},
PostProcessingOrigin, RoomEventCacheInner, RoomEventCacheUpdate,
};
use crate::{event_cache::caches::pagination::SharedPaginationStatus, room::MessagesOptions};
pin_project! {
    /// Subscriber to the pagination status of a room, obtained via
    /// [`RoomPagination::status`].
    ///
    /// Wraps the crate-internal [`Subscriber<SharedPaginationStatus>`] and
    /// maps every value to the public [`PaginationStatus`] before exposing it.
    pub struct PaginationStatusSubscriber {
        // Pinned so that `Stream::poll_next` can be forwarded to the inner
        // subscriber (see the `Stream` impl below).
        #[pin]
        subscriber: Subscriber<SharedPaginationStatus>,
    }
}
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for PaginationStatusSubscriber {
    /// Opaque debug representation: the inner subscriber is not `Debug`, so
    /// only the type name is printed, followed by `..`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("PaginationStatusSubscriber");
        builder.finish_non_exhaustive()
    }
}
impl PaginationStatusSubscriber {
    /// Translate the crate-internal status representation into the public
    /// [`PaginationStatus`] handed out to subscribers.
    fn map(shared: SharedPaginationStatus) -> PaginationStatus {
        if let SharedPaginationStatus::Idle { hit_timeline_start } = shared {
            PaginationStatus::Idle { hit_timeline_start }
        } else {
            PaginationStatus::Paginating
        }
    }

    /// Return the current pagination status, without consuming any pending
    /// update from the subscriber.
    pub fn get(&self) -> PaginationStatus {
        let current = self.subscriber.get();
        Self::map(current)
    }

    /// Wait for the next status update; `None` means the underlying
    /// observable is gone.
    pub async fn next(&mut self) -> Option<PaginationStatus> {
        let update = self.subscriber.next().await;
        update.map(Self::map)
    }

    /// Return the latest status immediately, consuming it from the
    /// subscriber's point of view.
    pub fn next_now(&mut self) -> PaginationStatus {
        let latest = self.subscriber.next_now();
        Self::map(latest)
    }
}
impl Stream for PaginationStatusSubscriber {
    type Item = PaginationStatus;

    /// Poll the pinned inner subscriber, converting each internal status to
    /// the public type on the way out.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let polled = ready!(this.subscriber.poll_next(cx));
        Poll::Ready(polled.map(Self::map))
    }
}
/// Room-scoped pagination handle: a thin newtype over the generic
/// [`Pagination`] driver, parameterized with this room's event cache.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct RoomPagination(Pagination<Arc<RoomEventCacheInner>>);
impl RoomPagination {
pub(super) fn new(cache: Arc<RoomEventCacheInner>) -> Self {
Self(Pagination::new(cache))
}
pub async fn run_backwards_until(
&self,
num_requested_events: u16,
) -> Result<BackPaginationOutcome> {
self.0.run_backwards_until(num_requested_events).await
}
pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
self.0.run_backwards_once(batch_size).await
}
pub fn status(&self) -> PaginationStatusSubscriber {
PaginationStatusSubscriber { subscriber: self.0.cache.status().subscribe() }
}
#[cfg(test)]
pub(super) async fn load_more_events_backwards(
&self,
) -> Result<LoadMoreEventsBackwardsOutcome> {
self.0.cache.load_more_events_backwards().await
}
}
impl PaginatedCache for Arc<RoomEventCacheInner> {
    /// The shared observable holding this room's current pagination status.
    fn status(&self) -> &SharedObservable<SharedPaginationStatus> {
        &self.shared_pagination_status
    }

    /// Try to obtain more events backwards, in order of preference: an
    /// in-memory gap token, a previous chunk persisted in the store, and
    /// otherwise report either the start of the timeline or the need for a
    /// network pagination without a token.
    async fn load_more_events_backwards(&self) -> Result<LoadMoreEventsBackwardsOutcome> {
        let mut state = self.state.write().await?;

        // Fast path: the in-memory linked chunk already holds a gap whose
        // token can be used as `prev_batch` for a network pagination.
        if let Some(prev_token) = state.room_linked_chunk().rgap().map(|gap| gap.token) {
            return Ok(LoadMoreEventsBackwardsOutcome::Gap {
                prev_token: Some(prev_token),
                waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
            });
        }

        // No in-memory gap: try to load the chunk preceding the current first
        // chunk from the store.
        let prev_first_chunk =
            state.room_linked_chunk().chunks().next().expect("a linked chunk is never empty");
        let linked_chunk_id = LinkedChunkId::Room(&state.state.room_id);

        let new_first_chunk = match state
            .store
            .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
            .await
        {
            Ok(Some(new_first_chunk)) => {
                new_first_chunk
            }

            Ok(None) => {
                // Nothing more in the store: the linked chunk is fully
                // loaded. If it contains any event, we've hit the start of
                // the timeline.
                if state.room_linked_chunk().events().next().is_some() {
                    trace!("chunk is fully loaded and non-empty: reached_start=true");
                    return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
                }
                // Fully loaded but empty: fall back to a network pagination
                // without a `prev_batch` token.
                return Ok(LoadMoreEventsBackwardsOutcome::Gap {
                    prev_token: None,
                    waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
                });
            }

            Err(err) => {
                error!("error when loading the previous chunk of a linked chunk: {err}");
                // Recovery strategy on a corrupted/inconsistent store: clear
                // the persisted linked chunk, then propagate the error.
                state
                    .store
                    .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                    .await?;
                return Err(err.into());
            }
        };

        let chunk_content = new_first_chunk.content.clone();
        // The loaded chunk marks the timeline start iff it has no predecessor.
        let reached_start = new_first_chunk.previous.is_none();

        if let Err(err) = state.room_linked_chunk_mut().insert_new_chunk_as_first(new_first_chunk) {
            error!("error when inserting the previous chunk into its linked chunk: {err}");
            // Same recovery as above: clear the persisted linked chunk, then
            // propagate the error.
            state
                .store
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(&state.state.room_id),
                    vec![Update::Clear],
                )
                .await?;
            return Err(err.into());
        }

        // Discard the pending store updates — presumably because the chunk
        // was just loaded *from* the store so there is nothing new to persist
        // (TODO confirm) — but keep the vector diffs to notify observers.
        let _ = state.room_linked_chunk_mut().store_updates().take();
        let timeline_event_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();

        Ok(match chunk_content {
            ChunkContent::Gap(gap) => {
                trace!("reloaded chunk from disk (gap)");
                LoadMoreEventsBackwardsOutcome::Gap {
                    prev_token: Some(gap.token),
                    waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
                }
            }

            ChunkContent::Items(events) => {
                trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
                LoadMoreEventsBackwardsOutcome::Events {
                    events,
                    timeline_event_diffs,
                    reached_start,
                }
            }
        })
    }

    /// Record in the state that a pagination has waited for the initial
    /// `prev_batch` token.
    async fn mark_has_waited_for_initial_prev_token(&self) -> Result<()> {
        *self.state.write().await?.waited_for_initial_prev_token_mut() = true;
        Ok(())
    }

    /// Wait until the pagination-batch-token notifier is triggered.
    async fn wait_for_prev_token(&self) {
        self.pagination_batch_token_notifier.notified().await
    }

    /// Run one backwards `/messages` request against the homeserver.
    ///
    /// Returns `Ok(None)` when the weak room reference can no longer be
    /// upgraded (room dropped); otherwise the received events and the
    /// response's `end` token (the next `prev_batch`).
    async fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: &Option<String>,
    ) -> Result<Option<(Vec<Event>, Option<String>)>> {
        let Some(room) = self.weak_room.get() else {
            return Ok(None);
        };

        let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
        options.limit = batch_size.into();

        let response = room
            .messages(options)
            .await
            .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;

        Ok(Some((response.chunk, response.end)))
    }

    /// Finish a backwards pagination served from disk: notify observers of
    /// the timeline diffs (if any, with a `Cache` origin) and build the
    /// outcome with the events reversed.
    async fn conclude_backwards_pagination_from_disk(
        &self,
        events: Vec<Event>,
        timeline_event_diffs: Vec<VectorDiff<Event>>,
        reached_start: bool,
    ) -> BackPaginationOutcome {
        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Cache,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        BackPaginationOutcome {
            reached_start,
            // Reversed to match the order expected by `BackPaginationOutcome`
            // consumers — presumably newest-first vs. storage order; TODO
            // confirm against the callers.
            events: events.into_iter().rev().collect(),
        }
    }

    /// Finish a backwards pagination served from the network: deduplicate the
    /// received events, insert them in place of the gap identified by
    /// `prev_token`, post-process them, and notify observers.
    ///
    /// Returns `Ok(None)` when `prev_token` was provided but no matching gap
    /// exists anymore in the linked chunk — presumably the timeline changed
    /// concurrently and the response can't be applied; TODO confirm with the
    /// pagination driver.
    async fn conclude_backwards_pagination_from_network(
        &self,
        events: Vec<Event>,
        prev_token: Option<String>,
        mut new_token: Option<String>,
    ) -> Result<Option<BackPaginationOutcome>> {
        let mut state = self.state.write().await?;

        // Find the chunk holding the gap whose token matches `prev_token`;
        // its identifier tells where the new events must be inserted.
        let prev_gap_id = if let Some(token) = prev_token {
            let gap_chunk_id = state.room_linked_chunk().chunk_identifier(|chunk| {
                matches!(chunk.content(), ChunkContent::Gap(Gap { token: prev_token }) if *prev_token == token)
            });

            if gap_chunk_id.is_none() {
                return Ok(None);
            }

            gap_chunk_id
        } else {
            None
        };

        let DeduplicationOutcome {
            all_events: mut events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates: all_duplicates,
        } = {
            let room_linked_chunk = state.room_linked_chunk();
            filter_duplicate_events(
                &state.state.own_user_id,
                &state.store,
                LinkedChunkId::Room(&state.state.room_id),
                room_linked_chunk,
                events,
            )
            .await?
        };

        if !all_duplicates {
            // Some events are new: drop the stale copies of the duplicated
            // ones (in memory and in the store) before re-inserting them.
            state
                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;
        } else {
            // Every received event was already known: keep none of them and
            // drop the new gap token as well.
            events.clear();
            new_token = None;
        }

        // Reverse into topological (insertion) order for the linked chunk.
        let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

        let new_gap = new_token.map(|prev_token| Gap { token: prev_token });
        let reached_start = state.room_linked_chunk_mut().push_backwards_pagination_events(
            prev_gap_id,
            new_gap,
            &topo_ordered_events,
        );

        // Back-paginations carry no read-receipt event.
        let receipt_event = None;
        state
            .post_process_new_events(
                topo_ordered_events,
                PostProcessingOrigin::Backpagination,
                receipt_event,
            )
            .await?;

        let timeline_event_diffs = state.room_linked_chunk_mut().updates_as_vector_diffs();

        if !timeline_event_diffs.is_empty() {
            self.update_sender.send(
                RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Pagination,
                }),
                Some(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() }),
            );
        }

        Ok(Some(BackPaginationOutcome { events, reached_start }))
    }
}