use std::{
sync::{Arc, atomic::Ordering},
time::Duration,
};
use eyeball::{SharedObservable, Subscriber};
use matrix_sdk_base::timeout::timeout;
use ruma::api::Direction;
use tracing::{debug, instrument, trace};
use super::{
BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
room::{LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
};
use crate::{
event_cache::{EventCacheError, RoomEventCacheGenericUpdate},
room::MessagesOptions,
};
/// The pagination status of a room, as exposed by [`RoomPagination::status`].
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum RoomPaginationStatus {
    /// No back-pagination is currently running for this room.
    Idle {
        /// Whether the last back-pagination hit the start of the timeline
        /// (set from `BackPaginationOutcome::reached_start`).
        hit_timeline_start: bool,
    },
    /// A back-pagination is currently in progress.
    Paginating,
}
/// Drop guard that restores a previous [`RoomPaginationStatus`] when dropped.
///
/// Used by `run_backwards_impl` to roll the status back to its
/// pre-pagination value if the pagination bails out early (error or reset);
/// `disarm` cancels the restore on the success path.
struct ResetStatusOnDrop {
    // Status to restore on drop; `None` once the guard has been disarmed.
    prev_status: Option<RoomPaginationStatus>,
    // Observable the restored status is written into.
    pagination_status: SharedObservable<RoomPaginationStatus>,
}
impl ResetStatusOnDrop {
    /// Consume the guard without restoring the previous status.
    ///
    /// Clearing `prev_status` turns the `Drop` implementation into a no-op.
    fn disarm(mut self) {
        let _ = self.prev_status.take();
    }
}
impl Drop for ResetStatusOnDrop {
    fn drop(&mut self) {
        // If the guard was disarmed, there's nothing to restore.
        let Some(prev) = self.prev_status.take() else {
            return;
        };
        // Put the previous status back into the observable.
        let _ = self.pagination_status.set(prev);
    }
}
/// A handle to run back-paginations in a room's event cache.
///
/// Cheap to clone: it only wraps an `Arc` onto the room event cache
/// internals.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct RoomPagination {
    // Shared internals of the room event cache this paginator operates on.
    pub(super) inner: Arc<RoomEventCacheInner>,
}
impl RoomPagination {
    /// Runs back-paginations until at least `num_requested_events` events
    /// have been accumulated, or the start of the timeline is reached.
    ///
    /// Each underlying request also uses `num_requested_events` as its batch
    /// size, so the final outcome may contain more events than requested.
    ///
    /// # Errors
    ///
    /// Returns [`EventCacheError::AlreadyBackpaginating`] if another
    /// back-pagination is already in progress for this room.
    #[instrument(skip(self))]
    pub async fn run_backwards_until(
        &self,
        num_requested_events: u16,
    ) -> Result<BackPaginationOutcome> {
        let mut events = Vec::new();

        loop {
            if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
                events.extend(outcome.events);

                if outcome.reached_start || events.len() >= num_requested_events as usize {
                    return Ok(BackPaginationOutcome {
                        reached_start: outcome.reached_start,
                        events,
                    });
                }

                trace!(
                    "restarting back-pagination, because we haven't reached \
                     the start or obtained enough events yet"
                );
            } else {
                // Fix: only log the "timeline reset" message when
                // `run_backwards_impl` actually returned `None` (i.e. a reset
                // was observed). Previously this line also ran right after a
                // successful-but-insufficient batch, producing a misleading
                // log alongside the trace above.
                debug!("restarting back-pagination because of a timeline reset.");
            }
        }
    }

    /// Runs a single back-pagination of at most `batch_size` events,
    /// transparently retrying if a timeline reset happens concurrently.
    ///
    /// # Errors
    ///
    /// Returns [`EventCacheError::AlreadyBackpaginating`] if another
    /// back-pagination is already in progress for this room.
    #[instrument(skip(self))]
    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
        loop {
            if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
                return Ok(outcome);
            }
            // `None` means a timeline reset was observed while paginating.
            debug!("restarting back-pagination because of a timeline reset.");
        }
    }

    /// Shared implementation for the `run_backwards_*` entry points.
    ///
    /// Sets the pagination status to [`RoomPaginationStatus::Paginating`] for
    /// the duration of the call; on early exit (error or reset) the previous
    /// status is restored by a [`ResetStatusOnDrop`] guard. Returns
    /// `Ok(None)` when the pagination observed a timeline reset and should be
    /// retried by the caller.
    async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
        let status_observable = &self.inner.pagination_status;

        // Claim the "paginating" slot; bail out if someone else holds it.
        let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
        if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
            return Err(EventCacheError::AlreadyBackpaginating);
        }

        // On early return below (via `?` or `Ok(None)`), restore the previous
        // status so observers don't stay stuck on `Paginating`.
        let reset_status_on_drop_guard = ResetStatusOnDrop {
            prev_status: Some(prev_status),
            pagination_status: status_observable.clone(),
        };

        match self.paginate_backwards_impl(batch_size).await? {
            Some(outcome) => {
                // Success: don't restore the previous status…
                reset_status_on_drop_guard.disarm();

                // …instead, publish the final idle status to observers.
                status_observable
                    .set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });

                Ok(Some(outcome))
            }
            // Timeline reset: the armed guard restores the previous status.
            None => Ok(None),
        }
    }

    /// Performs a back-pagination, serving events from the cache when
    /// possible and hitting the network only when a gap is found.
    ///
    /// Returns `Ok(None)` if a timeline reset happened concurrently.
    async fn paginate_backwards_impl(
        &self,
        batch_size: u16,
    ) -> Result<Option<BackPaginationOutcome>> {
        loop {
            let mut state_guard = self.inner.state.write().await?;

            match state_guard.load_more_events_backwards().await? {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
                    // If there's no pagination token yet and we've never
                    // waited for one in this room, give a pending token a
                    // short grace period to show up (via
                    // `pagination_batch_token_notifier`) before paginating
                    // from the end of the timeline.
                    if prev_token.is_none()
                        && !state_guard.waited_for_initial_prev_token().load(Ordering::SeqCst)
                    {
                        const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);

                        // Release the state lock while waiting, so other
                        // tasks can make progress and publish a token.
                        drop(state_guard);

                        trace!("waiting for a pagination token…");
                        let _ = timeout(
                            self.inner.pagination_batch_token_notifier.notified(),
                            DEFAULT_WAIT_FOR_TOKEN_DURATION,
                        )
                        .await;
                        trace!("done waiting");

                        // Only ever wait once per room, whether or not a
                        // token arrived.
                        self.inner
                            .state
                            .write()
                            .await?
                            .waited_for_initial_prev_token()
                            .store(true, Ordering::SeqCst);

                        // Re-read the state from scratch.
                        continue;
                    }

                    // Don't hold the state lock across the network round-trip.
                    drop(state_guard);

                    return self.paginate_backwards_with_network(batch_size, prev_token).await;
                }

                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
                    return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
                }

                LoadMoreEventsBackwardsOutcome::Events {
                    events,
                    timeline_event_diffs,
                    reached_start,
                } => {
                    // Events were served from the cache: forward the diffs to
                    // observers with a `Cache` origin.
                    if !timeline_event_diffs.is_empty() {
                        let _ = self.inner.update_sender.send(
                            RoomEventCacheUpdate::UpdateTimelineEvents {
                                diffs: timeline_event_diffs,
                                origin: EventsOrigin::Cache,
                            },
                        );

                        let _ =
                            self.inner.generic_update_sender.send(RoomEventCacheGenericUpdate {
                                room_id: self.inner.room_id.clone(),
                            });
                    }

                    return Ok(Some(BackPaginationOutcome {
                        reached_start,
                        // NOTE(review): events are reversed here — presumably
                        // to match the newest-to-oldest order a back-pagination
                        // response uses; confirm against callers.
                        events: events.into_iter().rev().collect(),
                    }));
                }
            }
        }
    }

    /// Runs a single network back-pagination of at most `batch_size` events,
    /// starting from `prev_token` (or from the end of the timeline when
    /// `None`).
    ///
    /// Returns `Ok(None)` if the cached state rejected the response (timeline
    /// reset while the request was in flight).
    async fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: Option<String>,
    ) -> Result<Option<BackPaginationOutcome>> {
        let (events, new_token) = {
            // If the room object is gone (e.g. client shutting down), report
            // an empty, non-terminal pagination instead of erroring.
            let Some(room) = self.inner.weak_room.get() else {
                return Ok(Some(BackPaginationOutcome {
                    reached_start: false,
                    events: Default::default(),
                }));
            };

            let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
            options.limit = batch_size.into();

            // Map network failures into `EventCacheError::BackpaginationError`.
            let response = room
                .messages(options)
                .await
                .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

            (response.chunk, response.end)
        };

        if let Some((outcome, timeline_event_diffs)) = self
            .inner
            .state
            .write()
            .await?
            .handle_backpagination(events, new_token, prev_token)
            .await?
        {
            if !timeline_event_diffs.is_empty() {
                // Forward network-sourced diffs with a `Pagination` origin.
                // NOTE(review): unlike the cache path above, no
                // `RoomEventCacheGenericUpdate` is sent here — confirm whether
                // `handle_backpagination` emits it elsewhere.
                let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Pagination,
                });
            }

            Ok(Some(outcome))
        } else {
            // The state was reset while we were paginating; caller retries.
            Ok(None)
        }
    }

    /// Returns a subscriber to the pagination status used by the
    /// `run_backwards_*` methods.
    pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
        self.inner.pagination_status.subscribe()
    }
}