use std::{pin::Pin, sync::Arc, time::Duration};
use eyeball::{ObservableWriteGuard, SharedObservable};
use eyeball_im::VectorDiff;
use futures_util::{
FutureExt as _,
future::{Either, Shared, ready},
};
use matrix_sdk_base::{
SendOutsideWasm, SyncOutsideWasm, event_cache::Event, executor::AbortOnDrop, timeout::timeout,
};
use matrix_sdk_common::executor::spawn;
use tracing::{debug, instrument, trace, warn};
use super::super::Result;
#[derive(Clone, Debug)]
pub(in super::super) struct Pagination<C: SendOutsideWasm + 'static> {
pub cache: C,
}
impl<C: SendOutsideWasm + 'static> Pagination<C> {
pub fn new(cache: C) -> Self {
Self { cache }
}
}
impl<C> Pagination<C>
where
C: Clone + PaginatedCache + SendOutsideWasm + 'static + SyncOutsideWasm,
{
#[instrument(skip(self))]
pub async fn run_backwards_until(
&self,
num_requested_events: u16,
) -> Result<BackPaginationOutcome> {
let mut events = Vec::new();
loop {
if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
events.extend(outcome.events);
if outcome.reached_start || events.len() >= num_requested_events as usize {
return Ok(BackPaginationOutcome {
reached_start: outcome.reached_start,
events,
});
}
trace!(
"restarting back-pagination, because we haven't reached \
the start or obtained enough events yet"
);
}
debug!("restarting back-pagination because of a timeline reset.");
}
}
#[instrument(skip(self))]
pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
loop {
if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
return Ok(outcome);
}
debug!("restarting back-pagination because of a timeline reset");
}
}
fn run_backwards_impl(
&self,
batch_size: u16,
) -> impl Future<Output = Result<Option<BackPaginationOutcome>>> {
let status_observable = self.cache.status();
let mut status_guard = status_observable.write();
match &*status_guard {
SharedPaginationStatus::Idle { hit_timeline_start } => {
if *hit_timeline_start {
ObservableWriteGuard::set(
&mut status_guard,
SharedPaginationStatus::Idle { hit_timeline_start: true },
);
return Either::Left(ready(Ok(Some(BackPaginationOutcome {
reached_start: true,
events: Vec::new(),
}))));
}
}
SharedPaginationStatus::Paginating { shared_task: shared } => {
let shared = shared.clone();
drop(status_guard);
return Either::Right(shared.fut.clone());
}
}
let reset_status_on_drop_guard = ResetStatusOnDrop {
prev_status: Some(status_guard.clone()),
pagination_status: status_observable.clone(),
};
let this = self.clone();
let fut: Pin<Box<dyn SharedPaginationFuture>> = Box::pin(async move {
match this.paginate_backwards_impl(batch_size).await? {
Some(outcome) => {
reset_status_on_drop_guard.disarm();
this.cache.status().set(SharedPaginationStatus::Idle {
hit_timeline_start: outcome.reached_start,
});
Ok(Some(outcome))
}
None => Ok(None),
}
});
let shared_task = fut.shared();
let shared_task_clone = shared_task.clone();
let join_handle = spawn(async move {
if let Err(err) = shared_task_clone.await {
warn!("event cache back-pagination failed: {err}");
}
});
ObservableWriteGuard::set(
&mut status_guard,
SharedPaginationStatus::Paginating {
shared_task: SharedPaginationTask {
fut: shared_task.clone(),
_join_handle: Arc::new(AbortOnDrop::new(join_handle)),
},
},
);
drop(status_guard);
Either::Right(shared_task)
}
async fn paginate_backwards_impl(
&self,
batch_size: u16,
) -> Result<Option<BackPaginationOutcome>> {
loop {
match self.cache.load_more_events_backwards().await? {
LoadMoreEventsBackwardsOutcome::Gap {
prev_token,
waited_for_initial_prev_token,
} => {
if prev_token.is_none() && !waited_for_initial_prev_token {
const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
trace!("waiting for a pagination token…");
let _ = timeout(
self.cache.wait_for_prev_token(),
DEFAULT_WAIT_FOR_TOKEN_DURATION,
)
.await;
trace!("done waiting");
self.cache.mark_has_waited_for_initial_prev_token().await?;
continue;
}
return self.paginate_backwards_with_network(batch_size, prev_token).await;
}
LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
}
LoadMoreEventsBackwardsOutcome::Events {
events,
timeline_event_diffs,
reached_start,
} => {
return Ok(Some(
self.cache
.conclude_backwards_pagination_from_disk(
events,
timeline_event_diffs,
reached_start,
)
.await,
));
}
}
}
}
async fn paginate_backwards_with_network(
&self,
batch_size: u16,
prev_token: Option<String>,
) -> Result<Option<BackPaginationOutcome>> {
let Some((events, new_token)) =
self.cache.paginate_backwards_with_network(batch_size, &prev_token).await?
else {
return Ok(Some(BackPaginationOutcome {
reached_start: false,
events: Default::default(),
}));
};
self.cache.conclude_backwards_pagination_from_network(events, prev_token, new_token).await
}
}
trait SharedPaginationFuture:
Future<Output = Result<Option<BackPaginationOutcome>>> + SendOutsideWasm
{
}
impl<T: Future<Output = Result<Option<BackPaginationOutcome>>> + SendOutsideWasm>
SharedPaginationFuture for T
{
}
#[derive(Clone)]
pub(in super::super) struct SharedPaginationTask {
fut: Shared<Pin<Box<dyn SharedPaginationFuture>>>,
_join_handle: Arc<AbortOnDrop<()>>,
}
#[derive(Clone)]
pub(in super::super) enum SharedPaginationStatus {
Idle {
hit_timeline_start: bool,
},
Paginating { shared_task: SharedPaginationTask },
}
pub(in super::super) trait PaginatedCache {
fn status(&self) -> &SharedObservable<SharedPaginationStatus>;
fn load_more_events_backwards(
&self,
) -> impl Future<Output = Result<LoadMoreEventsBackwardsOutcome>> + SendOutsideWasm;
fn mark_has_waited_for_initial_prev_token(
&self,
) -> impl Future<Output = Result<()>> + SendOutsideWasm;
fn wait_for_prev_token(&self) -> impl Future<Output = ()> + SendOutsideWasm;
fn paginate_backwards_with_network(
&self,
batch_size: u16,
prev_token: &Option<String>,
) -> impl Future<Output = Result<Option<(Vec<Event>, Option<String>)>>> + SendOutsideWasm;
fn conclude_backwards_pagination_from_disk(
&self,
events: Vec<Event>,
timeline_event_diffs: Vec<VectorDiff<Event>>,
reached_start: bool,
) -> impl Future<Output = BackPaginationOutcome> + SendOutsideWasm;
fn conclude_backwards_pagination_from_network(
&self,
events: Vec<Event>,
prev_token: Option<String>,
new_token: Option<String>,
) -> impl Future<Output = Result<Option<BackPaginationOutcome>>> + SendOutsideWasm;
}
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum PaginationStatus {
Idle {
hit_timeline_start: bool,
},
Paginating,
}
struct ResetStatusOnDrop {
prev_status: Option<SharedPaginationStatus>,
pagination_status: SharedObservable<SharedPaginationStatus>,
}
impl ResetStatusOnDrop {
fn disarm(mut self) {
self.prev_status = None;
}
}
impl Drop for ResetStatusOnDrop {
fn drop(&mut self) {
if let Some(status) = self.prev_status.take() {
let _ = self.pagination_status.set(status);
}
}
}
#[derive(Clone, Debug)]
pub struct BackPaginationOutcome {
pub reached_start: bool,
pub events: Vec<Event>,
}
#[derive(Debug)]
pub(in super::super) enum LoadMoreEventsBackwardsOutcome {
Gap {
prev_token: Option<String>,
waited_for_initial_prev_token: bool,
},
StartOfTimeline,
Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
}