// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A sub-object for running pagination tasks on a given room.

use std::{sync::Arc, time::Duration};

use eyeball::{SharedObservable, Subscriber};
use matrix_sdk_base::{
    deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier, timeout::timeout,
};
use matrix_sdk_common::linked_chunk::ChunkContent;
use ruma::api::Direction;
use tokio::sync::RwLockWriteGuard;
use tracing::{debug, instrument, trace};

use super::{
    deduplicator::DeduplicationOutcome,
    room::{events::Gap, LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
    BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheState, RoomEventCacheUpdate,
};
use crate::{event_cache::EventCacheError, room::MessagesOptions};
/// Status for the back-pagination on a room event cache.
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum RoomPaginationStatus {
    /// No back-pagination is happening right now.
    Idle {
        /// Have we hit the start of the timeline, i.e. back-paginating wouldn't
        /// have any effect?
        hit_timeline_start: bool,
    },

    /// Back-pagination is already running in the background.
    Paginating,
}
49
/// Small RAII guard to reset the pagination status on drop, if not disarmed in
/// the meanwhile.
struct ResetStatusOnDrop {
    // The status to restore when the guard is dropped; `None` once the guard
    // has been disarmed, in which case dropping is a no-op.
    prev_status: Option<RoomPaginationStatus>,
    // The observable the previous status is written back into on drop.
    pagination_status: SharedObservable<RoomPaginationStatus>,
}
56
57impl ResetStatusOnDrop {
58 /// Make the RAII guard have no effect.
59 fn disarm(mut self) {
60 self.prev_status = None;
61 }
62}
63
64impl Drop for ResetStatusOnDrop {
65 fn drop(&mut self) {
66 if let Some(status) = self.prev_status.take() {
67 let _ = self.pagination_status.set(status);
68 }
69 }
70}
71
/// An API object to run pagination queries on a [`super::RoomEventCache`].
///
/// Can be created with [`super::RoomEventCache::pagination()`].
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct RoomPagination {
    // Shared room event cache internals; cloning this handle is cheap (`Arc`).
    pub(super) inner: Arc<RoomEventCacheInner>,
}
80
81impl RoomPagination {
82 /// Starts a back-pagination for the requested number of events.
83 ///
84 /// This automatically takes care of waiting for a pagination token from
85 /// sync, if we haven't done that before.
86 ///
87 /// It will run multiple back-paginations until one of these two conditions
88 /// is met:
89 /// - either we've reached the start of the timeline,
90 /// - or we've obtained enough events to fulfill the requested number of
91 /// events.
92 #[instrument(skip(self))]
93 pub async fn run_backwards_until(
94 &self,
95 num_requested_events: u16,
96 ) -> Result<BackPaginationOutcome> {
97 let mut events = Vec::new();
98
99 loop {
100 if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
101 events.extend(outcome.events);
102 if outcome.reached_start || events.len() >= num_requested_events as usize {
103 return Ok(BackPaginationOutcome {
104 reached_start: outcome.reached_start,
105 events,
106 });
107 }
108 trace!(
109 "restarting back-pagination, because we haven't reached \
110 the start or obtained enough events yet"
111 );
112 }
113
114 debug!("restarting back-pagination because of a timeline reset.");
115 }
116 }
117
118 /// Run a single back-pagination for the requested number of events.
119 ///
120 /// This automatically takes care of waiting for a pagination token from
121 /// sync, if we haven't done that before.
122 #[instrument(skip(self))]
123 pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
124 loop {
125 if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
126 return Ok(outcome);
127 }
128 debug!("restarting back-pagination because of a timeline reset.");
129 }
130 }
131
132 /// Paginate from either the storage or the network, and let pagination
133 /// status observers know about updates.
134 async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
135 // There is at least one gap that must be resolved; reach the network.
136 // First, ensure there's no other ongoing back-pagination.
137 let status_observable = &self.inner.pagination_status;
138
139 let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
140 if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
141 return Err(EventCacheError::AlreadyBackpaginating);
142 }
143
144 let reset_status_on_drop_guard = ResetStatusOnDrop {
145 prev_status: Some(prev_status),
146 pagination_status: status_observable.clone(),
147 };
148
149 match self.paginate_backwards_impl(batch_size).await? {
150 Some(outcome) => {
151 // Back-pagination's over and successful, don't reset the status to the previous
152 // value.
153 reset_status_on_drop_guard.disarm();
154
155 // Notify subscribers that pagination ended.
156 status_observable
157 .set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
158
159 Ok(Some(outcome))
160 }
161
162 None => {
163 // We keep the previous status value, because we haven't obtained more
164 // information about the pagination.
165 Ok(None)
166 }
167 }
168 }
169
170 /// Paginate from either the storage or the network.
171 ///
172 /// This method isn't concerned with setting the pagination status; only the
173 /// caller is.
174 async fn paginate_backwards_impl(
175 &self,
176 batch_size: u16,
177 ) -> Result<Option<BackPaginationOutcome>> {
178 // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
179 // to load from storage first, then from network if storage indicated
180 // there's no previous events chunk to load.
181
182 loop {
183 let mut state_guard = self.inner.state.write().await;
184
185 match state_guard.load_more_events_backwards().await? {
186 LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
187 const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
188
189 // Release the state guard while waiting, to not deadlock the sync task.
190 drop(state_guard);
191
192 // Otherwise, wait for a notification that we received a previous-batch token.
193 trace!("waiting for a pagination token…");
194 let _ = timeout(
195 self.inner.pagination_batch_token_notifier.notified(),
196 DEFAULT_WAIT_FOR_TOKEN_DURATION,
197 )
198 .await;
199 trace!("done waiting");
200
201 self.inner.state.write().await.waited_for_initial_prev_token = true;
202
203 // Retry!
204 //
205 // Note: the next call to `load_more_events_backwards` can't return
206 // `WaitForInitialPrevToken` because we've just set to
207 // `waited_for_initial_prev_token`, so this is not an infinite loop.
208 //
209 // Note 2: not a recursive call, because recursive and async have a bad time
210 // together.
211 continue;
212 }
213
214 LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
215 // We have a gap, so resolve it with a network back-pagination.
216 drop(state_guard);
217 return self.paginate_backwards_with_network(batch_size, prev_token).await;
218 }
219
220 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
221 return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
222 }
223
224 LoadMoreEventsBackwardsOutcome::Events {
225 events,
226 timeline_event_diffs,
227 reached_start,
228 } => {
229 if !timeline_event_diffs.is_empty() {
230 let _ =
231 self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
232 diffs: timeline_event_diffs,
233 origin: EventsOrigin::Cache,
234 });
235 }
236
237 return Ok(Some(BackPaginationOutcome {
238 reached_start,
239 // This is a backwards pagination. `BackPaginationOutcome` expects events to
240 // be in “reverse order”.
241 events: events.into_iter().rev().collect(),
242 }));
243 }
244 }
245 }
246 }
247
248 /// Run a single pagination request (/messages) to the server.
249 ///
250 /// If there are no previous-batch tokens, it will wait for one for a short
251 /// while to get one, or if it's already done so or if it's seen a
252 /// previous-batch token before, it will immediately indicate it's
253 /// reached the end of the timeline.
254 async fn paginate_backwards_with_network(
255 &self,
256 batch_size: u16,
257 prev_token: Option<String>,
258 ) -> Result<Option<BackPaginationOutcome>> {
259 let (events, new_gap) = {
260 let Some(room) = self.inner.weak_room.get() else {
261 // The client is shutting down, return an empty default response.
262 return Ok(Some(BackPaginationOutcome {
263 reached_start: false,
264 events: Default::default(),
265 }));
266 };
267
268 let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
269 options.limit = batch_size.into();
270
271 let response = room.messages(options).await.map_err(|err| {
272 EventCacheError::BackpaginationError(
273 crate::event_cache::paginator::PaginatorError::SdkError(Box::new(err)),
274 )
275 })?;
276
277 let new_gap = response.end.map(|prev_token| Gap { prev_token });
278
279 (response.chunk, new_gap)
280 };
281
282 // Make sure the `RoomEvents` isn't updated while we are saving events from
283 // backpagination.
284 let state = self.inner.state.write().await;
285
286 // Check that the previous token still exists; otherwise it's a sign that the
287 // room's timeline has been cleared.
288 let prev_gap_chunk_id = if let Some(token) = prev_token {
289 let gap_chunk_id = state.events().chunk_identifier(|chunk| {
290 matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
291 });
292
293 if gap_chunk_id.is_none() {
294 // We got a previous-batch token from the linked chunk *before* running the
295 // request, but it is missing *after* completing the
296 // request.
297 //
298 // It may be a sign the linked chunk has been reset, but it's fine, per this
299 // function's contract.
300 return Ok(None);
301 }
302
303 gap_chunk_id
304 } else {
305 None
306 };
307
308 self.handle_network_pagination_result(state, events, new_gap, prev_gap_chunk_id)
309 .await
310 .map(Some)
311 }
312
313 /// Handle the result of a successful network back-pagination.
314 async fn handle_network_pagination_result(
315 &self,
316 mut state: RwLockWriteGuard<'_, RoomEventCacheState>,
317 events: Vec<TimelineEvent>,
318 new_gap: Option<Gap>,
319 prev_gap_id: Option<ChunkIdentifier>,
320 ) -> Result<BackPaginationOutcome> {
321 // If there's no new previous gap, then we've reached the start of the timeline.
322 let network_reached_start = new_gap.is_none();
323
324 let (
325 DeduplicationOutcome {
326 all_events: mut events,
327 in_memory_duplicated_event_ids,
328 in_store_duplicated_event_ids,
329 },
330 all_duplicates,
331 ) = state.collect_valid_and_duplicated_events(events).await?;
332
333 // If not all the events have been back-paginated, we need to remove the
334 // previous ones, otherwise we can end up with misordered events.
335 //
336 // Consider the following scenario:
337 // - sync returns [D, E, F]
338 // - then sync returns [] with a previous batch token PB1, so the internal
339 // linked chunk state is [D, E, F, PB1].
340 // - back-paginating with PB1 may return [A, B, C, D, E, F].
341 //
342 // Only inserting the new events when replacing PB1 would result in a timeline
343 // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
344 // all the events, in case this happens (see also #4746).
345
346 let mut event_diffs = if !all_duplicates {
347 // Let's forget all the previous events.
348 state
349 .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
350 .await?
351 } else {
352 // All new events are duplicated, they can all be ignored.
353 events.clear();
354 Default::default()
355 };
356
357 let next_diffs = state
358 .with_events_mut(false, |room_events| {
359 // Reverse the order of the events as `/messages` has been called with `dir=b`
360 // (backwards). The `RoomEvents` API expects the first event to be the oldest.
361 // Let's re-order them for this block.
362 let reversed_events = events.iter().rev().cloned().collect::<Vec<_>>();
363
364 let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
365
366 // First, insert events.
367 let insert_new_gap_pos = if let Some(gap_id) = prev_gap_id {
368 // There is a prior gap, let's replace it by new events!
369 if all_duplicates {
370 assert!(reversed_events.is_empty());
371 }
372
373 trace!("replacing previous gap with the back-paginated events");
374
375 // Replace the gap with the events we just deduplicated. This might get rid of
376 // the underlying gap, if the conditions are favorable to
377 // us.
378 room_events
379 .replace_gap_at(reversed_events.clone(), gap_id)
380 .expect("gap_identifier is a valid chunk id we read previously")
381 } else if let Some(pos) = first_event_pos {
382 // No prior gap, but we had some events: assume we need to prepend events
383 // before those.
384 trace!("inserted events before the first known event");
385
386 room_events
387 .insert_events_at(reversed_events.clone(), pos)
388 .expect("pos is a valid position we just read above");
389
390 Some(pos)
391 } else {
392 // No prior gap, and no prior events: push the events.
393 trace!("pushing events received from back-pagination");
394
395 room_events.push_events(reversed_events.clone());
396
397 // A new gap may be inserted before the new events, if there are any.
398 room_events.events().next().map(|(item_pos, _)| item_pos)
399 };
400
401 // And insert the new gap if needs be.
402 //
403 // We only do this when at least one new, non-duplicated event, has been added
404 // to the chunk. Otherwise it means we've back-paginated all the known events.
405 if !all_duplicates {
406 if let Some(new_gap) = new_gap {
407 if let Some(new_pos) = insert_new_gap_pos {
408 room_events
409 .insert_gap_at(new_gap, new_pos)
410 .expect("events_chunk_pos represents a valid chunk position");
411 } else {
412 room_events.push_gap(new_gap);
413 }
414 }
415 } else {
416 debug!(
417 "not storing previous batch token, because we \
418 deduplicated all new back-paginated events"
419 );
420 }
421
422 reversed_events
423 })
424 .await?;
425
426 event_diffs.extend(next_diffs);
427
428 // There could be an inconsistency between the network (which thinks we hit the
429 // start of the timeline) and the disk (which has the initial empty
430 // chunks), so tweak the `reached_start` value so that it reflects the disk
431 // state in priority instead.
432 let reached_start = {
433 // There are no gaps.
434 let has_gaps = state.events().chunks().any(|chunk| chunk.is_gap());
435
436 // The first chunk has no predecessors.
437 let first_chunk_is_definitive_head =
438 state.events().chunks().next().map(|chunk| chunk.is_definitive_head());
439
440 let reached_start =
441 !has_gaps && first_chunk_is_definitive_head.unwrap_or(network_reached_start);
442
443 trace!(
444 ?network_reached_start,
445 ?has_gaps,
446 ?first_chunk_is_definitive_head,
447 ?reached_start,
448 "finished handling network back-pagination"
449 );
450
451 reached_start
452 };
453
454 let backpagination_outcome = BackPaginationOutcome { events, reached_start };
455
456 if !event_diffs.is_empty() {
457 let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
458 diffs: event_diffs,
459 origin: EventsOrigin::Pagination,
460 });
461 }
462
463 Ok(backpagination_outcome)
464 }
465
466 /// Returns a subscriber to the pagination status used for the
467 /// back-pagination integrated to the event cache.
468 pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
469 self.inner.pagination_status.subscribe()
470 }
471}
472
/// Pagination token data, indicating in which state is the current pagination.
#[derive(Clone, Debug, PartialEq)]
pub enum PaginationToken {
    /// We never had a pagination token, so we'll start back-paginating from the
    /// end, or forward-paginating from the start.
    None,
    /// We paginated once before, and we received a prev/next batch token that
    /// we may reuse for the next query.
    HasMore(String),
    /// We've hit one end of the timeline (either the start or the actual end),
    /// so there's no need to continue paginating.
    HitEnd,
}
486
487impl From<Option<String>> for PaginationToken {
488 fn from(token: Option<String>) -> Self {
489 match token {
490 Some(val) => Self::HasMore(val),
491 None => Self::None,
492 }
493 }
494}