// matrix_sdk_ui/timeline/pagination.rs

use std::ops::ControlFlow;

use async_rx::StreamExt as _;
use async_stream::stream;
use futures_core::Stream;
use futures_util::{pin_mut, StreamExt as _};
use matrix_sdk::event_cache::{
    self,
    paginator::{PaginatorError, PaginatorState},
    BackPaginationOutcome, EventCacheError, RoomPagination,
};
use tracing::{instrument, trace, warn};

use super::Error;

30impl super::Timeline {
31 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
35 pub async fn paginate_backwards(&self, num_events: u16) -> Result<bool, Error> {
36 if self.controller.is_live().await {
37 Ok(self.live_paginate_backwards(num_events).await?)
38 } else {
39 Ok(self.controller.focused_paginate_backwards(num_events).await?)
40 }
41 }
42
43 #[instrument(skip_all, fields(room_id = ?self.room().room_id()))]
47 pub async fn paginate_forwards(&self, num_events: u16) -> Result<bool, Error> {
48 if self.controller.is_live().await {
49 Ok(true)
50 } else {
51 Ok(self.controller.focused_paginate_forwards(num_events).await?)
52 }
53 }
54
55 async fn live_paginate_backwards(&self, batch_size: u16) -> event_cache::Result<bool> {
62 let pagination = self.event_cache.pagination();
63
64 let result = pagination
65 .run_backwards(
66 batch_size,
67 |BackPaginationOutcome { events, reached_start },
68 _timeline_has_been_reset| async move {
69 let num_events = events.len();
70 trace!("Back-pagination succeeded with {num_events} events");
71
72 if num_events == 0 && !reached_start {
73 return ControlFlow::Continue(());
77 }
78
79 ControlFlow::Break(reached_start)
80 },
81 )
82 .await;
83
84 match result {
85 Err(EventCacheError::BackpaginationError(PaginatorError::InvalidPreviousState {
86 actual: PaginatorState::Paginating,
87 ..
88 })) => {
89 warn!("Another pagination request is already happening, returning early");
90 Ok(false)
91 }
92
93 result => result,
94 }
95 }
96
97 pub async fn live_back_pagination_status(
104 &self,
105 ) -> Option<(LiveBackPaginationStatus, impl Stream<Item = LiveBackPaginationStatus>)> {
106 if !self.controller.is_live().await {
107 return None;
108 }
109
110 let pagination = self.event_cache.pagination();
111
112 let mut status = pagination.status();
113
114 let current_value =
115 LiveBackPaginationStatus::from_paginator_status(&pagination, status.next_now());
116
117 let stream = Box::pin(stream! {
118 let status_stream = status.dedup();
119
120 pin_mut!(status_stream);
121
122 while let Some(state) = status_stream.next().await {
123 yield LiveBackPaginationStatus::from_paginator_status(&pagination, state);
124 }
125 });
126
127 Some((current_value, stream))
128 }
129}
130
/// Status of the back-pagination on a live timeline.
#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum LiveBackPaginationStatus {
    /// No back-pagination is running right now.
    Idle {
        /// Whether the start of the timeline has already been hit.
        hit_start_of_timeline: bool,
    },

    /// A back-pagination is currently running.
    Paginating,
}

146impl LiveBackPaginationStatus {
147 fn from_paginator_status(pagination: &RoomPagination, state: PaginatorState) -> Self {
152 match state {
153 PaginatorState::Initial => Self::Idle { hit_start_of_timeline: false },
154 PaginatorState::FetchingTargetEvent => {
155 panic!("unexpected paginator state for a live backpagination")
156 }
157 PaginatorState::Idle => {
158 Self::Idle { hit_start_of_timeline: pagination.hit_timeline_start() }
159 }
160 PaginatorState::Paginating => Self::Paginating,
161 }
162 }
163}