use std::sync::Arc;
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
event_cache::{Event, Gap},
linked_chunk::ChunkContent,
};
use ruma::api::Direction;
use tracing::trace;
use super::{
super::super::{
EventCacheError, EventsOrigin, Result, TimelineVectorDiffs,
caches::pagination::{
BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
},
},
ThreadEventCacheInner,
};
use crate::{
event_cache::caches::pagination::SharedPaginationStatus,
room::{IncludeRelations, RelationsOptions},
};
/// Adapter that lets a thread's event cache be driven by the generic
/// [`Pagination`] machinery via the [`PaginatedCache`] trait.
#[derive(Clone)]
struct ThreadEventCacheWrapper {
    /// The inner, shared state of the thread's event cache.
    cache: Arc<ThreadEventCacheInner>,

    /// Placeholder pagination status returned by [`PaginatedCache::status`].
    // NOTE(review): named "dummy" — presumably thread pagination doesn't
    // surface a real shared status to observers; confirm with the trait's
    // other implementors.
    dummy_pagination_status: SharedObservable<SharedPaginationStatus>,
}
/// Public handle for running back-paginations on a single thread's timeline.
///
/// Newtype over the generic [`Pagination`] driver, specialized with the
/// thread-specific [`ThreadEventCacheWrapper`] adapter.
#[allow(missing_debug_implementations)]
pub struct ThreadPagination(Pagination<ThreadEventCacheWrapper>);
impl ThreadPagination {
    /// Builds a pagination handle for the given thread cache.
    ///
    /// The adapter is created with an idle placeholder status observable,
    /// since thread pagination doesn't publish a shared status of its own.
    pub(super) fn new(cache: Arc<ThreadEventCacheInner>) -> Self {
        let initial_status =
            SharedPaginationStatus::Idle { hit_timeline_start: false };

        let wrapper = ThreadEventCacheWrapper {
            cache,
            dummy_pagination_status: SharedObservable::new(initial_status),
        };

        Self(Pagination::new(wrapper))
    }

    /// Delegates to [`Pagination::run_backwards_until`] to back-paginate
    /// until `num_requested_events` events have been gathered (or the start
    /// of the thread is reached, per the driver's semantics).
    pub async fn run_backwards_until(
        &self,
        num_requested_events: u16,
    ) -> Result<BackPaginationOutcome> {
        self.0.run_backwards_until(num_requested_events).await
    }

    /// Delegates to [`Pagination::run_backwards_once`] to perform a single
    /// back-pagination with the given batch size.
    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
        self.0.run_backwards_once(batch_size).await
    }
}
impl PaginatedCache for ThreadEventCacheWrapper {
    /// Returns the pagination status observable.
    ///
    /// For threads this is the placeholder observable created in
    /// [`ThreadPagination::new`]; it is never updated here.
    fn status(&self) -> &SharedObservable<SharedPaginationStatus> {
        &self.dummy_pagination_status
    }

    /// Decides how the next backwards pagination should proceed, based on the
    /// current state of the thread's linked chunk.
    async fn load_more_events_backwards(&self) -> Result<LoadMoreEventsBackwardsOutcome> {
        let state = self.cache.state.read().await?;

        // If the linked chunk still contains a gap, resume paginating from
        // that gap's token.
        if let Some(prev_token) = state.thread_linked_chunk().rgap().map(|gap| gap.token) {
            trace!(%prev_token, "thread chunk has at least a gap");
            return Ok(LoadMoreEventsBackwardsOutcome::Gap {
                prev_token: Some(prev_token),
                waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
            });
        }

        // No gaps left: if the first loaded event is the thread root itself
        // (its event id equals the thread id), the whole thread is in memory
        // and we've reached the start of the timeline.
        if let Some((_pos, event)) = state.thread_linked_chunk().events().next() {
            let first_event_id =
                event.event_id().expect("a linked chunk only stores events with IDs");
            if first_event_id == self.cache.thread_id {
                trace!("thread chunk is fully loaded and non-empty: reached_start=true");
                return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
            }
        }

        // Chunk is empty, or the root event isn't loaded yet: paginate with
        // no known token (i.e. from the most recent events).
        Ok(LoadMoreEventsBackwardsOutcome::Gap {
            prev_token: None,
            waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
        })
    }

    /// Records that the initial previous-batch token has been waited for, so
    /// subsequent [`Self::load_more_events_backwards`] calls report it.
    async fn mark_has_waited_for_initial_prev_token(&self) -> Result<()> {
        *self.cache.state.write().await?.waited_for_initial_prev_token_mut() = true;
        Ok(())
    }

    /// Intentionally a no-op for threads.
    // NOTE(review): presumably there is no sync-delivered prev-batch token to
    // wait for in the thread case — confirm against the room implementation
    // of this trait.
    async fn wait_for_prev_token(&self) {
    }

    /// Fetches the next batch of thread events from the server via the
    /// relations endpoint (backward direction, recursive, all relations).
    ///
    /// Returns `Ok(None)` when the room handle is gone (weak reference
    /// couldn't be upgraded); otherwise the fetched events plus the next
    /// pagination token, if any.
    async fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: &Option<String>,
    ) -> Result<Option<(Vec<Event>, Option<String>)>> {
        // Bail out quietly if the room no longer exists.
        let Some(room) = self.cache.weak_room.get() else {
            return Ok(None);
        };

        let options = RelationsOptions {
            from: prev_token.clone(),
            dir: Direction::Backward,
            limit: Some(batch_size.into()),
            include_relations: IncludeRelations::AllRelations,
            recurse: true,
        };

        let response = room
            .relations(self.cache.thread_id.clone(), options)
            .await
            .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;

        Ok(Some((response.chunk, response.next_batch_token)))
    }

    /// Not implemented: thread timelines are never concluded from disk yet.
    async fn conclude_backwards_pagination_from_disk(
        &self,
        _events: Vec<Event>,
        _timeline_event_diffs: Vec<VectorDiff<Event>>,
        _reached_start: bool,
    ) -> BackPaginationOutcome {
        unimplemented!("loading from disk for threads is not implemented yet");
    }

    /// Integrates a batch of events received from a network back-pagination
    /// into the thread's linked chunk.
    ///
    /// Returns `Ok(None)` if the room handle is gone, or if the gap the
    /// pagination started from can no longer be found in the linked chunk
    /// (e.g. it changed concurrently) — in which case the caller is expected
    /// to restart the pagination.
    async fn conclude_backwards_pagination_from_network(
        &self,
        mut events: Vec<Event>,
        prev_token: Option<String>,
        new_token: Option<String>,
    ) -> Result<Option<BackPaginationOutcome>> {
        let Some(room) = self.cache.weak_room.get() else {
            return Ok(None);
        };

        // No next-batch token means the server has no earlier events: we've
        // reached the start of the thread. In that case also fetch the thread
        // root event so it can be included.
        // NOTE(review): presumably the relations endpoint doesn't return the
        // root event itself — confirm against the Matrix spec.
        let reached_start = new_token.is_none();

        let root_event = if reached_start {
            Some(
                room.load_or_fetch_event(&self.cache.thread_id, None)
                    .await
                    .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?,
            )
        } else {
            None
        };

        let mut state = self.cache.state.write().await?;

        // Persist the paginated events. Note the root event is appended only
        // *after* this call, so it isn't saved here — presumably
        // `load_or_fetch_event` already took care of it; TODO confirm.
        state.save_events(events.iter().cloned()).await?;

        events.extend(root_event);

        // Map the token we paginated from back to its gap chunk in the
        // linked chunk. If the gap vanished in the meantime, give up and let
        // the caller restart the pagination.
        let prev_gap_id = if let Some(token) = prev_token {
            let Some(gap_id) = state.thread_linked_chunk().chunk_identifier(|chunk| {
                matches!(chunk.content(), ChunkContent::Gap(Gap { token: prev_token }) if *prev_token == token)
            }) else {
                return Ok(None);
            };

            Some(gap_id)
        } else {
            None
        };

        // The response was requested with `Direction::Backward`, so events
        // arrive newest-first; reverse them into topological (oldest-first)
        // order before insertion.
        let topo_ordered_events = events.iter().cloned().rev().collect::<Vec<_>>();

        let new_gap = new_token.map(|token| Gap { token });

        // Deduplicate against events already present in the cache.
        let deduplication = state.filter_duplicate_events(topo_ordered_events);

        let (events, new_gap) = if deduplication.non_empty_all_duplicates {
            // Every received event was already known: insert nothing, and
            // drop the new gap — presumably to avoid re-paginating over the
            // same events forever.
            (Vec::new(), None)
        } else {
            // Threads have no persistent storage yet, so duplicates can only
            // live in memory; anything else is a broken invariant.
            assert!(
                deduplication.in_store_duplicated_event_ids.is_empty(),
                "persistent storage for threads is not implemented yet"
            );

            // Remove the stale in-memory copies so the fresh ones (possibly
            // in a new position) take their place.
            state.remove_events(deduplication.in_memory_duplicated_event_ids).await?;

            (deduplication.all_events, new_gap)
        };

        // Insert the events (and the new gap, if any) at the position of the
        // gap we paginated from. This shadows `reached_start` on purpose: the
        // linked chunk has the final say on whether the start was reached.
        let reached_start = state.thread_linked_chunk_mut().push_backwards_pagination_events(
            prev_gap_id,
            new_gap,
            &events,
        );

        state.propagate_changes().await?;

        // Broadcast the resulting timeline diffs to observers, if any
        // changes were produced. Send errors (no receivers) are ignored.
        let updates = state.thread_linked_chunk_mut().updates_as_vector_diffs();
        if !updates.is_empty() {
            let _ = state
                .state
                .sender
                .send(TimelineVectorDiffs { diffs: updates, origin: EventsOrigin::Pagination });
        }

        Ok(Some(BackPaginationOutcome { reached_start, events }))
    }
}