use std::{
collections::HashMap,
sync::{Arc, Weak},
};
use matrix_sdk_base::task_monitor::{BackgroundTaskHandle, TaskMonitor};
use ruma::{OwnedRoomId, RoomId};
use tokio::sync::mpsc;
use tracing::{info, instrument, trace, warn};
use crate::event_cache::EventCacheInner;
/// Handle used to request automatic (background) paginations.
///
/// The handle only wraps an [`Arc`], so cloning it is cheap and every clone
/// talks to the same background task.
#[derive(Clone)]
pub struct AutomaticPagination {
/// Shared state: the background task handle and the request channel.
inner: Arc<AutomaticPaginationInner>,
}
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for AutomaticPagination {
    /// Renders the handle as an opaque struct: only the type name is printed,
    /// none of the inner state (task handle, channel) is exposed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("AutomaticPagination");
        builder.finish_non_exhaustive()
    }
}
impl AutomaticPagination {
    /// Spawns the automatic pagination task on `task_monitor` and returns a
    /// handle connected to it through an unbounded channel.
    ///
    /// The task is only given a [`Weak`] reference to the event cache, so it
    /// does not keep the event cache alive by itself.
    pub(super) fn new(event_cache: Weak<EventCacheInner>, task_monitor: &TaskMonitor) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        let worker = automatic_paginations_task(event_cache, receiver);
        let task_handle =
            task_monitor.spawn_infinite_task("event_cache::automatic_paginations_task", worker);

        let inner = AutomaticPaginationInner { _task: task_handle, sender };
        Self { inner: Arc::new(inner) }
    }

    /// Queues a single backwards pagination request for the room `room_id`.
    ///
    /// Returns `true` if the request was handed over to the background task's
    /// channel, `false` if sending failed. A `true` result does not mean that
    /// a pagination happened: the task may still skip the request (for
    /// instance when the room has no credits left) or fail while running it.
    pub fn run_once(&self, room_id: &RoomId) -> bool {
        let request =
            AutomaticPaginationRequest::PaginateRoomBackwards { room_id: room_id.to_owned() };

        self.inner.sender.send(request).is_ok()
    }
}
/// State shared between all the clones of an [`AutomaticPagination`] handle.
struct AutomaticPaginationInner {
/// Handle of the background task spawned in `AutomaticPagination::new`.
///
/// Never read; it is only stored so that it lives as long as this struct.
// NOTE(review): presumably dropping the handle stops the task — confirm
// against `BackgroundTaskHandle`.
_task: BackgroundTaskHandle,
/// Sending half of the unbounded channel the background task listens on.
sender: mpsc::UnboundedSender<AutomaticPaginationRequest>,
}
/// A request sent to the automatic pagination background task.
#[derive(Clone, Debug)]
enum AutomaticPaginationRequest {
/// Run one backwards pagination for the room identified by `room_id`.
PaginateRoomBackwards { room_id: OwnedRoomId },
}
/// Background worker that serves [`AutomaticPaginationRequest`]s until the
/// request channel yields no more values or the event cache is gone.
///
/// Each room has a budget of "credits", initialised from
/// `room_pagination_per_room_credit` the first time a request for that room is
/// handled. A request for a room whose budget reached zero is skipped. One
/// credit is consumed by every successful pagination, except when it both
/// reached the start of the timeline and returned no events. Failed
/// paginations do not consume credits.
///
/// The credits live in a map local to this task, so they last as long as the
/// task itself.
#[instrument(skip_all)]
async fn automatic_paginations_task(
    inner: Weak<EventCacheInner>,
    mut receiver: mpsc::UnboundedReceiver<AutomaticPaginationRequest>,
) {
    trace!("Spawning the automatic pagination task");

    // Remaining background pagination budget, keyed by room.
    let mut credits_by_room = HashMap::new();

    loop {
        let Some(request) = receiver.recv().await else {
            break;
        };

        match request {
            AutomaticPaginationRequest::PaginateRoomBackwards { room_id } => {
                // Only hold a strong reference while serving a request; if the
                // event cache has been dropped there is nothing left to do.
                let Some(event_cache) = inner.upgrade() else {
                    break;
                };

                // Copy the configuration out, so the lock guard is released
                // before any `.await` below.
                let config = *event_cache.config.read().unwrap();

                let remaining = credits_by_room
                    .entry(room_id.clone())
                    .or_insert(config.room_pagination_per_room_credit);

                if *remaining == 0 {
                    trace!(for_room = %room_id, "No more credits to paginate this room in the background, skipping");
                    continue;
                }

                // `caches` is confined to the match arm on purpose: only the
                // pagination handle outlives it.
                let pagination = match event_cache.all_caches_for_room(&room_id).await {
                    Ok(caches) => caches.room.pagination(),
                    Err(err) => {
                        warn!(for_room = %room_id, "Failed to get the `Caches`: {err}");
                        continue;
                    }
                };

                trace!(for_room = %room_id, "automatic backpagination triggered");

                let batch_size = config.room_pagination_batch_size;
                let outcome = match pagination.run_backwards_once(batch_size).await {
                    Ok(outcome) => outcome,
                    Err(err) => {
                        warn!(for_room = %room_id, "Failed to run background pagination: {err}");
                        continue;
                    }
                };

                // A pagination that was already at the start of the timeline
                // and brought nothing back is free of charge.
                let was_noop = outcome.reached_start && outcome.events.is_empty();
                if !was_noop {
                    *remaining -= 1;
                }
            }
        }
    }

    // Note: this message is logged on both exit paths, i.e. also when the loop
    // stopped because the event cache could not be upgraded anymore.
    info!("Closing the automatic pagination task because receiver closed");
}
// Tests for the automatic pagination task, run against a mock homeserver.
// They are compiled out on `wasm32` targets.
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use std::time::Duration;
use assert_matches::assert_matches;
use eyeball_im::VectorDiff;
use matrix_sdk_base::sleep::sleep;
use matrix_sdk_test::{BOB, JoinedRoomBuilder, async_test, event_factory::EventFactory};
use ruma::{event_id, room_id};
use crate::{
assert_let_timeout,
event_cache::{EventsOrigin, RoomEventCacheUpdate},
test_utils::mocks::{MatrixMockServer, RoomMessagesResponseTemplate},
};
/// A pagination requested through `run_once` fetches earlier events and
/// surfaces them to room subscribers as an update with the `Pagination`
/// origin.
#[async_test]
async fn test_background_room_paginations() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
// The feature is opt-in: enable it before subscribing the event cache.
event_cache.config_mut().experimental_auto_backpagination = true;
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (room_events, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
// Nothing is known about the room yet, and no update is pending.
assert!(room_events.is_empty());
assert!(room_cache_updates.is_empty());
// Sync a limited timeline carrying a previous-batch token, so that there is
// something to paginate from.
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.set_timeline_limited()
.set_timeline_prev_batch("prev_batch"),
)
.await;
// The sync results in a single `Clear` diff with the `Sync` origin.
{
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
);
assert_eq!(update.diffs.len(), 1);
assert_matches!(update.diffs[0], VectorDiff::Clear);
assert_matches!(update.origin, EventsOrigin::Sync);
}
// Mock exactly one room messages response, listing `$2` then `$1`.
server
.mock_room_messages()
.ok(RoomMessagesResponseTemplate::default().events(vec![
f.text_msg("comté").event_id(event_id!("$2")),
f.text_msg("beaufort").event_id(event_id!("$1")),
]))
.mock_once()
.mount()
.await;
// Ask the background task for one pagination; the request must be accepted.
let automatic_pagination_api = event_cache.automatic_pagination().unwrap();
assert!(automatic_pagination_api.run_once(room_id));
// The result shows up as one diff with the `Pagination` origin.
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
);
assert_eq!(update.diffs.len(), 1);
assert_matches!(update.origin, EventsOrigin::Pagination);
// Replay the diff on the initial (empty) list of events.
let mut room_events = room_events.into();
for diff in update.diffs {
diff.apply(&mut room_events);
}
// Both events are there, `$1` first: the reverse of the response's order.
assert_eq!(room_events.len(), 2);
assert_eq!(room_events[0].event_id().unwrap(), event_id!("$1"));
assert_eq!(room_events[1].event_id().unwrap(), event_id!("$2"));
// And nothing else happened.
assert!(room_cache_updates.is_empty());
}
/// With a single credit per room, the first automatic pagination runs, the
/// second request is accepted but produces no update, and a pagination
/// started directly on the room event cache still works afterwards.
#[async_test]
async fn test_room_pagination_respects_credits_system() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;
let event_cache = client.event_cache();
event_cache.config_mut().experimental_auto_backpagination = true;
// Only one background pagination is allowed per room.
event_cache.config_mut().room_pagination_per_room_credit = 1;
event_cache.subscribe().unwrap();
let room_id = room_id!("!omelette:fromage.fr");
let f = EventFactory::new().room(room_id).sender(*BOB);
let room = server.sync_joined_room(&client, room_id).await;
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
let (room_events, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
// Nothing is known about the room yet, and no update is pending.
assert!(room_events.is_empty());
assert!(room_cache_updates.is_empty());
// Sync a limited timeline carrying a previous-batch token, so that there is
// something to paginate from.
server
.sync_room(
&client,
JoinedRoomBuilder::new(room_id)
.set_timeline_limited()
.set_timeline_prev_batch("prev_batch"),
)
.await;
// The sync results in a single `Clear` diff with the `Sync` origin.
{
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
);
assert_eq!(update.diffs.len(), 1);
assert_matches!(update.diffs[0], VectorDiff::Clear);
assert_matches!(update.origin, EventsOrigin::Sync);
}
// First page: requested from `prev_batch`, returns two events and a new end
// token (`prev_batch_2`), so there is more to paginate afterwards.
server
.mock_room_messages()
.match_from("prev_batch")
.ok(RoomMessagesResponseTemplate::default()
.events(vec![
f.text_msg("comté").event_id(event_id!("$2")),
f.text_msg("beaufort").event_id(event_id!("$1")),
])
.end_token("prev_batch_2"))
.mock_once()
.mount()
.await;
// The first automatic pagination uses the room's only credit.
let automatic_pagination_api = event_cache.automatic_pagination().unwrap();
assert!(automatic_pagination_api.run_once(room_id));
assert_let_timeout!(
Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
);
assert_eq!(update.diffs.len(), 1);
assert_matches!(update.origin, EventsOrigin::Pagination);
// Replay the diff on the initial (empty) list of events.
let mut room_events = room_events.into();
for diff in update.diffs {
diff.apply(&mut room_events);
}
assert_eq!(room_events.len(), 2);
assert_eq!(room_events[0].event_id().unwrap(), event_id!("$1"));
assert_eq!(room_events[1].event_id().unwrap(), event_id!("$2"));
assert!(room_cache_updates.is_empty());
// The second request is still accepted by the channel (`run_once` returns
// `true`), but the task has no credit left for this room: after giving it
// some time to process the request, no update has been emitted.
assert!(automatic_pagination_api.run_once(room_id));
sleep(Duration::from_millis(300)).await;
assert!(room_cache_updates.is_empty());
// Second page: requested from `prev_batch_2`, with a default response.
server
.mock_room_messages()
.match_from("prev_batch_2")
.ok(RoomMessagesResponseTemplate::default())
.mock_once()
.mount()
.await;
// A pagination started directly on the room event cache does not go through
// the background task, hence is not subject to credits: it runs and reports
// that the start of the timeline has been reached.
let outcome = room_event_cache.pagination().run_backwards_once(30).await.unwrap();
assert!(outcome.reached_start);
}
}