matrix_sdk_ui/timeline/
subscriber.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    pin::Pin,
17    sync::Arc,
18    task::{Context, Poll},
19};
20
21use eyeball::Subscriber;
22use eyeball_im::{VectorDiff, VectorSubscriberBatchedStream};
23use eyeball_im_util::vector::{Skip, VectorObserverExt};
24use futures_core::Stream;
25use imbl::Vector;
26use pin_project_lite::pin_project;
27
28use super::{TimelineDropHandle, controller::ObservableItems, item::TimelineItem};
29
30pin_project! {
31    /// A stream that wraps a [`TimelineDropHandle`] so that the `Timeline`
32    /// isn't dropped until the `Stream` is dropped.
33    pub(super) struct TimelineWithDropHandle<S> {
34        #[pin]
35        inner: S,
36        drop_handle: Arc<TimelineDropHandle>,
37    }
38}
39
40impl<S> TimelineWithDropHandle<S> {
41    /// Create a new [`WithTimelineDropHandle`].
42    pub(super) fn new(inner: S, drop_handle: Arc<TimelineDropHandle>) -> Self {
43        Self { inner, drop_handle }
44    }
45}
46
47impl<S> Stream for TimelineWithDropHandle<S>
48where
49    S: Stream,
50{
51    type Item = S::Item;
52
53    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54        self.project().inner.poll_next(context)
55    }
56}
57
58pin_project! {
59    /// A type that creates a proper `Timeline` subscriber.
60    ///
61    /// This type implements [`Stream`], so that it's entirely transparent for
62    /// all consumers expecting an `impl Stream`.
63    ///
64    /// This `Stream` pipes `VectorDiff`s from [`ObservableItems`] into a batched
65    /// stream ([`VectorSubscriberBatchedStream`]), and then applies a skip
66    /// higher-order stream ([`Skip`]).
67    ///
68    /// `Skip` works by skipping the first _n_ values, where _n_ is referred
69    /// as `count`. Here, this `count` value is defined by a `Stream<Item =
70    /// usize>` (see [`Skip::dynamic_skip_with_initial_count`]). Everytime
71    /// the `count` stream produces a value, `Skip` adjusts its output.
72    /// `count` is managed by [`SkipCount`][skip::SkipCount], and is hold in
73    /// `TimelineMetadata::subscriber_skip_count`.
74    pub(super) struct TimelineSubscriber {
75        #[pin]
76        inner: Skip<VectorSubscriberBatchedStream<Arc<TimelineItem>>, Subscriber<usize>>,
77    }
78}
79
80impl TimelineSubscriber {
81    /// Creates a [`TimelineSubscriber`], in addition to the initial values of
82    /// the subscriber.
83    pub(super) fn new(
84        observable_items: &ObservableItems,
85        observable_skip_count: &skip::SkipCount,
86    ) -> (Vector<Arc<TimelineItem>>, Self) {
87        let (initial_values, stream) = observable_items
88            .subscribe()
89            .into_values_and_batched_stream()
90            .dynamic_skip_with_initial_count(
91                // The `SkipCount` value may have been modified before the subscriber is
92                // created. Let's use the current value instead of hardcoding it to 0.
93                observable_skip_count.get(),
94                observable_skip_count.subscribe(),
95            );
96
97        (initial_values, Self { inner: stream })
98    }
99}
100
101impl Stream for TimelineSubscriber {
102    type Item = Vec<VectorDiff<Arc<TimelineItem>>>;
103
104    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105        self.project().inner.poll_next(context)
106    }
107}
108
109pub mod skip {
110    use eyeball::{SharedObservable, Subscriber};
111
112    const MAXIMUM_NUMBER_OF_INITIAL_ITEMS: usize = 20;
113
114    /// `SkipCount` helps to manage the `count` value used by the [`Skip`]
115    /// higher-order stream used by the [`TimelineSubscriber`]. See its
116    /// documentation to learn more.
117    ///
118    /// [`Skip`]: eyeball_im_util::vector::Skip
119    /// [`TimelineSubscriber`]: super::TimelineSubscriber
120    #[derive(Clone, Debug)]
121    pub struct SkipCount {
122        count: SharedObservable<usize>,
123    }
124
125    impl SkipCount {
126        /// Create a [`SkipCount`] with a default `count` value set to 0.
127        pub fn new() -> Self {
128            Self { count: SharedObservable::new(0) }
129        }
130
131        /// Compute the `count` value for [the `Skip` higher-order
132        /// stream][`Skip`].
133        ///
134        /// This is useful when new items are inserted, removed and so on.
135        ///
136        /// [`Skip`]: eyeball_im_util::vector::Skip
137        pub fn compute_next(
138            &self,
139            previous_number_of_items: usize,
140            next_number_of_items: usize,
141        ) -> usize {
142            let current_count = self.count.get();
143
144            // Initial states: no items are present.
145            if previous_number_of_items == 0 {
146                // Adjust the count to provide a maximum number of initial items. We want to
147                // skip the first items until we get a certain number of items to display.
148                //
149                // | `next_number_of_items` | `MAX…` | output | will display |
150                // |------------------------|--------|--------|--------------|
151                // | 60                     | 20     | 40     | 20 items     |
152                // | 10                     | 20     | 0      | 10 items     |
153                // | 0                      | 20     | 0      | 0 item       |
154                //
155                next_number_of_items.saturating_sub(MAXIMUM_NUMBER_OF_INITIAL_ITEMS)
156            }
157            // Not the initial state: there are items.
158            else {
159                // There are less items than before. Shift to the left `count` by the difference
160                // between `previous_number_of_items` and `next_number_of_items` to keep the
161                // same number of items in the stream as much as possible.
162                //
163                // This is not a backwards pagination, it cannot “go below 0”, however this is
164                // necessary to handle the case where the timeline is cleared and
165                // the number of items becomes 0 for example.
166                if next_number_of_items < previous_number_of_items {
167                    current_count.saturating_sub(previous_number_of_items - next_number_of_items)
168                }
169                // Return `current_count` with no modification, we don't want to adjust the
170                // count, we want to see all initial items and new items.
171                else {
172                    current_count
173                }
174            }
175        }
176
177        /// Compute the `count` value for [the `Skip` higher-order
178        /// stream][`Skip`] when a backwards pagination is happening.
179        ///
180        /// It returns the new value for `count` in addition to
181        /// `Some(number_of_items)` to fulfill the page up to `page_size`,
182        /// `None` otherwise. For example, assuming a `page_size` of 15,
183        /// if the `count` moves from 10 to 0, then 10 new items will
184        /// appear in the stream, but 5 are missing because they aren't
185        /// present in the stream: the stream has reached its beginning:
186        /// `Some(5)` will be returned. This is useful
187        /// for the pagination mechanism to fill the timeline with more items,
188        /// either from a storage, or from the network.
189        ///
190        /// [`Skip`]: eyeball_im_util::vector::Skip
191        pub fn compute_next_when_paginating_backwards(
192            &self,
193            page_size: usize,
194        ) -> (usize, Option<usize>) {
195            let current_count = self.count.get();
196
197            // We skip the values from the start of the timeline; paginating backwards means
198            // we have to reduce the count until reaching 0.
199            //
200            // | `current_count` | `page_size` | output         |
201            // |-----------------|-------------|----------------|
202            // | 50              | 20          | (30, None)     |
203            // | 30              | 20          | (10, None)     |
204            // | 10              | 20          | (0, Some(10))  |
205            // | 0               | 20          | (0, Some(20))  |
206            //                                    ^  ^^^^^^^^
207            //                                    |  |
208            //                                    |  it needs 20 items to fulfill the
209            //                                    |  page size
210            //                                    count becomes 0
211            //
212            if current_count >= page_size {
213                (current_count - page_size, None)
214            } else {
215                (0, Some(page_size - current_count))
216            }
217        }
218
219        /// Compute the `count` value for [the `Skip` higher-order
220        /// stream][`Skip`] when a forwards pagination is happening.
221        ///
222        /// The `page_size` is present to mimic the
223        /// [`compute_count_when_paginating_backwards`] function but it is
224        /// actually useless for the current implementation.
225        ///
226        /// [`Skip`]: eyeball_im_util::vector::Skip
227        #[allow(unused)] // this is not used yet because only a live timeline is using it, but as soon as
228        // other kind of timelines will use it, we would need it, it's better to have
229        // this in case of; everything is tested, the logic is made more robust.
230        pub fn compute_next_when_paginating_forwards(&self, _page_size: usize) -> usize {
231            // Nothing to do, the count remains unchanged as we skip the first values, not
232            // the last values; paginating forwards will add items at the end, not at the
233            // start of the timeline.
234            self.count.get()
235        }
236
237        /// Get the current count value.
238        pub fn get(&self) -> usize {
239            self.count.get()
240        }
241
242        /// Subscribe to updates of the count value.
243        pub fn subscribe(&self) -> Subscriber<usize> {
244            self.count.subscribe()
245        }
246
247        /// Update the skip count if and only if the timeline has a live focus
248        /// ([`TimelineFocusKind::Live`]).
249        pub fn update(&self, count: usize, is_live_focus: bool) {
250            if is_live_focus {
251                self.count.set_if_not_eq(count);
252            }
253        }
254    }
255
256    #[cfg(test)]
257    mod tests {
258        use super::SkipCount;
259
260        #[test]
261        fn test_compute_count_from_underflowing_initial_states() {
262            let skip_count = SkipCount::new();
263
264            // Initial state with too few new items. None is skipped.
265            let previous_number_of_items = 0;
266            let next_number_of_items = previous_number_of_items + 10;
267            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
268            assert_eq!(count, 0);
269            skip_count.count.set(count);
270
271            // Add 5 new items. The count stays at 0 because we don't want to skip the
272            // previous items.
273            let previous_number_of_items = next_number_of_items;
274            let next_number_of_items = previous_number_of_items + 5;
275            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
276            assert_eq!(count, 0);
277            skip_count.count.set(count);
278
279            // Add 20 new items. The count stays at 0 because we don't want to
280            // skip the previous items.
281            let previous_number_of_items = next_number_of_items;
282            let next_number_of_items = previous_number_of_items + 20;
283            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
284            assert_eq!(count, 0);
285            skip_count.count.set(count);
286
287            // Remove a certain number of items. The count stays at 0 because it was
288            // previously 0, no items are skipped, nothing to adjust.
289            let previous_number_of_items = next_number_of_items;
290            let next_number_of_items = previous_number_of_items - 4;
291            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
292            assert_eq!(count, 0);
293            skip_count.count.set(count);
294
295            // Remove all items. The count goes to 0 (regardless it was 0 before).
296            let previous_number_of_items = next_number_of_items;
297            let next_number_of_items = 0;
298            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
299            assert_eq!(count, 0);
300        }
301
302        #[test]
303        fn test_compute_count_from_overflowing_initial_states() {
304            let skip_count = SkipCount::new();
305
306            // Initial state with too much new items. Some are skipped.
307            let previous_number_of_items = 0;
308            let next_number_of_items = previous_number_of_items + 30;
309            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
310            assert_eq!(count, 10);
311            skip_count.count.set(count);
312
313            // Add 5 new items. The count stays at 10 because we don't want to skip the
314            // previous items.
315            let previous_number_of_items = next_number_of_items;
316            let next_number_of_items = previous_number_of_items + 5;
317            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
318            assert_eq!(count, 10);
319            skip_count.count.set(count);
320
321            // Add 20 new items. The count stays at 10 because we don't want to
322            // skip the previous items.
323            let previous_number_of_items = next_number_of_items;
324            let next_number_of_items = previous_number_of_items + 20;
325            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
326            assert_eq!(count, 10);
327            skip_count.count.set(count);
328
329            // Remove a certain number of items. The count is reduced by 5 so that the same
330            // number of items are presented.
331            let previous_number_of_items = next_number_of_items;
332            let next_number_of_items = previous_number_of_items - 4;
333            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
334            assert_eq!(count, 6);
335            skip_count.count.set(count);
336
337            // Remove all items. The count goes to 0 (regardless it was 6 before).
338            let previous_number_of_items = next_number_of_items;
339            let next_number_of_items = 0;
340            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
341            assert_eq!(count, 0);
342        }
343
344        #[test]
345        fn test_compute_count_when_paginating_backwards_from_underflowing_initial_states() {
346            let skip_count = SkipCount::new();
347
348            // Initial state with too few new items. None is skipped.
349            let previous_number_of_items = 0;
350            let next_number_of_items = previous_number_of_items + 10;
351            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
352            assert_eq!(count, 0);
353            skip_count.count.set(count);
354
355            // Add 30 new items. The count stays at 0 because we don't want to skip the
356            // previous items.
357            let previous_number_of_items = next_number_of_items;
358            let next_number_of_items = previous_number_of_items + 30;
359            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
360            assert_eq!(count, 0);
361            skip_count.count.set(count);
362
363            let page_size = 20;
364
365            // Paginate backwards.
366            let (count, needs) = skip_count.compute_next_when_paginating_backwards(page_size);
367            assert_eq!(count, 0);
368            assert_eq!(needs, Some(20));
369        }
370
371        #[test]
372        fn test_compute_count_when_paginating_backwards_from_overflowing_initial_states() {
373            let skip_count = SkipCount::new();
374
375            // Initial state with too much new items. Some are skipped.
376            let previous_number_of_items = 0;
377            let next_number_of_items = previous_number_of_items + 50;
378            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
379            assert_eq!(count, 30);
380            skip_count.count.set(count);
381
382            // Add 30 new items. The count stays at 30 because we don't want to
383            // skip the previous items.
384            let previous_number_of_items = next_number_of_items;
385            let next_number_of_items = previous_number_of_items + 30;
386            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
387            assert_eq!(count, 30);
388            skip_count.count.set(count);
389
390            let page_size = 20;
391
392            // Paginate backwards. The count shifts by `page_size`, and the page is full.
393            let (count, needs) = skip_count.compute_next_when_paginating_backwards(page_size);
394            assert_eq!(count, 10);
395            assert_eq!(needs, None);
396            skip_count.count.set(count);
397
398            // Paginate backwards. The count shifts by `page_size` but reaches 0 before the
399            // page becomes full. It needs 10 more items to fulfill the page.
400            let (count, needs) = skip_count.compute_next_when_paginating_backwards(page_size);
401            assert_eq!(count, 0);
402            assert_eq!(needs, Some(10));
403        }
404
405        #[test]
406        fn test_compute_count_when_paginating_forwards_from_underflowing_initial_states() {
407            let skip_count = SkipCount::new();
408
409            // Initial state with too few new items. None is skipped.
410            let previous_number_of_items = 0;
411            let next_number_of_items = previous_number_of_items + 10;
412            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
413            assert_eq!(count, 0);
414            skip_count.count.set(count);
415
416            // Add 30 new items. The count stays at 0 because we don't want to skip the
417            // previous items.
418            let previous_number_of_items = next_number_of_items;
419            let next_number_of_items = previous_number_of_items + 30;
420            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
421            assert_eq!(count, 0);
422            skip_count.count.set(count);
423
424            let page_size = 20;
425
426            // Paginate forwards. The count remains unchanged.
427            let count = skip_count.compute_next_when_paginating_forwards(page_size);
428            assert_eq!(count, 0);
429        }
430
431        #[test]
432        fn test_compute_count_when_paginating_forwards_from_overflowing_initial_states() {
433            let skip_count = SkipCount::new();
434
435            // Initial state with too much new items. Some are skipped.
436            let previous_number_of_items = 0;
437            let next_number_of_items = previous_number_of_items + 50;
438            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
439            assert_eq!(count, 30);
440            skip_count.count.set(count);
441
442            // Add 30 new items. The count stays at 30 because we don't want to
443            // skip the previous items.
444            let previous_number_of_items = next_number_of_items;
445            let next_number_of_items = previous_number_of_items + 30;
446            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
447            assert_eq!(count, 30);
448            skip_count.count.set(count);
449
450            let page_size = 20;
451
452            // Paginate forwards. The count remains unchanged.
453            let count = skip_count.compute_next_when_paginating_forwards(page_size);
454            assert_eq!(count, 30);
455        }
456    }
457}