1use std::{future::Future, ops::ControlFlow, sync::Arc, time::Duration};
18
19use eyeball::Subscriber;
20use matrix_sdk_base::timeout::timeout;
21use matrix_sdk_common::linked_chunk::ChunkContent;
22use tracing::{debug, instrument, trace};
23
24use super::{
25 paginator::{PaginationResult, PaginatorState},
26 room::{
27 events::{Gap, RoomEvents},
28 RoomEventCacheInner,
29 },
30 BackPaginationOutcome, EventsOrigin, Result, RoomEventCacheUpdate,
31};
32
33#[allow(missing_debug_implementations)]
37#[derive(Clone)]
38pub struct RoomPagination {
39 pub(super) inner: Arc<RoomEventCacheInner>,
40}
41
42impl RoomPagination {
43 #[instrument(skip(self, until))]
89 pub async fn run_backwards<Until, Break, UntilFuture>(
90 &self,
91 batch_size: u16,
92 mut until: Until,
93 ) -> Result<Break>
94 where
95 Until: FnMut(BackPaginationOutcome, TimelineHasBeenResetWhilePaginating) -> UntilFuture,
96 UntilFuture: Future<Output = ControlFlow<Break, ()>>,
97 {
98 let mut timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No;
99
100 loop {
101 if let Some(outcome) = self.run_backwards_impl(batch_size).await? {
102 match until(outcome, timeline_has_been_reset).await {
103 ControlFlow::Continue(()) => {
104 trace!("back-pagination continues");
105
106 timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::No;
107
108 continue;
109 }
110
111 ControlFlow::Break(value) => return Ok(value),
112 }
113 }
114
115 timeline_has_been_reset = TimelineHasBeenResetWhilePaginating::Yes;
116
117 debug!("back-pagination has been internally restarted because of a timeline reset.");
118 }
119 }
120
121 async fn run_backwards_impl(&self, batch_size: u16) -> Result<Option<BackPaginationOutcome>> {
122 const DEFAULT_WAIT_FOR_TOKEN_DURATION: Duration = Duration::from_secs(3);
123
124 let prev_token = self.get_or_wait_for_token(Some(DEFAULT_WAIT_FOR_TOKEN_DURATION)).await;
125
126 let prev_token = match prev_token {
127 PaginationToken::HasMore(token) => Some(token),
128 PaginationToken::None => None,
129 PaginationToken::HitEnd => {
130 debug!("Not back-paginating since we've reached the start of the timeline.");
131 return Ok(Some(BackPaginationOutcome { reached_start: true, events: Vec::new() }));
132 }
133 };
134
135 let paginator = &self.inner.paginator;
136
137 paginator.set_idle_state(PaginatorState::Idle, prev_token.clone(), None)?;
138
139 let PaginationResult { events, hit_end_of_timeline: reached_start } =
141 paginator.paginate_backward(batch_size.into()).await?;
142
143 let mut state = self.inner.state.write().await;
146
147 let prev_gap_id = if let Some(token) = prev_token {
150 let gap_id = state.events().chunk_identifier(|chunk| {
151 matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
152 });
153
154 if gap_id.is_none() {
159 return Ok(None);
160 }
161
162 gap_id
163 } else {
164 None
165 };
166
167 let new_gap = paginator.prev_batch_token().map(|prev_token| Gap { prev_token });
169
170 let (backpagination_outcome, sync_timeline_events_diffs) = state
171 .with_events_mut(move |room_events| {
172 let sync_events = events
178 .iter()
179 .rev()
182 .cloned()
183 .collect::<Vec<_>>();
184
185 let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
186
187 let (added_unique_events, insert_new_gap_pos) = if let Some(gap_id) = prev_gap_id {
189 trace!("replaced gap with new events from backpagination");
191 room_events
192 .replace_gap_at(sync_events.clone(), gap_id)
193 .expect("gap_identifier is a valid chunk id we read previously")
194 } else if let Some(pos) = first_event_pos {
195 trace!("inserted events before the first known event");
198 let report = room_events
199 .insert_events_at(sync_events.clone(), pos)
200 .expect("pos is a valid position we just read above");
201 (report, Some(pos))
202 } else {
203 trace!("pushing events received from back-pagination");
205 let report = room_events.push_events(sync_events.clone());
206 let next_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
208 (report, next_pos)
209 };
210
211 if added_unique_events {
217 if let Some(new_gap) = new_gap {
218 if let Some(new_pos) = insert_new_gap_pos {
219 room_events
220 .insert_gap_at(new_gap, new_pos)
221 .expect("events_chunk_pos represents a valid chunk position");
222 } else {
223 room_events.push_gap(new_gap);
224 }
225 }
226 } else {
227 debug!("not storing previous batch token, because we deduplicated all new back-paginated events");
228 }
229
230 room_events.on_new_events(&self.inner.room_version, sync_events.iter());
231
232 BackPaginationOutcome { events, reached_start }
233 })
234 .await?;
235
236 if !sync_timeline_events_diffs.is_empty() {
237 let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
238 diffs: sync_timeline_events_diffs,
239 origin: EventsOrigin::Pagination,
240 });
241 }
242
243 Ok(Some(backpagination_outcome))
244 }
245
246 #[doc(hidden)]
252 pub async fn get_or_wait_for_token(&self, wait_time: Option<Duration>) -> PaginationToken {
253 fn get_latest(events: &RoomEvents) -> Option<String> {
254 events.rchunks().find_map(|chunk| match chunk.content() {
255 ChunkContent::Gap(gap) => Some(gap.prev_token.clone()),
256 ChunkContent::Items(..) => None,
257 })
258 }
259
260 {
261 let state = self.inner.state.read().await;
263
264 let has_events = state.events().events().next().is_some();
269
270 if let Some(found) = get_latest(state.events()) {
272 return PaginationToken::HasMore(found);
273 }
274
275 if has_events {
278 return PaginationToken::HitEnd;
279 }
280
281 if state.waited_for_initial_prev_token {
284 return PaginationToken::None;
285 }
286 }
287
288 let Some(wait_time) = wait_time else {
290 return PaginationToken::None;
291 };
292
293 let _ = timeout(self.inner.pagination_batch_token_notifier.notified(), wait_time).await;
297
298 let mut state = self.inner.state.write().await;
299
300 state.waited_for_initial_prev_token = true;
301
302 if let Some(token) = get_latest(state.events()) {
303 PaginationToken::HasMore(token)
304 } else if state.events().events().next().is_some() {
305 PaginationToken::HitEnd
307 } else {
308 PaginationToken::None
309 }
310 }
311
312 pub fn status(&self) -> Subscriber<PaginatorState> {
315 self.inner.paginator.state()
316 }
317
318 pub fn hit_timeline_start(&self) -> bool {
323 self.inner.paginator.hit_timeline_start()
324 }
325
326 pub fn hit_timeline_end(&self) -> bool {
331 self.inner.paginator.hit_timeline_end()
332 }
333}
334
335#[derive(Clone, Debug, PartialEq)]
337pub enum PaginationToken {
338 None,
341 HasMore(String),
344 HitEnd,
347}
348
349impl From<Option<String>> for PaginationToken {
350 fn from(token: Option<String>) -> Self {
351 match token {
352 Some(val) => Self::HasMore(val),
353 None => Self::None,
354 }
355 }
356}
357
358#[derive(Debug)]
360pub enum TimelineHasBeenResetWhilePaginating {
361 Yes,
363
364 No,
366}
367
368#[cfg(test)]
369mod tests {
370 #[cfg(not(target_arch = "wasm32"))]
372 mod time_tests {
373 use std::time::{Duration, Instant};
374
375 use assert_matches::assert_matches;
376 use matrix_sdk_base::RoomState;
377 use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE};
378 use ruma::{event_id, room_id, user_id};
379 use tokio::{spawn, time::sleep};
380
381 use crate::{
382 event_cache::{pagination::PaginationToken, room::events::Gap},
383 test_utils::logged_in_client,
384 };
385
386 #[async_test]
387 async fn test_wait_no_pagination_token() {
388 let client = logged_in_client(None).await;
389 let room_id = room_id!("!galette:saucisse.bzh");
390 client.base_client().get_or_create_room(room_id, RoomState::Joined);
391
392 let event_cache = client.event_cache();
393
394 event_cache.subscribe().unwrap();
395
396 let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
397
398 let pagination = room_event_cache.pagination();
399
400 let found = pagination.get_or_wait_for_token(None).await;
403 assert_matches!(found, PaginationToken::None);
405
406 pagination.inner.state.write().await.reset().await.unwrap();
408
409 let before = Instant::now();
411 let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
412 let waited = before.elapsed();
413 assert_matches!(found, PaginationToken::None);
415 assert!(waited.as_secs() < 1);
417
418 pagination.inner.state.write().await.reset().await.unwrap();
420
421 let before = Instant::now();
423 let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
424 let waited = before.elapsed();
425 assert_matches!(found, PaginationToken::None);
427 assert!(waited.as_secs() < 2);
429 assert!(waited.as_secs() >= 1);
430 }
431
432 #[async_test]
433 async fn test_wait_hit_end_of_timeline() {
434 let client = logged_in_client(None).await;
435 let room_id = room_id!("!galette:saucisse.bzh");
436 client.base_client().get_or_create_room(room_id, RoomState::Joined);
437
438 let event_cache = client.event_cache();
439
440 event_cache.subscribe().unwrap();
441
442 let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
443
444 let f = EventFactory::new().room(room_id).sender(*ALICE);
445 let pagination = room_event_cache.pagination();
446
447 room_event_cache
449 .inner
450 .state
451 .write()
452 .await
453 .with_events_mut(|events| {
454 events.push_events([f
455 .text_msg("this is the start of the timeline")
456 .into_event()]);
457 })
458 .await
459 .unwrap();
460
461 let found = pagination.get_or_wait_for_token(None).await;
464 assert_matches!(found, PaginationToken::HitEnd);
466
467 let before = Instant::now();
469 let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
470 let waited = before.elapsed();
471 assert_matches!(found, PaginationToken::HitEnd);
473 assert!(waited.as_secs() < 1);
475
476 let before = Instant::now();
478 let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
479 let waited = before.elapsed();
480 assert_matches!(found, PaginationToken::HitEnd);
482 assert!(waited.as_secs() < 1);
484
485 room_event_cache.clear().await.unwrap();
488
489 spawn(async move {
490 sleep(Duration::from_secs(1)).await;
491
492 room_event_cache
493 .inner
494 .state
495 .write()
496 .await
497 .with_events_mut(|events| {
498 events.push_events([f
499 .text_msg("this is the start of the timeline")
500 .into_event()]);
501 })
502 .await
503 .unwrap();
504 });
505
506 let before = Instant::now();
508 let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(2))).await;
509 let waited = before.elapsed();
510 assert_matches!(found, PaginationToken::HitEnd);
512 assert!(waited.as_secs() >= 2);
514 assert!(waited.as_secs() < 3);
515 }
516
517 #[async_test]
518 async fn test_wait_for_pagination_token_already_present() {
519 let client = logged_in_client(None).await;
520 let room_id = room_id!("!galette:saucisse.bzh");
521 client.base_client().get_or_create_room(room_id, RoomState::Joined);
522
523 let event_cache = client.event_cache();
524
525 event_cache.subscribe().unwrap();
526
527 let (room_event_cache, _drop_handlers) = event_cache.for_room(room_id).await.unwrap();
528
529 let expected_token = "old".to_owned();
530
531 {
533 room_event_cache
534 .inner
535 .state
536 .write()
537 .await
538 .with_events_mut(|room_events| {
539 room_events.push_gap(Gap { prev_token: expected_token.clone() });
540 room_events.push_events([EventFactory::new()
541 .text_msg("yolo")
542 .sender(user_id!("@b:z.h"))
543 .event_id(event_id!("$ida"))
544 .into_event()]);
545 })
546 .await
547 .unwrap();
548 }
549
550 let pagination = room_event_cache.pagination();
551
552 let found = pagination.get_or_wait_for_token(None).await;
554 assert_eq!(found, PaginationToken::HasMore(expected_token.clone()));
556
557 let before = Instant::now();
559 let found = pagination.get_or_wait_for_token(Some(Duration::default())).await;
560 let waited = before.elapsed();
561 assert_eq!(found, PaginationToken::HasMore(expected_token.clone()));
563 assert!(waited.as_millis() < 100);
565
566 let before = Instant::now();
568 let found = pagination.get_or_wait_for_token(Some(Duration::from_secs(1))).await;
569 let waited = before.elapsed();
570 assert_eq!(found, PaginationToken::HasMore(expected_token));
572 assert!(waited.as_millis() < 100);
574 }
575
576 #[async_test]
577 async fn test_wait_for_late_pagination_token() {
578 let client = logged_in_client(None).await;
579 let room_id = room_id!("!galette:saucisse.bzh");
580 client.base_client().get_or_create_room(room_id, RoomState::Joined);
581
582 let event_cache = client.event_cache();
583
584 event_cache.subscribe().unwrap();
585
586 let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
587
588 let expected_token = "old".to_owned();
589
590 let before = Instant::now();
591 let cloned_expected_token = expected_token.clone();
592 let cloned_room_event_cache = room_event_cache.clone();
593 let insert_token_task = spawn(async move {
594 sleep(Duration::from_millis(400)).await;
596
597 cloned_room_event_cache
598 .inner
599 .state
600 .write()
601 .await
602 .with_events_mut(|events| {
603 events.push_gap(Gap { prev_token: cloned_expected_token })
604 })
605 .await
606 .unwrap();
607 });
608
609 let pagination = room_event_cache.pagination();
610
611 let found = pagination.get_or_wait_for_token(None).await;
613 assert_matches!(found, PaginationToken::None);
614
615 let found = pagination.get_or_wait_for_token(Some(Duration::from_millis(600))).await;
617 let waited = before.elapsed();
618
619 assert_eq!(found, PaginationToken::HasMore(expected_token));
621 assert!(waited.as_secs() < 1);
623 assert!(waited.as_millis() >= 400);
624
625 insert_token_task.await.unwrap();
627 }
628
629 #[async_test]
630 async fn test_get_latest_token() {
631 let client = logged_in_client(None).await;
632 let room_id = room_id!("!galette:saucisse.bzh");
633 client.base_client().get_or_create_room(room_id, RoomState::Joined);
634
635 let event_cache = client.event_cache();
636
637 event_cache.subscribe().unwrap();
638
639 let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
640
641 let old_token = "old".to_owned();
642 let new_token = "new".to_owned();
643
644 room_event_cache
647 .inner
648 .state
649 .write()
650 .await
651 .with_events_mut(|events| {
652 let f = EventFactory::new().room(room_id).sender(*ALICE);
653
654 events.push_gap(Gap { prev_token: old_token });
658 events.push_events([f.text_msg("oldest from cache").into()]);
659
660 events.push_gap(Gap { prev_token: new_token.clone() });
661 events.push_events([f.text_msg("sync'd gappy timeline").into()]);
662 })
663 .await
664 .unwrap();
665
666 let pagination = room_event_cache.pagination();
667
668 let found = pagination.get_or_wait_for_token(None).await;
671 assert_eq!(found, PaginationToken::HasMore(new_token));
672 }
673 }
674}