matrix_sdk_ui/timeline/subscriber.rs
1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16 pin::Pin,
17 sync::Arc,
18 task::{Context, Poll},
19};
20
21use eyeball::Subscriber;
22use eyeball_im::{VectorDiff, VectorSubscriberBatchedStream};
23use eyeball_im_util::vector::{Skip, VectorObserverExt};
24use futures_core::Stream;
25use imbl::Vector;
26use pin_project_lite::pin_project;
27
28use super::{TimelineDropHandle, controller::ObservableItems, item::TimelineItem};
29
30pin_project! {
31 /// A stream that wraps a [`TimelineDropHandle`] so that the `Timeline`
32 /// isn't dropped until the `Stream` is dropped.
33 pub(super) struct TimelineWithDropHandle<S> {
34 #[pin]
35 inner: S,
36 drop_handle: Arc<TimelineDropHandle>,
37 }
38}
39
40impl<S> TimelineWithDropHandle<S> {
41 /// Create a new [`WithTimelineDropHandle`].
42 pub(super) fn new(inner: S, drop_handle: Arc<TimelineDropHandle>) -> Self {
43 Self { inner, drop_handle }
44 }
45}
46
47impl<S> Stream for TimelineWithDropHandle<S>
48where
49 S: Stream,
50{
51 type Item = S::Item;
52
53 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54 self.project().inner.poll_next(context)
55 }
56}
57
58pin_project! {
59 /// A type that creates a proper `Timeline` subscriber.
60 ///
61 /// This type implements [`Stream`], so that it's entirely transparent for
62 /// all consumers expecting an `impl Stream`.
63 ///
64 /// This `Stream` pipes `VectorDiff`s from [`ObservableItems`] into a batched
65 /// stream ([`VectorSubscriberBatchedStream`]), and then applies a skip
66 /// higher-order stream ([`Skip`]).
67 ///
68 /// `Skip` works by skipping the first _n_ values, where _n_ is referred
69 /// as `count`. Here, this `count` value is defined by a `Stream<Item =
70 /// usize>` (see [`Skip::dynamic_skip_with_initial_count`]). Everytime
71 /// the `count` stream produces a value, `Skip` adjusts its output.
72 /// `count` is managed by [`SkipCount`][skip::SkipCount], and is hold in
73 /// `TimelineMetadata::subscriber_skip_count`.
74 pub(super) struct TimelineSubscriber {
75 #[pin]
76 inner: Skip<VectorSubscriberBatchedStream<Arc<TimelineItem>>, Subscriber<usize>>,
77 }
78}
79
80impl TimelineSubscriber {
81 /// Creates a [`TimelineSubscriber`], in addition to the initial values of
82 /// the subscriber.
83 pub(super) fn new(
84 observable_items: &ObservableItems,
85 observable_skip_count: &skip::SkipCount,
86 ) -> (Vector<Arc<TimelineItem>>, Self) {
87 let (initial_values, stream) = observable_items
88 .subscribe()
89 .into_values_and_batched_stream()
90 .dynamic_skip_with_initial_count(
91 // The `SkipCount` value may have been modified before the subscriber is
92 // created. Let's use the current value instead of hardcoding it to 0.
93 observable_skip_count.get(),
94 observable_skip_count.subscribe(),
95 );
96
97 (initial_values, Self { inner: stream })
98 }
99}
100
101impl Stream for TimelineSubscriber {
102 type Item = Vec<VectorDiff<Arc<TimelineItem>>>;
103
104 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105 self.project().inner.poll_next(context)
106 }
107}
108
109pub mod skip {
110 use eyeball::{SharedObservable, Subscriber};
111
/// Maximum number of items left unskipped when `SkipCount::compute_next`
/// computes the `count` for the initial state (i.e. when there were no items
/// before).
const MAXIMUM_NUMBER_OF_INITIAL_ITEMS: usize = 20;
113
114 /// `SkipCount` helps to manage the `count` value used by the [`Skip`]
115 /// higher-order stream used by the [`TimelineSubscriber`]. See its
116 /// documentation to learn more.
117 ///
118 /// [`Skip`]: eyeball_im_util::vector::Skip
119 /// [`TimelineSubscriber`]: super::TimelineSubscriber
120 #[derive(Clone, Debug)]
121 pub struct SkipCount {
122 count: SharedObservable<usize>,
123 }
124
125 impl SkipCount {
126 /// Create a [`SkipCount`] with a default `count` value set to 0.
127 pub fn new() -> Self {
128 Self { count: SharedObservable::new(0) }
129 }
130
131 /// Compute the `count` value for [the `Skip` higher-order
132 /// stream][`Skip`].
133 ///
134 /// This is useful when new items are inserted, removed and so on.
135 ///
136 /// [`Skip`]: eyeball_im_util::vector::Skip
137 pub fn compute_next(
138 &self,
139 previous_number_of_items: usize,
140 next_number_of_items: usize,
141 ) -> usize {
142 let current_count = self.count.get();
143
144 // Initial states: no items are present.
145 if previous_number_of_items == 0 {
146 // Adjust the count to provide a maximum number of initial items. We want to
147 // skip the first items until we get a certain number of items to display.
148 //
149 // | `next_number_of_items` | `MAX…` | output | will display |
150 // |------------------------|--------|--------|--------------|
151 // | 60 | 20 | 40 | 20 items |
152 // | 10 | 20 | 0 | 10 items |
153 // | 0 | 20 | 0 | 0 item |
154 //
155 next_number_of_items.saturating_sub(MAXIMUM_NUMBER_OF_INITIAL_ITEMS)
156 }
157 // Not the initial state: there are items.
158 else {
159 // There are less items than before. Shift to the left `count` by the difference
160 // between `previous_number_of_items` and `next_number_of_items` to keep the
161 // same number of items in the stream as much as possible.
162 //
163 // This is not a backwards pagination, it cannot “go below 0”, however this is
164 // necessary to handle the case where the timeline is cleared and
165 // the number of items becomes 0 for example.
166 if next_number_of_items < previous_number_of_items {
167 current_count.saturating_sub(previous_number_of_items - next_number_of_items)
168 }
169 // Return `current_count` with no modification, we don't want to adjust the
170 // count, we want to see all initial items and new items.
171 else {
172 current_count
173 }
174 }
175 }
176
177 /// Compute the `count` value for [the `Skip` higher-order
178 /// stream][`Skip`] when a backwards pagination is happening.
179 ///
180 /// It returns the new value for `count` in addition to
181 /// `Some(number_of_items)` to fulfill the page up to `page_size`,
182 /// `None` otherwise. For example, assuming a `page_size` of 15,
183 /// if the `count` moves from 10 to 0, then 10 new items will
184 /// appear in the stream, but 5 are missing because they aren't
185 /// present in the stream: the stream has reached its beginning:
186 /// `Some(5)` will be returned. This is useful
187 /// for the pagination mechanism to fill the timeline with more items,
188 /// either from a storage, or from the network.
189 ///
190 /// [`Skip`]: eyeball_im_util::vector::Skip
191 pub fn compute_next_when_paginating_backwards(
192 &self,
193 page_size: usize,
194 ) -> (usize, Option<usize>) {
195 let current_count = self.count.get();
196
197 // We skip the values from the start of the timeline; paginating backwards means
198 // we have to reduce the count until reaching 0.
199 //
200 // | `current_count` | `page_size` | output |
201 // |-----------------|-------------|----------------|
202 // | 50 | 20 | (30, None) |
203 // | 30 | 20 | (10, None) |
204 // | 10 | 20 | (0, Some(10)) |
205 // | 0 | 20 | (0, Some(20)) |
206 // ^ ^^^^^^^^
207 // | |
208 // | it needs 20 items to fulfill the
209 // | page size
210 // count becomes 0
211 //
212 if current_count >= page_size {
213 (current_count - page_size, None)
214 } else {
215 (0, Some(page_size - current_count))
216 }
217 }
218
219 /// Compute the `count` value for [the `Skip` higher-order
220 /// stream][`Skip`] when a forwards pagination is happening.
221 ///
222 /// The `page_size` is present to mimic the
223 /// [`compute_count_when_paginating_backwards`] function but it is
224 /// actually useless for the current implementation.
225 ///
226 /// [`Skip`]: eyeball_im_util::vector::Skip
227 #[allow(unused)] // this is not used yet because only a live timeline is using it, but as soon as
228 // other kind of timelines will use it, we would need it, it's better to have
229 // this in case of; everything is tested, the logic is made more robust.
230 pub fn compute_next_when_paginating_forwards(&self, _page_size: usize) -> usize {
231 // Nothing to do, the count remains unchanged as we skip the first values, not
232 // the last values; paginating forwards will add items at the end, not at the
233 // start of the timeline.
234 self.count.get()
235 }
236
237 /// Get the current count value.
238 pub fn get(&self) -> usize {
239 self.count.get()
240 }
241
242 /// Subscribe to updates of the count value.
243 pub fn subscribe(&self) -> Subscriber<usize> {
244 self.count.subscribe()
245 }
246
247 /// Update the skip count if and only if the timeline has a live focus
248 /// ([`TimelineFocusKind::Live`]).
249 pub fn update(&self, count: usize, is_live_focus: bool) {
250 if is_live_focus {
251 self.count.set_if_not_eq(count);
252 }
253 }
254 }
255
#[cfg(test)]
mod tests {
    use super::SkipCount;

    /// Computes the next count for a number of items going from `previous` to
    /// `next`, checks that it equals `expected`, then stores it in
    /// `skip_count` so that the following step starts from it.
    #[track_caller]
    fn step(skip_count: &SkipCount, previous: usize, next: usize, expected: usize) {
        let count = skip_count.compute_next(previous, next);
        assert_eq!(count, expected);
        skip_count.count.set(count);
    }

    #[test]
    fn test_compute_count_from_underflowing_initial_states() {
        let skip_count = SkipCount::new();

        // Initial state with too few new items (0 -> 10). None is skipped.
        step(&skip_count, 0, 10, 0);

        // Add 5 new items (10 -> 15). The count stays at 0 because we don't
        // want to skip the previous items.
        step(&skip_count, 10, 15, 0);

        // Add 20 new items (15 -> 35). The count stays at 0 because we don't
        // want to skip the previous items.
        step(&skip_count, 15, 35, 0);

        // Remove 4 items (35 -> 31). The count stays at 0 because it was
        // previously 0, no items are skipped, nothing to adjust.
        step(&skip_count, 35, 31, 0);

        // Remove all items (31 -> 0). The count goes to 0 (it was 0 before
        // anyway).
        assert_eq!(skip_count.compute_next(31, 0), 0);
    }

    #[test]
    fn test_compute_count_from_overflowing_initial_states() {
        let skip_count = SkipCount::new();

        // Initial state with too many new items (0 -> 30). Some are skipped.
        step(&skip_count, 0, 30, 10);

        // Add 5 new items (30 -> 35). The count stays at 10 because we don't
        // want to skip the previous items.
        step(&skip_count, 30, 35, 10);

        // Add 20 new items (35 -> 55). The count stays at 10 because we don't
        // want to skip the previous items.
        step(&skip_count, 35, 55, 10);

        // Remove 4 items (55 -> 51). The count is reduced by 4 so that the same
        // number of items are presented.
        step(&skip_count, 55, 51, 6);

        // Remove all items (51 -> 0). The count goes to 0 (even though it was 6
        // before).
        assert_eq!(skip_count.compute_next(51, 0), 0);
    }

    #[test]
    fn test_compute_count_when_paginating_backwards_from_underflowing_initial_states() {
        let skip_count = SkipCount::new();

        // Initial state with too few new items (0 -> 10). None is skipped.
        step(&skip_count, 0, 10, 0);

        // Add 30 new items (10 -> 40). The count stays at 0 because we don't
        // want to skip the previous items.
        step(&skip_count, 10, 40, 0);

        let page_size = 20;

        // Paginate backwards. The count is already 0: a full page is needed.
        assert_eq!(skip_count.compute_next_when_paginating_backwards(page_size), (0, Some(20)));
    }

    #[test]
    fn test_compute_count_when_paginating_backwards_from_overflowing_initial_states() {
        let skip_count = SkipCount::new();

        // Initial state with too many new items (0 -> 50). Some are skipped.
        step(&skip_count, 0, 50, 30);

        // Add 30 new items (50 -> 80). The count stays at 30 because we don't
        // want to skip the previous items.
        step(&skip_count, 50, 80, 30);

        let page_size = 20;

        // Paginate backwards. The count shifts by `page_size`, and the page is
        // full.
        let (count, needs) = skip_count.compute_next_when_paginating_backwards(page_size);
        assert_eq!((count, needs), (10, None));
        skip_count.count.set(count);

        // Paginate backwards. The count shifts by `page_size` but reaches 0
        // before the page becomes full. It needs 10 more items to fulfill the
        // page.
        assert_eq!(skip_count.compute_next_when_paginating_backwards(page_size), (0, Some(10)));
    }

    #[test]
    fn test_compute_count_when_paginating_forwards_from_underflowing_initial_states() {
        let skip_count = SkipCount::new();

        // Initial state with too few new items (0 -> 10). None is skipped.
        step(&skip_count, 0, 10, 0);

        // Add 30 new items (10 -> 40). The count stays at 0 because we don't
        // want to skip the previous items.
        step(&skip_count, 10, 40, 0);

        let page_size = 20;

        // Paginate forwards. The count remains unchanged.
        assert_eq!(skip_count.compute_next_when_paginating_forwards(page_size), 0);
    }

    #[test]
    fn test_compute_count_when_paginating_forwards_from_overflowing_initial_states() {
        let skip_count = SkipCount::new();

        // Initial state with too many new items (0 -> 50). Some are skipped.
        step(&skip_count, 0, 50, 30);

        // Add 30 new items (50 -> 80). The count stays at 30 because we don't
        // want to skip the previous items.
        step(&skip_count, 50, 80, 30);

        let page_size = 20;

        // Paginate forwards. The count remains unchanged.
        assert_eq!(skip_count.compute_next_when_paginating_forwards(page_size), 30);
    }
}
457}