eyeball_im_util/vector/
head.rs

1use arrayvec::ArrayVec;
2use std::{
3    cmp::{min, Ordering},
4    mem,
5    pin::Pin,
6    task::{self, ready, Poll},
7};
8
9use super::{
10    VectorDiffContainer, VectorDiffContainerOps, VectorDiffContainerStreamElement,
11    VectorDiffContainerStreamHeadBuf, VectorObserver,
12};
13use eyeball_im::VectorDiff;
14use futures_core::Stream;
15use imbl::Vector;
16use pin_project_lite::pin_project;
17
18pin_project! {
19    /// A [`VectorDiff`] stream adapter that presents a limited view of the
20    /// underlying [`ObservableVector`]'s items. The view starts from index 0.
21    /// This is the opposite of [`Tail`](super::Tail), which starts from the
22    /// end.
23    ///
24    /// For example, let `S` be a `Stream<Item = VectorDiff>`. The [`Vector`]
25    /// represented by `S` can have any length, but one may want to virtually
26    /// _limit_ this `Vector` to a certain size. Then this `Head` adapter is
27    /// appropriate.
28    ///
29    /// An internal buffered vector is kept so that the adapter knows which
30    /// values can be added when the limit is increased, or when values are
31    /// removed and new values must be inserted. This fact is important if the
32    /// items of the `Vector` have a non-negligible size.
33    ///
34    /// It's okay to have a limit larger than the length of the observed
35    /// `Vector`.
36    ///
37    /// # Examples
38    ///
39    /// ```rust
40    /// use eyeball_im::{ObservableVector, VectorDiff};
41    /// use eyeball_im_util::vector::VectorObserverExt;
42    /// use imbl::vector;
43    /// use stream_assert::{assert_closed, assert_next_eq, assert_pending};
44    ///
45    /// // Our vector.
46    /// let mut ob = ObservableVector::<char>::new();
47    /// let (values, mut sub) = ob.subscribe().head(3);
48    ///
49    /// assert!(values.is_empty());
50    /// assert_pending!(sub);
51    ///
52    /// // Append multiple values.
53    /// ob.append(vector!['a', 'b', 'c', 'd']);
54    /// // We get a `VectorDiff::Append` with the first 3 values!
55    /// assert_next_eq!(sub, VectorDiff::Append { values: vector!['a', 'b', 'c'] });
56    ///
57    /// // Let's recap what we have. `ob` is our `ObservableVector`,
58    /// // `sub` is the “limited view” of `ob`:
59    /// // | `ob`  | a b c d |
60    /// // | `sub` | a b c   |
61    ///
62    /// // Front push a value.
63    /// ob.push_front('e');
64    /// // We get three `VectorDiff`s!
65    /// assert_next_eq!(sub, VectorDiff::PopBack);
66    /// assert_next_eq!(sub, VectorDiff::PushFront { value: 'e' });
67    ///
68    /// // Let's recap what we have:
69    /// // | `ob`  | e a b c d |
70    /// // | `sub` | e a b     |
71    /// //           ^     ^
72    /// //           |     |
73    /// //           |     removed with `VectorDiff::PopBack`
74    /// //           added with `VectorDiff::PushFront`
75    ///
76    /// assert_pending!(sub);
77    /// drop(ob);
78    /// assert_closed!(sub);
79    /// ```
80    ///
81    /// [`ObservableVector`]: eyeball_im::ObservableVector
82    #[project = HeadProj]
83    pub struct Head<S, L>
84    where
85        S: Stream,
86        S::Item: VectorDiffContainer,
87    {
88        // The main stream to poll items from.
89        #[pin]
90        inner_stream: S,
91
92        // The limit stream to poll new limits from.
93        #[pin]
94        limit_stream: L,
95
96        // The buffered vector that is updated with the main stream's items.
97        // It's used to provide missing items, e.g. when the limit increases.
98        buffered_vector: Vector<VectorDiffContainerStreamElement<S>>,
99
100        // The current limit.
101        limit: usize,
102
103        // This adapter is not a basic filter: It can produce up to two items
104        // per item of the underlying stream.
105        //
106        // Thus, if the item type is just `VectorDiff<_>` (non-bached, can't
107        // just add diffs to a poll_next result), we need a buffer to store the
108        // possible extra item in. For example if the vector is [10, 11, 12]
109        // with a limit of 2 on top: if an item is popped at the front then 10
110        // is removed, but 12 has to be pushed back as it "enters" the "view".
111        // That second `PushBack` diff is buffered here.
112        ready_values: VectorDiffContainerStreamHeadBuf<S>,
113    }
114}
115
116impl<S> Head<S, EmptyLimitStream>
117where
118    S: Stream,
119    S::Item: VectorDiffContainer,
120{
121    /// Create a new [`Head`] with the given (unlimited) initial values,
122    /// stream of `VectorDiff` updates for those values, and a fixed limit.
123    ///
124    /// Returns the truncated initial values as well as a stream of updates that
125    /// ensure that the resulting vector never exceeds the given limit.
126    pub fn new(
127        initial_values: Vector<VectorDiffContainerStreamElement<S>>,
128        inner_stream: S,
129        limit: usize,
130    ) -> (Vector<VectorDiffContainerStreamElement<S>>, Self) {
131        Self::dynamic_with_initial_limit(initial_values, inner_stream, limit, EmptyLimitStream)
132    }
133}
134
135impl<S, L> Head<S, L>
136where
137    S: Stream,
138    S::Item: VectorDiffContainer,
139    L: Stream<Item = usize>,
140{
141    /// Create a new [`Head`] with the given (unlimited) initial values, stream
142    /// of `VectorDiff` updates for those values, and a stream of limits.
143    ///
144    /// This is equivalent to `dynamic_with_initial_limit` where the
145    /// `initial_limit` is 0, except that it doesn't return the limited
146    /// vector as it would be empty anyways.
147    ///
148    /// Note that the returned `Head` won't produce anything until the first
149    /// limit is produced by the limit stream.
150    pub fn dynamic(
151        initial_values: Vector<VectorDiffContainerStreamElement<S>>,
152        inner_stream: S,
153        limit_stream: L,
154    ) -> Self {
155        Self {
156            inner_stream,
157            limit_stream,
158            buffered_vector: initial_values,
159            limit: 0,
160            ready_values: Default::default(),
161        }
162    }
163
164    /// Create a new [`Head`] with the given (unlimited) initial values, stream
165    /// of `VectorDiff` updates for those values, and an initial limit as well
166    /// as a stream of new limits.
167    pub fn dynamic_with_initial_limit(
168        mut initial_values: Vector<VectorDiffContainerStreamElement<S>>,
169        inner_stream: S,
170        initial_limit: usize,
171        limit_stream: L,
172    ) -> (Vector<VectorDiffContainerStreamElement<S>>, Self) {
173        let buffered_vector = initial_values.clone();
174        if initial_limit < initial_values.len() {
175            initial_values.truncate(initial_limit);
176        }
177
178        let stream = Self {
179            inner_stream,
180            limit_stream,
181            buffered_vector,
182            limit: initial_limit,
183            ready_values: Default::default(),
184        };
185
186        (initial_values, stream)
187    }
188}
189
190impl<S, L> Stream for Head<S, L>
191where
192    S: Stream,
193    S::Item: VectorDiffContainer,
194    L: Stream<Item = usize>,
195{
196    type Item = S::Item;
197
198    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
199        self.project().poll_next(cx)
200    }
201}
202
203impl<S, L> VectorObserver<VectorDiffContainerStreamElement<S>> for Head<S, L>
204where
205    S: Stream,
206    S::Item: VectorDiffContainer,
207    L: Stream<Item = usize>,
208{
209    type Stream = Self;
210
211    fn into_parts(self) -> (Vector<VectorDiffContainerStreamElement<S>>, Self::Stream) {
212        (self.buffered_vector.clone(), self)
213    }
214}
215
216impl<S, L> HeadProj<'_, S, L>
217where
218    S: Stream,
219    S::Item: VectorDiffContainer,
220    L: Stream<Item = usize>,
221{
222    fn poll_next(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<S::Item>> {
223        loop {
224            // First off, if any values are ready, return them.
225            if let Some(value) = S::Item::pop_from_head_buf(self.ready_values) {
226                return Poll::Ready(Some(value));
227            }
228
229            // Poll a new limit from `limit_stream` before polling `inner_stream`.
230            while let Poll::Ready(Some(next_limit)) = self.limit_stream.as_mut().poll_next(cx) {
231                // We have new `VectorDiff`s after the limit has been updated.
232                // Return them.
233                if let Some(diffs) = self.update_limit(next_limit) {
234                    return Poll::Ready(Some(diffs));
235                }
236
237                // If `update_limit` returned `None`, poll the limit stream
238                // again.
239            }
240
241            // Poll `VectorDiff`s from the `inner_stream`.
242            let Some(diffs) = ready!(self.inner_stream.as_mut().poll_next(cx)) else {
243                return Poll::Ready(None);
244            };
245
246            // Consume and apply the diffs if possible.
247            let ready = diffs.push_into_head_buf(self.ready_values, |diff| {
248                let limit = *self.limit;
249                let prev_len = self.buffered_vector.len();
250
251                // Update the `buffered_vector`. It's a replica of the original observed
252                // `Vector`. We need to maintain it in order to be able to produce valid
253                // `VectorDiff`s when items are missing.
254                diff.clone().apply(self.buffered_vector);
255
256                // Handle the `diff`.
257                handle_diff(diff, limit, prev_len, self.buffered_vector)
258            });
259
260            if let Some(diff) = ready {
261                return Poll::Ready(Some(diff));
262            }
263
264            // Else loop and poll the streams again.
265        }
266    }
267
268    /// Update the limit if necessary.
269    ///
270    /// * If the buffered vector is empty, it returns `None`.
271    /// * If the limit increases, a `VectorDiff::Append` is produced if any
272    ///   items exist.
273    /// * If the limit decreases below the length of the vector, a
274    ///   `VectorDiff::Truncate` is produced.
275    ///
276    /// It's OK to have a `new_limit` larger than the length of the `Vector`.
277    /// The `new_limit` won't be capped.
278    fn update_limit(&mut self, new_limit: usize) -> Option<S::Item> {
279        // Let's update the limit.
280        let old_limit = mem::replace(self.limit, new_limit);
281
282        if self.buffered_vector.is_empty() {
283            // If empty, nothing to do.
284            return None;
285        }
286
287        match old_limit.cmp(&new_limit) {
288            // old < new
289            Ordering::Less => {
290                let missing_items = self
291                    .buffered_vector
292                    .iter()
293                    .skip(old_limit)
294                    .take(new_limit - old_limit)
295                    .cloned()
296                    .collect::<Vector<_>>();
297
298                if missing_items.is_empty() {
299                    None
300                } else {
301                    // Let's add the missing items.
302                    Some(S::Item::from_item(VectorDiff::Append { values: missing_items }))
303                }
304            }
305
306            // old > new
307            Ordering::Greater => {
308                if self.buffered_vector.len() <= new_limit {
309                    None
310                } else {
311                    // Let's remove the extra items.
312                    Some(S::Item::from_item(VectorDiff::Truncate { length: new_limit }))
313                }
314            }
315
316            // old == new
317            Ordering::Equal => {
318                // Nothing to do.
319                None
320            }
321        }
322    }
323}
324
325/// An empty stream with an item type of `usize`.
326#[derive(Debug)]
327#[non_exhaustive]
328pub struct EmptyLimitStream;
329
330impl Stream for EmptyLimitStream {
331    type Item = usize;
332
333    fn poll_next(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
334        Poll::Ready(None)
335    }
336}
337
338fn handle_diff<T: Clone>(
339    diff: VectorDiff<T>,
340    limit: usize,
341    prev_len: usize,
342    buffered_vector: &Vector<T>,
343) -> ArrayVec<VectorDiff<T>, 2> {
344    // If the limit is zero, we have nothing to do.
345    if limit == 0 {
346        return ArrayVec::new();
347    }
348
349    let is_full = prev_len >= limit;
350    let mut res = ArrayVec::new();
351
352    match diff {
353        VectorDiff::Append { mut values } => {
354            if is_full {
355                // Ignore the diff.
356            } else {
357                // Truncate the `values` to fit inside the free space.
358                values.truncate(min(limit - prev_len, values.len()));
359                res.push(VectorDiff::Append { values });
360            }
361        }
362        VectorDiff::Clear => {
363            res.push(VectorDiff::Clear);
364        }
365        VectorDiff::PushFront { value } => {
366            if is_full {
367                // Create 1 free space.
368                res.push(VectorDiff::PopBack);
369            }
370
371            // There is space for this new item.
372            res.push(VectorDiff::PushFront { value });
373        }
374        VectorDiff::PushBack { value } => {
375            if is_full {
376                // Ignore the diff.
377            } else {
378                // There is space for this new item.
379                res.push(VectorDiff::PushBack { value });
380            }
381        }
382        VectorDiff::PopFront => {
383            res.push(VectorDiff::PopFront);
384
385            if let Some(diff) = buffered_vector.get(limit - 1) {
386                // There is a previously-truncated item, push back.
387                res.push(VectorDiff::PushBack { value: diff.clone() });
388            }
389        }
390        VectorDiff::PopBack => {
391            if prev_len > limit {
392                // Pop back outside the limit, ignore the diff.
393            } else {
394                res.push(VectorDiff::PopBack);
395            }
396        }
397        VectorDiff::Insert { index, value } => {
398            if index >= limit {
399                // Insert after `limit`, ignore the diff.
400            } else {
401                if is_full {
402                    // Create 1 free space.
403                    res.push(VectorDiff::PopBack);
404                }
405
406                // There is space for this new item.
407                res.push(VectorDiff::Insert { index, value });
408            }
409        }
410        VectorDiff::Set { index, value } => {
411            if index >= limit {
412                // Update after `limit`, ignore the diff.
413            } else {
414                res.push(VectorDiff::Set { index, value });
415            }
416        }
417        VectorDiff::Remove { index } => {
418            if index >= limit {
419                // Remove after `limit`, ignore the diff.
420            } else {
421                res.push(VectorDiff::Remove { index });
422
423                if let Some(diff) = buffered_vector.get(limit - 1) {
424                    // There is a previously-truncated item, push back.
425                    res.push(VectorDiff::PushBack { value: diff.clone() });
426                }
427            }
428        }
429        VectorDiff::Truncate { length: new_length } => {
430            if new_length >= limit {
431                // Truncate items after `limit`, ignore the diff.
432            } else {
433                res.push(VectorDiff::Truncate { length: new_length });
434            }
435        }
436        VectorDiff::Reset { values: mut new_values } => {
437            if new_values.len() > limit {
438                // There are too many values, truncate.
439                new_values.truncate(limit);
440            }
441
442            // There is space for these new items.
443            res.push(VectorDiff::Reset { values: new_values });
444        }
445    }
446
447    res
448}