eyeball_im_util/vector/
skip.rs

1use smallvec::SmallVec;
2use std::{
3    cmp::{min, Ordering},
4    iter::repeat,
5    pin::Pin,
6    task::{self, ready, Poll},
7};
8
9use super::{
10    VectorDiffContainer, VectorDiffContainerOps, VectorDiffContainerStreamElement,
11    VectorDiffContainerStreamSkipBuf, VectorObserver,
12};
13use eyeball_im::VectorDiff;
14use futures_core::Stream;
15use imbl::Vector;
16use pin_project_lite::pin_project;
17
pin_project! {
    /// A [`VectorDiff`] stream adapter that presents a limited view of the
    /// underlying [`ObservableVector`]'s items. The view starts after `count`
    /// number of values are skipped, until the end. It must not be confused
    /// with [`Tail`](super::Tail) where `Tail` keeps the last values, while
    /// `Skip` skips the first values.
    ///
    /// For example, let `S` be a `Stream<Item = VectorDiff>`. The [`Vector`]
    /// represented by `S` can have any length, but one may want to virtually
    /// skip the first `count` values. Then this `Skip` adapter is appropriate.
    ///
    /// An internal buffered vector is kept so that the adapter knows which
    /// values can be added when the count is decreased, or when values are
    /// removed and new values must be inserted. This fact is important if the
    /// items of the `Vector` have a non-negligible size.
    ///
    /// It's okay to have a count larger than the length of the observed
    /// `Vector`: the view is then simply empty.
    ///
    /// The type parameter `S` is the stream of diffs being adapted, and `C`
    /// is the stream that new count values are polled from.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use eyeball_im::{ObservableVector, VectorDiff};
    /// use eyeball_im_util::vector::VectorObserverExt;
    /// use imbl::vector;
    /// use stream_assert::{assert_closed, assert_next_eq, assert_pending};
    ///
    /// // Our vector.
    /// let mut ob = ObservableVector::<char>::new();
    /// let (values, mut sub) = ob.subscribe().skip(3);
    ///
    /// assert!(values.is_empty());
    /// assert_pending!(sub);
    ///
    /// // Append multiple values.
    /// ob.append(vector!['a', 'b', 'c', 'd', 'e']);
    /// // We get a `VectorDiff::Append` with the latest 2 values because the
    /// // first 3 values have been skipped!
    /// assert_next_eq!(sub, VectorDiff::Append { values: vector!['d', 'e'] });
    ///
    /// // Let's recap what we have. `ob` is our `ObservableVector`,
    /// // `sub` is the “limited view” of `ob`:
    /// // | `ob`  | a b c d e |
    /// // | `sub` | _ _ _ d e |
    /// // “_” means the item has been skipped.
    ///
    /// // Append multiple values.
    /// ob.append(vector!['f', 'g']);
    /// // We get a single `VectorDiff`.
    /// assert_next_eq!(sub, VectorDiff::Append { values: vector!['f', 'g'] });
    ///
    /// // Let's recap what we have:
    /// // | `ob`  | a b c d e f g |
    /// // | `sub` | _ _ _ d e f g |
    /// //                     ^^^
    /// //                     |
    /// //                     `VectorDiff::Append { .. }`
    ///
    /// // Insert a single value.
    /// ob.insert(1, 'h');
    /// // We get a single `VectorDiff::PushFront`. Indeed, `h` is inserted at
    /// // index 1, so every value after that is shifted to the right, thus `c`
    /// // “enters the view” via a `PushFront`.
    /// assert_next_eq!(sub, VectorDiff::PushFront { value: 'c' });
    ///
    /// // Let's recap what we have:
    /// // | `ob`  | a h b c d e f g |
    /// // | `sub` | _ _ _ c d e f g |
    /// //                 ^
    /// //                 |
    /// //                 `VectorDiff::PushFront { .. }`
    ///
    /// assert_pending!(sub);
    /// drop(ob);
    /// assert_closed!(sub);
    /// ```
    ///
    /// [`ObservableVector`]: eyeball_im::ObservableVector
    #[project = SkipProj]
    pub struct Skip<S, C>
    where
        S: Stream,
        S::Item: VectorDiffContainer,
    {
        // The main stream to poll items from. When it ends, the adapter ends
        // too.
        #[pin]
        inner_stream: S,

        // The count stream to poll new count values from. It is polled
        // before `inner_stream` on every `poll_next`.
        #[pin]
        count_stream: C,

        // The buffered vector that is updated with the main stream's items.
        // It is a full replica of the observed `Vector` (skipped items
        // included). It's used to provide missing items, e.g. when the count
        // decreases or when values must be filled.
        buffered_vector: Vector<VectorDiffContainerStreamElement<S>>,

        // The current count, i.e. the number of leading items hidden from the
        // view.
        //
        // This is an option because it can be uninitialized. It's incorrect
        // to use a default value for `count`, contrary to [`Head`] or [`Tail`]
        // where `limit` can be 0. While it is `None`, incoming diffs only
        // update `buffered_vector` and nothing is emitted.
        count: Option<usize>,

        // This adapter is not a basic filter: It can produce many items per
        // item of the underlying stream.
        //
        // Thus, if the item type is just `VectorDiff<_>` (non-batched, can't
        // just add diffs to a poll_next result), we need a buffer to store the
        // possible extra items in. Buffered items are returned first by
        // subsequent polls.
        ready_values: VectorDiffContainerStreamSkipBuf<S>,
    }
}
131
132impl<S> Skip<S, EmptyCountStream>
133where
134    S: Stream,
135    S::Item: VectorDiffContainer,
136{
137    /// Create a new [`Skip`] with the given (unlimited) initial values,
138    /// stream of `VectorDiff` updates for those values, and a fixed count.
139    ///
140    /// Returns the initial values as well as a stream of updates that ensure
141    /// that the resulting vector never includes the first `count` items.
142    pub fn new(
143        initial_values: Vector<VectorDiffContainerStreamElement<S>>,
144        inner_stream: S,
145        count: usize,
146    ) -> (Vector<VectorDiffContainerStreamElement<S>>, Self) {
147        Self::dynamic_with_initial_count(initial_values, inner_stream, count, EmptyCountStream)
148    }
149}
150
151impl<S, C> Skip<S, C>
152where
153    S: Stream,
154    S::Item: VectorDiffContainer,
155    C: Stream<Item = usize>,
156{
157    /// Create a new [`Skip`] with the given (unlimited) initial values,
158    /// stream of `VectorDiff` updates for those values, and a stream of
159    /// indices.
160    ///
161    /// This is equivalent to `dynamic_with_initial_count` where the
162    /// `initial_count` is `usize::MAX`, except that it doesn't return the
163    /// limited vector as it would be empty anyways.
164    ///
165    /// Note that the returned `Skip` won't produce anything until the first
166    /// count is produced by the index stream.
167    pub fn dynamic(
168        initial_values: Vector<VectorDiffContainerStreamElement<S>>,
169        inner_stream: S,
170        count_stream: C,
171    ) -> Self {
172        Self {
173            inner_stream,
174            count_stream,
175            buffered_vector: initial_values,
176            count: None,
177            ready_values: Default::default(),
178        }
179    }
180
181    /// Create a new [`Skip`] with the given (unlimited) initial values,
182    /// stream of `VectorDiff` updates for those values, and an initial
183    /// count as well as a stream of new count values.
184    pub fn dynamic_with_initial_count(
185        initial_values: Vector<VectorDiffContainerStreamElement<S>>,
186        inner_stream: S,
187        initial_count: usize,
188        count_stream: C,
189    ) -> (Vector<VectorDiffContainerStreamElement<S>>, Self) {
190        let buffered_vector = initial_values.clone();
191
192        let initial_values = initial_values.skip(initial_count);
193
194        let stream = Self {
195            inner_stream,
196            count_stream,
197            buffered_vector,
198            count: Some(initial_count),
199            ready_values: Default::default(),
200        };
201
202        (initial_values, stream)
203    }
204}
205
206impl<S, C> Stream for Skip<S, C>
207where
208    S: Stream,
209    S::Item: VectorDiffContainer,
210    C: Stream<Item = usize>,
211{
212    type Item = S::Item;
213
214    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
215        self.project().poll_next(cx)
216    }
217}
218
219impl<S, C> VectorObserver<VectorDiffContainerStreamElement<S>> for Skip<S, C>
220where
221    S: Stream,
222    S::Item: VectorDiffContainer,
223    C: Stream<Item = usize>,
224{
225    type Stream = Self;
226
227    fn into_parts(self) -> (Vector<VectorDiffContainerStreamElement<S>>, Self::Stream) {
228        (self.buffered_vector.clone(), self)
229    }
230}
231
232impl<S, C> SkipProj<'_, S, C>
233where
234    S: Stream,
235    S::Item: VectorDiffContainer,
236    C: Stream<Item = usize>,
237{
238    fn poll_next(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<S::Item>> {
239        loop {
240            // First off, if any values are ready, return them.
241            if let Some(value) = S::Item::pop_from_skip_buf(self.ready_values) {
242                return Poll::Ready(Some(value));
243            }
244
245            // Poll a new count value from `count_stream` before polling `inner_stream`.
246            while let Poll::Ready(Some(next_count)) = self.count_stream.as_mut().poll_next(cx) {
247                // Update the count value and emit `VectorDiff`s accordingly.
248                if let Some(diffs) = self.update_count(next_count) {
249                    return Poll::Ready(S::Item::extend_skip_buf(diffs, self.ready_values));
250                }
251
252                // If `update_count` returned `None`, poll the count stream
253                // again.
254            }
255
256            // Poll `VectorDiff`s from the `inner_stream`.
257            let Some(diffs) = ready!(self.inner_stream.as_mut().poll_next(cx)) else {
258                return Poll::Ready(None);
259            };
260
261            // Consume and apply the diffs if possible.
262            let ready = diffs.push_into_skip_buf(self.ready_values, |diff| {
263                let count = *self.count;
264                let previous_length = self.buffered_vector.len();
265
266                // Update the `buffered_vector`. It's a replica of the original observed
267                // `Vector`. We need to maintain it in order to be able to produce valid
268                // `VectorDiff`s when items are missing.
269                diff.clone().apply(self.buffered_vector);
270
271                // Handle the `diff` if and only if there is a count.
272                if let Some(count) = count {
273                    handle_diff(diff, count, previous_length, self.buffered_vector)
274                } else {
275                    SmallVec::new()
276                }
277            });
278
279            if let Some(diff) = ready {
280                return Poll::Ready(Some(diff));
281            }
282
283            // Else loop and poll the streams again.
284        }
285    }
286
287    /// Update the count value if necessary.
288    ///
289    /// * If the buffered vector is empty, it returns `None`.
290    /// * If the count decreases, `VectorDiff::PushFront`s are produced if any
291    ///   items exist.
292    /// * If the count increases, `VectorDiff::PopFront`s are produced.
293    ///
294    /// It's OK to have a `new_count` larger than the length of the `Vector`.
295    /// The `new_count` won't be capped.
296    fn update_count(
297        &mut self,
298        new_count: usize,
299    ) -> Option<Vec<VectorDiff<VectorDiffContainerStreamElement<S>>>> {
300        // Let's update the count.
301        let old_count = self.count.replace(new_count);
302
303        if self.buffered_vector.is_empty() {
304            // If empty, nothing to do.
305            return None;
306        }
307
308        let old_count = match old_count {
309            // First time `count` is initialized.
310            None => {
311                return Some(vec![VectorDiff::Append {
312                    values: self.buffered_vector.clone().skip(new_count),
313                }])
314            }
315
316            // Other updates of `count`.
317            Some(old_count) => old_count,
318        };
319
320        // Clamp `old_count` and `new_count` to the length of `buffered_vector` in case
321        // it is larger.
322        let buffered_vector_length = self.buffered_vector.len();
323        let old_count = min(old_count, buffered_vector_length);
324        let new_count = min(new_count, buffered_vector_length);
325
326        match old_count.cmp(&new_count) {
327            // old < new, count is shifting to the right
328            Ordering::Less => {
329                // Skip more items than the buffer contains: we must clear all existing items.
330                if buffered_vector_length <= new_count {
331                    Some(vec![VectorDiff::Clear])
332                } else {
333                    // Let's remove the extra items.
334                    Some(repeat(VectorDiff::PopFront).take(new_count - old_count).collect())
335                }
336            }
337
338            // old > new, count is shifting to the left
339            Ordering::Greater => {
340                // Optimisation: when `old_count` is at the end of `buffered_vector`, and
341                // `new_count` is zero, we can emit a single `VectorDiff::Append` instead of
342                // many `VectorDiff::PushFront`.
343                if old_count == buffered_vector_length && new_count == 0 {
344                    Some(vec![VectorDiff::Append { values: self.buffered_vector.clone() }])
345                } else {
346                    let mut missing_items = self
347                        .buffered_vector
348                        .iter()
349                        .rev()
350                        .skip(buffered_vector_length - old_count)
351                        .take(old_count - new_count)
352                        .cloned()
353                        .peekable();
354
355                    if missing_items.peek().is_none() {
356                        None
357                    } else {
358                        // Let's add the missing items.
359                        Some(
360                            missing_items
361                                .map(|missing_item| VectorDiff::PushFront { value: missing_item })
362                                .collect(),
363                        )
364                    }
365                }
366            }
367
368            // old == new
369            Ordering::Equal => {
370                // Nothing to do.
371                None
372            }
373        }
374    }
375}
376
377fn handle_diff<T: Clone>(
378    diff: VectorDiff<T>,
379    count: usize,
380    previous_length: usize,
381    buffered_vector: &Vector<T>,
382) -> SmallVec<[VectorDiff<T>; 2]> {
383    let mut res = SmallVec::new();
384
385    match diff {
386        VectorDiff::Append { values } => {
387            // Some values are appended after `count`.
388            if buffered_vector.len() > count {
389                // Cut `values` if they aren't all appended after `count`.
390                //
391                // Note: Do a `<` instead of `<=` to avoid calling `skip` with 0.
392
393                let values = if previous_length < count {
394                    values.skip(count - previous_length)
395                } else {
396                    values
397                };
398
399                res.push(VectorDiff::Append { values });
400            }
401        }
402
403        VectorDiff::Clear => {
404            res.push(VectorDiff::Clear);
405        }
406
407        VectorDiff::PushFront { value } => {
408            // The push shifts values after `count`.
409            if previous_length >= count {
410                // The value at `count` is the one that must be pushed front.
411                //
412                // If `count` is zero, let's avoid a look up + clone by forwarding `value`.
413                // Otherwise, let's look up at `count`.
414                if count == 0 {
415                    res.push(VectorDiff::PushFront { value });
416                } else if let Some(value) = buffered_vector.get(count) {
417                    res.push(VectorDiff::PushFront { value: value.clone() });
418                }
419            }
420        }
421
422        VectorDiff::PushBack { value } => {
423            // The push happens after `count`.
424            if previous_length >= count {
425                res.push(VectorDiff::PushBack { value });
426            }
427        }
428
429        VectorDiff::PopFront => {
430            // The pop shifts values after `count`.
431            if previous_length > count {
432                res.push(VectorDiff::PopFront);
433            }
434        }
435
436        VectorDiff::PopBack => {
437            // The pop happens after `count`.
438            if previous_length > count {
439                res.push(VectorDiff::PopBack);
440            }
441        }
442
443        VectorDiff::Insert { index, value } => {
444            // The insert shifts values after `count`.
445            if previous_length >= count {
446                // The insert happens before `count`, we need to insert a new
447                // value with `PushFront`.
448                if count > 0 && index < count {
449                    // The value at `count` is the one that must be
450                    // pushed front.
451                    if let Some(value) = buffered_vector.get(count) {
452                        res.push(VectorDiff::PushFront { value: value.clone() });
453                    }
454                }
455                // The insert happens after `count`, we need to re-map `index`.
456                else {
457                    res.push(VectorDiff::Insert { index: index - count, value });
458                }
459            }
460        }
461
462        VectorDiff::Set { index, value } => {
463            // The update happens after `count`.
464            if index >= count {
465                res.push(VectorDiff::Set { index: index - count, value });
466            }
467        }
468
469        VectorDiff::Remove { index } => {
470            // The removal happens inside the view.
471            if previous_length > count {
472                // The removal happens before `count`, we need to pop a value at
473                // the front.
474                if index < count {
475                    res.push(VectorDiff::PopFront);
476                }
477                // The removal happens after `count`, we need to re-map `index`.
478                else {
479                    res.push(VectorDiff::Remove { index: index - count });
480                }
481            }
482        }
483
484        VectorDiff::Truncate { length: new_length } => {
485            // The truncation removes some values after `count`.
486            if previous_length > count {
487                // All values removed by the truncation are after `count`.
488                if new_length > count {
489                    res.push(VectorDiff::Truncate { length: new_length - count });
490                }
491                // Some values removed by the truncation are before `count` or exactly at `count`.
492                // It means that all values must be cleared.
493                else {
494                    res.push(VectorDiff::Clear);
495                }
496            }
497        }
498
499        VectorDiff::Reset { values } => {
500            res.push(VectorDiff::Reset { values: values.skip(count) });
501        }
502    }
503
504    res
505}
506
/// An empty stream with an item type of `usize`.
///
/// It never yields a value and reports itself as finished on every poll. It
/// is the count stream used by [`Skip::new`], where the count is fixed and
/// never updated.
#[derive(Debug)]
#[non_exhaustive]
pub struct EmptyCountStream;
511
512impl Stream for EmptyCountStream {
513    type Item = usize;
514
515    fn poll_next(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
516        Poll::Ready(None)
517    }
518}