matrix_sdk/event_cache/
pagination.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A sub-object for running pagination tasks on a given room.
16
17use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration};
18
19use eyeball::Subscriber;
20use matrix_sdk_base::timeout::timeout;
21use matrix_sdk_common::linked_chunk::ChunkContent;
22use tracing::{debug, instrument, trace};
23
24use super::{
25    paginator::{PaginationResult, PaginatorState},
26    room::{
27        events::{Gap, RoomEvents},
28        RoomEventCacheInner,
29    },
30    BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
31};
32
33/// An API object to run pagination queries on a [`super::RoomEventCache`].
34///
35/// Can be created with [`super::RoomEventCache::pagination()`].
36#[allow(missing_debug_implementations)]
37#[derive(Clone)]
38pub struct RoomPagination {
39    pub(super) inner: Arc<RoomEventCacheInner>,
40}
41
42impl RoomPagination {
43    /// Starts a back-pagination for the requested number of events.
44    ///
45    /// This automatically takes care of waiting for a pagination token from
46    /// sync, if we haven't done that before.
47    ///
48    /// The `until` argument is an async closure that returns a [`ControlFlow`]
49    /// to decide whether a new pagination must be run or not. It's helpful when
50    /// the server replies with e.g. a certain set of events, but we would like
51    /// more, or the event we are looking for isn't part of this set: in this
52    /// case, `until` returns [`ControlFlow::Continue`], otherwise it returns
53    /// [`ControlFlow::Break`]. `until` receives [`BackPaginationOutcome`] as
54    /// its sole argument.
55    ///
56    /// # Errors
57    ///
58    /// It may return an error if the pagination token used during
59    /// back-pagination has disappeared while we started the pagination. In
60    /// that case, it's desirable to call the method again.
61    ///
62    /// # Example
63    ///
64    /// To do a single run:
65    ///
66    /// ```rust
67    /// use std::ops::ControlFlow;
68    ///
69    /// use matrix_sdk::event_cache::{
70    ///     BackPaginationOutcome,
71    ///     RoomPagination,
72    ///     TimelineHasBeenResetWhilePaginating
73    /// };
74    ///
75    /// # async fn foo(room_pagination: RoomPagination) {
76    /// let result = room_pagination.run_backwards(
77    ///     42,
78    ///     |BackPaginationOutcome { events, reached_start },
79    ///      _timeline_has_been_reset: TimelineHasBeenResetWhilePaginating| async move {
80    ///         // Do something with `events` and `reached_start` maybe?
81    ///         let _ = events;
82    ///         let _ = reached_start;
83    ///
84    ///         ControlFlow::Break(())
85    ///     }
86    /// ).await;
87    /// # }
88    #[instrument(skip(self, until))]
89    pub async fn run_backwards<Until, Break, UntilFuture>(
90        &self,
91        batch_size: u16,
92        mut until: Until,
93    ) -> Result<Break>
94    where
95        Until: FnMut(BackPaginationOutcome, TimelineHasBeenResetWhilePaginating) -> UntilFuture,
96        UntilFuture: Future<Output = ControlFlow<Break, ()>>,
97    {
98        let mut timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No;
99
100        loop {
101            if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
102                match until(outcome, timeline_has_been_reset).await {
103                    ControlFlow::Continue(()) => {
104                        trace!("back-pagination continues");
105
106                        timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No;
107
108                        continue;
109                    }
110
111                    ControlFlow::Break(value) => return Ok(value),
112                }
113            }
114
115            timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::Yes;
116
117            debug!("back-pagination has been internally restarted because of a timeline reset.");
118        }
119    }
120
121    async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
122        const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
123
124        let prev_token = self.get_or_wait_for_token(Some(DEFAULT_WAIT_FOR_TOKEN_DURATION)).await;
125
126        let prev_token = match prev_token {
127            PaginationToken::HasMore(token) => Some(token),
128            PaginationToken::None => None,
129            PaginationToken::HitEnd => {
130                debug!("Not back-paginating since we've reached the start of the timeline.");
131                return Ok(Some(BackPaginationOutcome { reached_start: true, events: Vec::new() }));
132            }
133        };
134
135        let paginator = &self.inner.paginator;
136
137        paginator.set_idle_state(PaginatorState::Idle, prev_token.clone(), None)?;
138
139        // Run the actual pagination.
140        let PaginationResult { events, hit_end_of_timeline: reached_start } =
141            paginator.paginate_backward(batch_size.into()).await?;
142
143        // Make sure the `RoomEvents` isn't updated while we are saving events from
144        // backpagination.
145        let mut state = self.inner.state.write().await;
146
147        // Check that the previous token still exists; otherwise it's a sign that the
148        // room's timeline has been cleared.
149        let prev_gap_id = if let Some(token) = prev_token {
150            let gap_id = state.events().chunk_identifier(|chunk| {
151                matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
152            });
153
154            // We got a previous-batch token from the linked chunk *before* running the
155            // request, which is missing from the linked chunk *after*
156            // completing the request. It may be a sign the linked chunk has
157            // been reset, and it's an error in any case.
158            if gap_id.is_none() {
159                return Ok(None);
160            }
161
162            gap_id
163        } else {
164            None
165        };
166
167        // The new prev token from this pagination.
168        let new_gap = paginator.prev_batch_token().map(|prev_token| Gap { prev_token });
169
170        let (backpagination_outcome, sync_timeline_events_diffs) = state
171            .with_events_mut(move |room_events| {
172                // Note: The chunk could be empty.
173                //
174                // If there's any event, they are presented in reverse order (i.e. the first one
175                // should be prepended first).
176
177                let sync_events = events
178                    .iter()
179                    // Reverse the order of the events as `/messages` has been called with `dir=b`
180                    // (backward). The `RoomEvents` API expects the first event to be the oldest.
181                    .rev()
182                    .cloned()
183                    .collect::<Vec<_>>();
184
185                let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
186
187                // First, insert events.
188                let (added_unique_events, insert_new_gap_pos) = if let Some(gap_id) = prev_gap_id {
189                    // There is a prior gap, let's replace it by new events!
190                    trace!("replaced gap with new events from backpagination");
191                    room_events
192                        .replace_gap_at(sync_events.clone(), gap_id)
193                        .expect("gap_identifier is a valid chunk id we read previously")
194                } else if let Some(pos) = first_event_pos {
195                    // No prior gap, but we had some events: assume we need to prepend events
196                    // before those.
197                    trace!("inserted events before the first known event");
198                    let report = room_events
199                        .insert_events_at(sync_events.clone(), pos)
200                        .expect("pos is a valid position we just read above");
201                    (report, Some(pos))
202                } else {
203                    // No prior gap, and no prior events: push the events.
204                    trace!("pushing events received from back-pagination");
205                    let report = room_events.push_events(sync_events.clone());
206                    // A new gap may be inserted before the new events, if there are any.
207                    let next_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
208                    (report, next_pos)
209                };
210
211                // And insert the new gap if needs be.
212                //
213                // We only do this when at least one new, non-duplicated event, has been added
214                // to the chunk. Otherwise it means we've back-paginated all the
215                // known events.
216                if added_unique_events {
217                    if let Some(new_gap) = new_gap {
218                        if let Some(new_pos) = insert_new_gap_pos {
219                            room_events
220                                .insert_gap_at(new_gap, new_pos)
221                                .expect("events_chunk_pos represents a valid chunk position");
222                        } else {
223                            room_events.push_gap(new_gap);
224                        }
225                    }
226                } else {
227                    debug!("not storing previous batch token, because we deduplicated all new back-paginated events");
228                }
229
230                room_events.on_new_events(&self.inner.room_version, sync_events.iter());
231
232                BackPaginationOutcome { events, reached_start }
233            })
234            .await?;
235
236        if !sync_timeline_events_diffs.is_empty() {
237            let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
238                diffs: sync_timeline_events_diffs,
239                origin: EventsOrigin::Pagination,
240            });
241        }
242
243        Ok(Some(backpagination_outcome))
244    }
245
246    /// Get the latest pagination token, as stored in the room events linked
247    /// list, or wait for it for the given amount of time.
248    ///
249    /// It will only wait if we *never* saw an initial previous-batch token.
250    /// Otherwise, it will immediately skip.
251    #[doc(hidden)]
252    pub async fn get_or_wait_for_token(&self, wait_time: Option<Duration>) -> PaginationToken {
253        fn get_latest(events: &RoomEvents) -> Option<String> {
254            events.rchunks().find_map(|chunk| match chunk.content() {
255                ChunkContent::Gap(gap) => Some(gap.prev_token.clone()),
256                ChunkContent::Items(..) => None,
257            })
258        }
259
260        {
261            // Scope for the lock guard.
262            let state = self.inner.state.read().await;
263
264            // Check if the linked chunk contains any events. If so, absence of a gap means
265            // we've hit the start of the timeline. If not, absence of a gap
266            // means we've never received a pagination token from sync, and we
267            // should wait for one.
268            let has_events = state.events().events().next().is_some();
269
270            // Fast-path: we do have a previous-batch token already.
271            if let Some(found) = get_latest(state.events()) {
272                return PaginationToken::HasMore(found);
273            }
274
275            // If we had events, and there was no gap, then we've hit the end of the
276            // timeline.
277            if has_events {
278                return PaginationToken::HitEnd;
279            }
280
281            // If we've already waited for an initial previous-batch token before,
282            // immediately abort.
283            if state.waited_for_initial_prev_token {
284                return PaginationToken::None;
285            }
286        }
287
288        // If the caller didn't set a wait time, return none early.
289        let Some(wait_time) = wait_time else {
290            return PaginationToken::None;
291        };
292
293        // Otherwise, wait for a notification that we received a previous-batch token.
294        // Note the state lock is released while doing so, allowing other tasks to write
295        // into the linked chunk.
296        let _ = timeout(self.inner.pagination_batch_token_notifier.notified(), wait_time).await;
297
298        let mut state = self.inner.state.write().await;
299
300        state.waited_for_initial_prev_token = true;
301
302        if let Some(token) = get_latest(state.events()) {
303            PaginationToken::HasMore(token)
304        } else if state.events().events().next().is_some() {
305            // See logic above, in the read lock guard scope.
306            PaginationToken::HitEnd
307        } else {
308            PaginationToken::None
309        }
310    }
311
312    /// Returns a subscriber to the pagination status used for the
313    /// back-pagination integrated to the event cache.
314    pub fn status(&self) -> Subscriber<PaginatorState> {
315        self.inner.paginator.state()
316    }
317
318    /// Returns whether we've hit the start of the timeline.
319    ///
320    /// This is true if, and only if, we didn't have a previous-batch token and
321    /// running backwards pagination would be useless.
322    pub fn hit_timeline_start(&self) -> bool {
323        self.inner.paginator.hit_timeline_start()
324    }
325
326    /// Returns whether we've hit the end of the timeline.
327    ///
328    /// This is true if, and only if, we didn't have a next-batch token and
329    /// running forwards pagination would be useless.
330    pub fn hit_timeline_end(&self) -> bool {
331        self.inner.paginator.hit_timeline_end()
332    }
333}
334
/// Pagination token data, indicating which state the current pagination is
/// in.
///
/// Equality is total for this type (it only holds a [`String`]), so it
/// implements [`Eq`] on top of [`PartialEq`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PaginationToken {
    /// We never had a pagination token, so we'll start back-paginating from the
    /// end, or forward-paginating from the start.
    None,
    /// We paginated once before, and we received a prev/next batch token that
    /// we may reuse for the next query.
    HasMore(String),
    /// We've hit one end of the timeline (either the start or the actual end),
    /// so there's no need to continue paginating.
    HitEnd,
}
348
349impl From<Option<String>> for PaginationToken {
350    fn from(token: Option<String>) -> Self {
351        match token {
352            Some(val) => Self::HasMore(val),
353            None => Self::None,
354        }
355    }
356}
357
/// A type representing whether the timeline has been reset.
///
/// This is a plain flag without any payload, so it can be freely copied and
/// compared.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TimelineHasBeenResetWhilePaginating {
    /// The timeline has been reset.
    Yes,

    /// The timeline has not been reset.
    No,
}
367
368#[cfg(test)]
369mod tests {
370    // Those tests require time to work, and it does not on wasm32.
371    #[cfg(not(target_arch = "wasm32"))]
372    mod time_tests {
373        use std::time::{Duration, Instant};
374
375        use assert_matches::assert_matches;
376        use matrix_sdk_base::RoomState;
377        use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE};
378        use ruma::{event_id, room_id, user_id};
379        use tokio::{spawn, time::sleep};
380
381        use crate::{
382            event_cache::{pagination::PaginationToken, room::events::Gap},
383            test_utils::logged_in_client,
384        };
385
386        #[async_test]
387        async fn test_wait_no_pagination_token() {
388            let client = logged_in_client(None).await;
389            let room_id = room_id!("!galette:saucisse.bzh");
390            client.base_client().get_or_create_room(room_id, RoomState::Joined);
391
392            let event_cache = client.event_cache();
393
394            event_cache.subscribe().unwrap();
395
396            let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
397
398            let pagination = room_event_cache.pagination();
399
400            // If I have a room with no events, and try to get a pagination token without
401            // waiting,
402            let found = pagination.get_or_wait_for_token(None).await;
403            // Then I don't get any pagination token.
404            assert_matches!(found, PaginationToken::None);
405
406            // Reset waited_for_initial_prev_token and event state.
407            pagination.inner.state.write().await.reset().await.unwrap();
408
409            // If I wait for a back-pagination token for 0 seconds,
410            let before = Instant::now();
411            let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
412            let waited = before.elapsed();
413            // then I don't get any,
414            assert_matches!(found, PaginationToken::None);
415            // and I haven't waited long.
416            assert!(waited.as_secs() < 1);
417
418            // Reset waited_for_initial_prev_token state.
419            pagination.inner.state.write().await.reset().await.unwrap();
420
421            // If I wait for a back-pagination token for 1 second,
422            let before = Instant::now();
423            let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
424            let waited = before.elapsed();
425            // then I still don't get any.
426            assert_matches!(found, PaginationToken::None);
427            // and I've waited a bit.
428            assert!(waited.as_secs() < 2);
429            assert!(waited.as_secs() >= 1);
430        }
431
432        #[async_test]
433        async fn test_wait_hit_end_of_timeline() {
434            let client = logged_in_client(None).await;
435            let room_id = room_id!("!galette:saucisse.bzh");
436            client.base_client().get_or_create_room(room_id, RoomState::Joined);
437
438            let event_cache = client.event_cache();
439
440            event_cache.subscribe().unwrap();
441
442            let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
443
444            let f = EventFactory::new().room(room_id).sender(*ALICE);
445            let pagination = room_event_cache.pagination();
446
447            // Add a previous event.
448            room_event_cache
449                .inner
450                .state
451                .write()
452                .await
453                .with_events_mut(|events| {
454                    events.push_events([f
455                        .text_msg("this is the start of the timeline")
456                        .into_event()]);
457                })
458                .await
459                .unwrap();
460
461            // If I have a room with events, and try to get a pagination token without
462            // waiting,
463            let found = pagination.get_or_wait_for_token(None).await;
464            // I've reached the start of the timeline.
465            assert_matches!(found, PaginationToken::HitEnd);
466
467            // If I wait for a back-pagination token for 0 seconds,
468            let before = Instant::now();
469            let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
470            let waited = before.elapsed();
471            // Then I still have reached the start of the timeline.
472            assert_matches!(found, PaginationToken::HitEnd);
473            // and I've waited very little.
474            assert!(waited.as_secs() < 1);
475
476            // If I wait for a back-pagination token for 1 second,
477            let before = Instant::now();
478            let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
479            let waited = before.elapsed();
480            // then I still don't get any.
481            assert_matches!(found, PaginationToken::HitEnd);
482            // and I've waited very little (there's no point in waiting in this case).
483            assert!(waited.as_secs() < 1);
484
485            // Now, reset state. We'll add an event *after* we've started waiting, this
486            // time.
487            room_event_cache.clear().await.unwrap();
488
489            spawn(async move {
490                sleep(Duration::from_secs(1)).await;
491
492                room_event_cache
493                    .inner
494                    .state
495                    .write()
496                    .await
497                    .with_events_mut(|events| {
498                        events.push_events([f
499                            .text_msg("this is the start of the timeline")
500                            .into_event()]);
501                    })
502                    .await
503                    .unwrap();
504            });
505
506            // If I wait for a pagination token,
507            let before = Instant::now();
508            let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(2))).await;
509            let waited = before.elapsed();
510            // since sync has returned all events, and no prior gap, I've hit the end.
511            assert_matches!(found, PaginationToken::HitEnd);
512            // and I've waited for the whole duration.
513            assert!(waited.as_secs() >= 2);
514            assert!(waited.as_secs() < 3);
515        }
516
517        #[async_test]
518        async fn test_wait_for_pagination_token_already_present() {
519            let client = logged_in_client(None).await;
520            let room_id = room_id!("!galette:saucisse.bzh");
521            client.base_client().get_or_create_room(room_id, RoomState::Joined);
522
523            let event_cache = client.event_cache();
524
525            event_cache.subscribe().unwrap();
526
527            let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
528
529            let expected_token = "old".to_owned();
530
531            // When I have events and multiple gaps, in a room,
532            {
533                room_event_cache
534                    .inner
535                    .state
536                    .write()
537                    .await
538                    .with_events_mut(|room_events| {
539                        room_events.push_gap(Gap { prev_token: expected_token.clone() });
540                        room_events.push_events([EventFactory::new()
541                            .text_msg("yolo")
542                            .sender(user_id!("@b:z.h"))
543                            .event_id(event_id!("$ida"))
544                            .into_event()]);
545                    })
546                    .await
547                    .unwrap();
548            }
549
550            let pagination = room_event_cache.pagination();
551
552            // If I don't wait for a back-pagination token,
553            let found = pagination.get_or_wait_for_token(None).await;
554            // Then I get it.
555            assert_eq!(found, PaginationToken::HasMore(expected_token.clone()));
556
557            // If I wait for a back-pagination token for 0 seconds,
558            let before = Instant::now();
559            let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
560            let waited = before.elapsed();
561            // then I do get one.
562            assert_eq!(found, PaginationToken::HasMore(expected_token.clone()));
563            // and I haven't waited long.
564            assert!(waited.as_millis() < 100);
565
566            // If I wait for a back-pagination token for 1 second,
567            let before = Instant::now();
568            let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
569            let waited = before.elapsed();
570            // then I do get one.
571            assert_eq!(found, PaginationToken::HasMore(expected_token));
572            // and I haven't waited long.
573            assert!(waited.as_millis() < 100);
574        }
575
576        #[async_test]
577        async fn test_wait_for_late_pagination_token() {
578            let client = logged_in_client(None).await;
579            let room_id = room_id!("!galette:saucisse.bzh");
580            client.base_client().get_or_create_room(room_id, RoomState::Joined);
581
582            let event_cache = client.event_cache();
583
584            event_cache.subscribe().unwrap();
585
586            let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
587
588            let expected_token = "old".to_owned();
589
590            let before = Instant::now();
591            let cloned_expected_token = expected_token.clone();
592            let cloned_room_event_cache = room_event_cache.clone();
593            let insert_token_task = spawn(async move {
594                // If a backpagination token is inserted after 400 milliseconds,
595                sleep(Duration::from_millis(400)).await;
596
597                cloned_room_event_cache
598                    .inner
599                    .state
600                    .write()
601                    .await
602                    .with_events_mut(|events| {
603                        events.push_gap(Gap { prev_token: cloned_expected_token })
604                    })
605                    .await
606                    .unwrap();
607            });
608
609            let pagination = room_event_cache.pagination();
610
611            // Then first I don't get it (if I'm not waiting,)
612            let found = pagination.get_or_wait_for_token(None).await;
613            assert_matches!(found, PaginationToken::None);
614
615            // And if I wait for the back-pagination token for 600ms,
616            let found = pagination.get_or_wait_for_token(Some(Duration::from_millis(600))).await;
617            let waited = before.elapsed();
618
619            // then I do get one eventually.
620            assert_eq!(found, PaginationToken::HasMore(expected_token));
621            // and I have waited between ~400 and ~1000 milliseconds.
622            assert!(waited.as_secs() < 1);
623            assert!(waited.as_millis() >= 400);
624
625            // The task succeeded.
626            insert_token_task.await.unwrap();
627        }
628
629        #[async_test]
630        async fn test_get_latest_token() {
631            let client = logged_in_client(None).await;
632            let room_id = room_id!("!galette:saucisse.bzh");
633            client.base_client().get_or_create_room(room_id, RoomState::Joined);
634
635            let event_cache = client.event_cache();
636
637            event_cache.subscribe().unwrap();
638
639            let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
640
641            let old_token = "old".to_owned();
642            let new_token = "new".to_owned();
643
644            // Assuming a room event cache that contains both an old and a new pagination
645            // token, and events in between,
646            room_event_cache
647                .inner
648                .state
649                .write()
650                .await
651                .with_events_mut(|events| {
652                    let f = EventFactory::new().room(room_id).sender(*ALICE);
653
654                    // This simulates a valid representation of a room: first group of gap+events
655                    // were e.g. restored from the cache; second group of gap+events was received
656                    // from a subsequent sync.
657                    events.push_gap(Gap { prev_token: old_token });
658                    events.push_events([f.text_msg("oldest from cache").into()]);
659
660                    events.push_gap(Gap { prev_token: new_token.clone() });
661                    events.push_events([f.text_msg("sync'd gappy timeline").into()]);
662                })
663                .await
664                .unwrap();
665
666            let pagination = room_event_cache.pagination();
667
668            // Retrieving the pagination token will return the most recent one, not the old
669            // one.
670            let found = pagination.get_or_wait_for_token(None).await;
671            assert_eq!(found, PaginationToken::HasMore(new_token));
672        }
673    }
674}