matrix_sdk/event_cache/
pagination.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A sub-object for running pagination tasks on a given room.
16
17use std::{sync::Arc, time::Duration};
18
19use eyeball::{SharedObservable, Subscriber};
20use matrix_sdk_base::{
21    deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier, timeout::timeout,
22};
23use matrix_sdk_common::linked_chunk::ChunkContent;
24use ruma::api::Direction;
25use tokio::sync::RwLockWriteGuard;
26use tracing::{debug, instrument, trace};
27
28use super::{
29    deduplicator::DeduplicationOutcome,
30    room::{events::Gap, LoadMoreEventsBackwardsOutcome, RoomEventCacheInner},
31    BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheState, RoomEventCacheUpdate,
32};
33use crate::{event_cache::EventCacheError, room::MessagesOptions};
34
35/// Status for the back-pagination on a room event cache.
/// Status for the back-pagination on a room event cache.
///
/// Observable through [`RoomPagination::status`]; transitions to
/// [`RoomPaginationStatus::Paginating`] while a back-pagination is in flight,
/// and back to [`RoomPaginationStatus::Idle`] when it completes.
#[derive(Debug, PartialEq, Clone, Copy)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum RoomPaginationStatus {
    /// No back-pagination is happening right now.
    Idle {
        /// Have we hit the start of the timeline, i.e. back-paginating wouldn't
        /// have any effect?
        hit_timeline_start: bool,
    },

    /// Back-pagination is already running in the background.
    Paginating,
}
49
/// Small RAII guard to reset the pagination status on drop, if not disarmed in
/// the meanwhile.
struct ResetStatusOnDrop {
    /// The status to restore when this guard is dropped; `None` once the guard
    /// has been disarmed, in which case dropping does nothing.
    prev_status: Option<RoomPaginationStatus>,
    /// The observable the saved status is written back into on drop.
    pagination_status: SharedObservable<RoomPaginationStatus>,
}
56
57impl ResetStatusOnDrop {
58    /// Make the RAII guard have no effect.
59    fn disarm(mut self) {
60        self.prev_status = None;
61    }
62}
63
64impl Drop for ResetStatusOnDrop {
65    fn drop(&mut self) {
66        if let Some(status) = self.prev_status.take() {
67            let _ = self.pagination_status.set(status);
68        }
69    }
70}
71
/// An API object to run pagination queries on a [`super::RoomEventCache`].
///
/// Can be created with [`super::RoomEventCache::pagination()`].
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct RoomPagination {
    // Shared room event cache internals; cloning this handle is cheap (an
    // `Arc` bump).
    pub(super) inner: Arc<RoomEventCacheInner>,
}
80
81impl RoomPagination {
82    /// Starts a back-pagination for the requested number of events.
83    ///
84    /// This automatically takes care of waiting for a pagination token from
85    /// sync, if we haven't done that before.
86    ///
87    /// It will run multiple back-paginations until one of these two conditions
88    /// is met:
89    /// - either we've reached the start of the timeline,
90    /// - or we've obtained enough events to fulfill the requested number of
91    ///   events.
92    #[instrument(skip(self))]
93    pub async fn run_backwards_until(
94        &self,
95        num_requested_events: u16,
96    ) -> Result<BackPaginationOutcome> {
97        let mut events = Vec::new();
98
99        loop {
100            if let Some(outcome) = self.run_backwards_impl(num_requested_events).await? {
101                events.extend(outcome.events);
102                if outcome.reached_start || events.len() >= num_requested_events as usize {
103                    return Ok(BackPaginationOutcome {
104                        reached_start: outcome.reached_start,
105                        events,
106                    });
107                }
108                trace!(
109                    "restarting back-pagination, because we haven't reached \
110                     the start or obtained enough events yet"
111                );
112            }
113
114            debug!("restarting back-pagination because of a timeline reset.");
115        }
116    }
117
118    /// Run a single back-pagination for the requested number of events.
119    ///
120    /// This automatically takes care of waiting for a pagination token from
121    /// sync, if we haven't done that before.
122    #[instrument(skip(self))]
123    pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
124        loop {
125            if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
126                return Ok(outcome);
127            }
128            debug!("restarting back-pagination because of a timeline reset.");
129        }
130    }
131
132    /// Paginate from either the storage or the network, and let pagination
133    /// status observers know about updates.
134    async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
135        // There is at least one gap that must be resolved; reach the network.
136        // First, ensure there's no other ongoing back-pagination.
137        let status_observable = &self.inner.pagination_status;
138
139        let prev_status = status_observable.set(RoomPaginationStatus::Paginating);
140        if !matches!(prev_status, RoomPaginationStatus::Idle { .. }) {
141            return Err(EventCacheError::AlreadyBackpaginating);
142        }
143
144        let reset_status_on_drop_guard = ResetStatusOnDrop {
145            prev_status: Some(prev_status),
146            pagination_status: status_observable.clone(),
147        };
148
149        match self.paginate_backwards_impl(batch_size).await? {
150            Some(outcome) => {
151                // Back-pagination's over and successful, don't reset the status to the previous
152                // value.
153                reset_status_on_drop_guard.disarm();
154
155                // Notify subscribers that pagination ended.
156                status_observable
157                    .set(RoomPaginationStatus::Idle { hit_timeline_start: outcome.reached_start });
158
159                Ok(Some(outcome))
160            }
161
162            None => {
163                // We keep the previous status value, because we haven't obtained more
164                // information about the pagination.
165                Ok(None)
166            }
167        }
168    }
169
170    /// Paginate from either the storage or the network.
171    ///
172    /// This method isn't concerned with setting the pagination status; only the
173    /// caller is.
174    async fn paginate_backwards_impl(
175        &self,
176        batch_size: u16,
177    ) -> Result<Option<BackPaginationOutcome>> {
178        // A linked chunk might not be entirely loaded (if it's been lazy-loaded). Try
179        // to load from storage first, then from network if storage indicated
180        // there's no previous events chunk to load.
181
182        loop {
183            let mut state_guard = self.inner.state.write().await;
184
185            match state_guard.load_more_events_backwards().await? {
186                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
187                    const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
188
189                    // Release the state guard while waiting, to not deadlock the sync task.
190                    drop(state_guard);
191
192                    // Otherwise, wait for a notification that we received a previous-batch token.
193                    trace!("waiting for a pagination token…");
194                    let _ = timeout(
195                        self.inner.pagination_batch_token_notifier.notified(),
196                        DEFAULT_WAIT_FOR_TOKEN_DURATION,
197                    )
198                    .await;
199                    trace!("done waiting");
200
201                    self.inner.state.write().await.waited_for_initial_prev_token = true;
202
203                    // Retry!
204                    //
205                    // Note: the next call to `load_more_events_backwards` can't return
206                    // `WaitForInitialPrevToken` because we've just set to
207                    // `waited_for_initial_prev_token`, so this is not an infinite loop.
208                    //
209                    // Note 2: not a recursive call, because recursive and async have a bad time
210                    // together.
211                    continue;
212                }
213
214                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
215                    // We have a gap, so resolve it with a network back-pagination.
216                    drop(state_guard);
217                    return self.paginate_backwards_with_network(batch_size, prev_token).await;
218                }
219
220                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
221                    return Ok(Some(BackPaginationOutcome { reached_start: true, events: vec![] }));
222                }
223
224                LoadMoreEventsBackwardsOutcome::Events {
225                    events,
226                    timeline_event_diffs,
227                    reached_start,
228                } => {
229                    if !timeline_event_diffs.is_empty() {
230                        let _ =
231                            self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
232                                diffs: timeline_event_diffs,
233                                origin: EventsOrigin::Cache,
234                            });
235                    }
236
237                    return Ok(Some(BackPaginationOutcome {
238                        reached_start,
239                        // This is a backwards pagination. `BackPaginationOutcome` expects events to
240                        // be in “reverse order”.
241                        events: events.into_iter().rev().collect(),
242                    }));
243                }
244            }
245        }
246    }
247
248    /// Run a single pagination request (/messages) to the server.
249    ///
250    /// If there are no previous-batch tokens, it will wait for one for a short
251    /// while to get one, or if it's already done so or if it's seen a
252    /// previous-batch token before, it will immediately indicate it's
253    /// reached the end of the timeline.
254    async fn paginate_backwards_with_network(
255        &self,
256        batch_size: u16,
257        prev_token: Option<String>,
258    ) -> Result<Option<BackPaginationOutcome>> {
259        let (events, new_gap) = {
260            let Some(room) = self.inner.weak_room.get() else {
261                // The client is shutting down, return an empty default response.
262                return Ok(Some(BackPaginationOutcome {
263                    reached_start: false,
264                    events: Default::default(),
265                }));
266            };
267
268            let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref());
269            options.limit = batch_size.into();
270
271            let response = room.messages(options).await.map_err(|err| {
272                EventCacheError::BackpaginationError(
273                    crate::event_cache::paginator::PaginatorError::SdkError(Box::new(err)),
274                )
275            })?;
276
277            let new_gap = response.end.map(|prev_token| Gap { prev_token });
278
279            (response.chunk, new_gap)
280        };
281
282        // Make sure the `RoomEvents` isn't updated while we are saving events from
283        // backpagination.
284        let state = self.inner.state.write().await;
285
286        // Check that the previous token still exists; otherwise it's a sign that the
287        // room's timeline has been cleared.
288        let prev_gap_chunk_id = if let Some(token) = prev_token {
289            let gap_chunk_id = state.events().chunk_identifier(|chunk| {
290                matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
291            });
292
293            if gap_chunk_id.is_none() {
294                // We got a previous-batch token from the linked chunk *before* running the
295                // request, but it is missing *after* completing the
296                // request.
297                //
298                // It may be a sign the linked chunk has been reset, but it's fine, per this
299                // function's contract.
300                return Ok(None);
301            }
302
303            gap_chunk_id
304        } else {
305            None
306        };
307
308        self.handle_network_pagination_result(state, events, new_gap, prev_gap_chunk_id)
309            .await
310            .map(Some)
311    }
312
313    /// Handle the result of a successful network back-pagination.
314    async fn handle_network_pagination_result(
315        &self,
316        mut state: RwLockWriteGuard<'_, RoomEventCacheState>,
317        events: Vec<TimelineEvent>,
318        new_gap: Option<Gap>,
319        prev_gap_id: Option<ChunkIdentifier>,
320    ) -> Result<BackPaginationOutcome> {
321        // If there's no new previous gap, then we've reached the start of the timeline.
322        let network_reached_start = new_gap.is_none();
323
324        let (
325            DeduplicationOutcome {
326                all_events: mut events,
327                in_memory_duplicated_event_ids,
328                in_store_duplicated_event_ids,
329            },
330            all_duplicates,
331        ) = state.collect_valid_and_duplicated_events(events).await?;
332
333        // If not all the events have been back-paginated, we need to remove the
334        // previous ones, otherwise we can end up with misordered events.
335        //
336        // Consider the following scenario:
337        // - sync returns [D, E, F]
338        // - then sync returns [] with a previous batch token PB1, so the internal
339        //   linked chunk state is [D, E, F, PB1].
340        // - back-paginating with PB1 may return [A, B, C, D, E, F].
341        //
342        // Only inserting the new events when replacing PB1 would result in a timeline
343        // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove
344        // all the events, in case this happens (see also #4746).
345
346        let mut event_diffs = if !all_duplicates {
347            // Let's forget all the previous events.
348            state
349                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
350                .await?
351        } else {
352            // All new events are duplicated, they can all be ignored.
353            events.clear();
354            Default::default()
355        };
356
357        let next_diffs = state
358            .with_events_mut(false, |room_events| {
359                // Reverse the order of the events as `/messages` has been called with `dir=b`
360                // (backwards). The `RoomEvents` API expects the first event to be the oldest.
361                // Let's re-order them for this block.
362                let reversed_events = events.iter().rev().cloned().collect::<Vec<_>>();
363
364                let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
365
366                // First, insert events.
367                let insert_new_gap_pos = if let Some(gap_id) = prev_gap_id {
368                    // There is a prior gap, let's replace it by new events!
369                    if all_duplicates {
370                        assert!(reversed_events.is_empty());
371                    }
372
373                    trace!("replacing previous gap with the back-paginated events");
374
375                    // Replace the gap with the events we just deduplicated. This might get rid of
376                    // the underlying gap, if the conditions are favorable to
377                    // us.
378                    room_events
379                        .replace_gap_at(reversed_events.clone(), gap_id)
380                        .expect("gap_identifier is a valid chunk id we read previously")
381                } else if let Some(pos) = first_event_pos {
382                    // No prior gap, but we had some events: assume we need to prepend events
383                    // before those.
384                    trace!("inserted events before the first known event");
385
386                    room_events
387                        .insert_events_at(reversed_events.clone(), pos)
388                        .expect("pos is a valid position we just read above");
389
390                    Some(pos)
391                } else {
392                    // No prior gap, and no prior events: push the events.
393                    trace!("pushing events received from back-pagination");
394
395                    room_events.push_events(reversed_events.clone());
396
397                    // A new gap may be inserted before the new events, if there are any.
398                    room_events.events().next().map(|(item_pos, _)| item_pos)
399                };
400
401                // And insert the new gap if needs be.
402                //
403                // We only do this when at least one new, non-duplicated event, has been added
404                // to the chunk. Otherwise it means we've back-paginated all the known events.
405                if !all_duplicates {
406                    if let Some(new_gap) = new_gap {
407                        if let Some(new_pos) = insert_new_gap_pos {
408                            room_events
409                                .insert_gap_at(new_gap, new_pos)
410                                .expect("events_chunk_pos represents a valid chunk position");
411                        } else {
412                            room_events.push_gap(new_gap);
413                        }
414                    }
415                } else {
416                    debug!(
417                        "not storing previous batch token, because we \
418                         deduplicated all new back-paginated events"
419                    );
420                }
421
422                reversed_events
423            })
424            .await?;
425
426        event_diffs.extend(next_diffs);
427
428        // There could be an inconsistency between the network (which thinks we hit the
429        // start of the timeline) and the disk (which has the initial empty
430        // chunks), so tweak the `reached_start` value so that it reflects the disk
431        // state in priority instead.
432        let reached_start = {
433            // There are no gaps.
434            let has_gaps = state.events().chunks().any(|chunk| chunk.is_gap());
435
436            // The first chunk has no predecessors.
437            let first_chunk_is_definitive_head =
438                state.events().chunks().next().map(|chunk| chunk.is_definitive_head());
439
440            let reached_start =
441                !has_gaps && first_chunk_is_definitive_head.unwrap_or(network_reached_start);
442
443            trace!(
444                ?network_reached_start,
445                ?has_gaps,
446                ?first_chunk_is_definitive_head,
447                ?reached_start,
448                "finished handling network back-pagination"
449            );
450
451            reached_start
452        };
453
454        let backpagination_outcome = BackPaginationOutcome { events, reached_start };
455
456        if !event_diffs.is_empty() {
457            let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
458                diffs: event_diffs,
459                origin: EventsOrigin::Pagination,
460            });
461        }
462
463        Ok(backpagination_outcome)
464    }
465
466    /// Returns a subscriber to the pagination status used for the
467    /// back-pagination integrated to the event cache.
468    pub fn status(&self) -> Subscriber<RoomPaginationStatus> {
469        self.inner.pagination_status.subscribe()
470    }
471}
472
/// Pagination token data, indicating in which state is the current pagination.
///
/// Can be built from an optional server-provided batch token via the
/// `From<Option<String>>` impl below (which never yields [`Self::HitEnd`]).
#[derive(Clone, Debug, PartialEq)]
pub enum PaginationToken {
    /// We never had a pagination token, so we'll start back-paginating from the
    /// end, or forward-paginating from the start.
    None,
    /// We paginated once before, and we received a prev/next batch token that
    /// we may reuse for the next query.
    HasMore(String),
    /// We've hit one end of the timeline (either the start or the actual end),
    /// so there's no need to continue paginating.
    HitEnd,
}
486
487impl From<Option<String>> for PaginationToken {
488    fn from(token: Option<String>) -> Self {
489        match token {
490            Some(val) => Self::HasMore(val),
491            None => Self::None,
492        }
493    }
494}