eyeball_im_util/vector/skip.rs
1use smallvec::SmallVec;
2use std::{
3 cmp::{min, Ordering},
4 iter::repeat,
5 pin::Pin,
6 task::{self, ready, Poll},
7};
8
9use super::{
10 VectorDiffContainer, VectorDiffContainerOps, VectorDiffContainerStreamElement,
11 VectorDiffContainerStreamSkipBuf, VectorObserver,
12};
13use eyeball_im::VectorDiff;
14use futures_core::Stream;
15use imbl::Vector;
16use pin_project_lite::pin_project;
17
18pin_project! {
19 /// A [`VectorDiff`] stream adapter that presents a limited view of the
20 /// underlying [`ObservableVector`]s items. The view starts after `count`
21 /// number of values are skipped, until the end. It must not be confused
22 /// with [`Tail`](super::Tail) where `Tail` keeps the last values, while
23 /// `Skip` skips the first values.
24 ///
25 /// For example, let `S` be a `Stream<Item = VectorDiff>`. The [`Vector`]
26 /// represented by `S` can have any length, but one may want to virtually
27 /// skip the first `count` values. Then this `Skip` adapter is appropriate.
28 ///
29 /// An internal buffered vector is kept so that the adapter knows which
30 /// values can be added when the index is decreased, or when values are
31 /// removed and new values must be inserted. This fact is important if the
32 /// items of the `Vector` have a non-negligible size.
33 ///
34 /// It's okay to have an index larger than the length of the observed
35 /// `Vector`.
36 ///
37 /// # Examples
38 ///
39 /// ```rust
40 /// use eyeball_im::{ObservableVector, VectorDiff};
41 /// use eyeball_im_util::vector::VectorObserverExt;
42 /// use imbl::vector;
43 /// use stream_assert::{assert_closed, assert_next_eq, assert_pending};
44 ///
45 /// // Our vector.
46 /// let mut ob = ObservableVector::<char>::new();
47 /// let (values, mut sub) = ob.subscribe().skip(3);
48 ///
49 /// assert!(values.is_empty());
50 /// assert_pending!(sub);
51 ///
52 /// // Append multiple values.
53 /// ob.append(vector!['a', 'b', 'c', 'd', 'e']);
54 /// // We get a `VectorDiff::Append` with the latest 2 values because the
55 /// // first 3 values have been skipped!
56 /// assert_next_eq!(sub, VectorDiff::Append { values: vector!['d', 'e'] });
57 ///
58 /// // Let's recap what we have. `ob` is our `ObservableVector`,
59 /// // `sub` is the “limited view” of `ob`:
60 /// // | `ob` | a b c d e |
61 /// // | `sub` | _ _ _ d e |
62 /// // “_” means the item has been skipped.
63 ///
64 /// // Append multiple values.
65 /// ob.append(vector!['f', 'g']);
66 /// // We get a single `VectorDiff`.
67 /// assert_next_eq!(sub, VectorDiff::Append { values: vector!['f', 'g'] });
68 ///
69 /// // Let's recap what we have:
70 /// // | `ob` | a b c d e f g |
71 /// // | `sub` | _ _ _ d e f g |
72 /// // ^^^
73 /// // |
74 /// // `VectorDiff::Append { .. }`
75 ///
76 /// // Insert a single value.
77 /// ob.insert(1, 'h');
78 /// // We get a single `VectorDiff::PushFront`. Indeed, `h` is inserted at
79 /// // index 1, so every value after that is shifted to the right, thus `c`
80 /// // “enters the view” via a `PushFront`.
81 /// assert_next_eq!(sub, VectorDiff::PushFront { value: 'c' });
82 ///
83 /// // Let's recap what we have:
84 /// // | `ob` | a h b c d e f g |
85 /// // | `sub` | _ _ _ c d e f g |
86 /// // ^
87 /// // |
88 /// // `VectorDiff::PushFront { .. }`
89 ///
90 /// assert_pending!(sub);
91 /// drop(ob);
92 /// assert_closed!(sub);
93 /// ```
94 ///
95 /// [`ObservableVector`]: eyeball_im::ObservableVector
96 #[project = SkipProj]
97 pub struct Skip<S, C>
98 where
99 S: Stream,
100 S::Item: VectorDiffContainer,
101 {
102 // The main stream to poll items from.
103 #[pin]
104 inner_stream: S,
105
106 // The count stream to poll new count values from.
107 #[pin]
108 count_stream: C,
109
110 // The buffered vector that is updated with the main stream's items.
111 // It's used to provide missing items, e.g. when the count decreases or
112 // when values must be filled.
113 buffered_vector: Vector<VectorDiffContainerStreamElement<S>>,
114
115 // The current count.
116 //
117 // This is an option because it can be uninitialized. It's incorrect
118 // to use a default value for `count`, contrary to [`Head`] or [`Tail`]
119 // where `limit` can be 0.
120 count: Option<usize>,
121
122 // This adapter is not a basic filter: It can produce many items per
123 // item of the underlying stream.
124 //
125 // Thus, if the item type is just `VectorDiff<_>` (non-bached, can't
126 // just add diffs to a poll_next result), we need a buffer to store the
127 // possible extra item in.
128 ready_values: VectorDiffContainerStreamSkipBuf<S>,
129 }
130}
131
132impl<S> Skip<S, EmptyCountStream>
133where
134 S: Stream,
135 S::Item: VectorDiffContainer,
136{
137 /// Create a new [`Skip`] with the given (unlimited) initial values,
138 /// stream of `VectorDiff` updates for those values, and a fixed count.
139 ///
140 /// Returns the initial values as well as a stream of updates that ensure
141 /// that the resulting vector never includes the first `count` items.
142 pub fn new(
143 initial_values: Vector<VectorDiffContainerStreamElement<S>>,
144 inner_stream: S,
145 count: usize,
146 ) -> (Vector<VectorDiffContainerStreamElement<S>>, Self) {
147 Self::dynamic_with_initial_count(initial_values, inner_stream, count, EmptyCountStream)
148 }
149}
150
151impl<S, C> Skip<S, C>
152where
153 S: Stream,
154 S::Item: VectorDiffContainer,
155 C: Stream<Item = usize>,
156{
157 /// Create a new [`Skip`] with the given (unlimited) initial values,
158 /// stream of `VectorDiff` updates for those values, and a stream of
159 /// indices.
160 ///
161 /// This is equivalent to `dynamic_with_initial_count` where the
162 /// `initial_count` is `usize::MAX`, except that it doesn't return the
163 /// limited vector as it would be empty anyways.
164 ///
165 /// Note that the returned `Skip` won't produce anything until the first
166 /// count is produced by the index stream.
167 pub fn dynamic(
168 initial_values: Vector<VectorDiffContainerStreamElement<S>>,
169 inner_stream: S,
170 count_stream: C,
171 ) -> Self {
172 Self {
173 inner_stream,
174 count_stream,
175 buffered_vector: initial_values,
176 count: None,
177 ready_values: Default::default(),
178 }
179 }
180
181 /// Create a new [`Skip`] with the given (unlimited) initial values,
182 /// stream of `VectorDiff` updates for those values, and an initial
183 /// count as well as a stream of new count values.
184 pub fn dynamic_with_initial_count(
185 initial_values: Vector<VectorDiffContainerStreamElement<S>>,
186 inner_stream: S,
187 initial_count: usize,
188 count_stream: C,
189 ) -> (Vector<VectorDiffContainerStreamElement<S>>, Self) {
190 let buffered_vector = initial_values.clone();
191
192 let initial_values = initial_values.skip(initial_count);
193
194 let stream = Self {
195 inner_stream,
196 count_stream,
197 buffered_vector,
198 count: Some(initial_count),
199 ready_values: Default::default(),
200 };
201
202 (initial_values, stream)
203 }
204}
205
206impl<S, C> Stream for Skip<S, C>
207where
208 S: Stream,
209 S::Item: VectorDiffContainer,
210 C: Stream<Item = usize>,
211{
212 type Item = S::Item;
213
214 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
215 self.project().poll_next(cx)
216 }
217}
218
219impl<S, C> VectorObserver<VectorDiffContainerStreamElement<S>> for Skip<S, C>
220where
221 S: Stream,
222 S::Item: VectorDiffContainer,
223 C: Stream<Item = usize>,
224{
225 type Stream = Self;
226
227 fn into_parts(self) -> (Vector<VectorDiffContainerStreamElement<S>>, Self::Stream) {
228 (self.buffered_vector.clone(), self)
229 }
230}
231
232impl<S, C> SkipProj<'_, S, C>
233where
234 S: Stream,
235 S::Item: VectorDiffContainer,
236 C: Stream<Item = usize>,
237{
238 fn poll_next(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<S::Item>> {
239 loop {
240 // First off, if any values are ready, return them.
241 if let Some(value) = S::Item::pop_from_skip_buf(self.ready_values) {
242 return Poll::Ready(Some(value));
243 }
244
245 // Poll a new count value from `count_stream` before polling `inner_stream`.
246 while let Poll::Ready(Some(next_count)) = self.count_stream.as_mut().poll_next(cx) {
247 // Update the count value and emit `VectorDiff`s accordingly.
248 if let Some(diffs) = self.update_count(next_count) {
249 return Poll::Ready(S::Item::extend_skip_buf(diffs, self.ready_values));
250 }
251
252 // If `update_count` returned `None`, poll the count stream
253 // again.
254 }
255
256 // Poll `VectorDiff`s from the `inner_stream`.
257 let Some(diffs) = ready!(self.inner_stream.as_mut().poll_next(cx)) else {
258 return Poll::Ready(None);
259 };
260
261 // Consume and apply the diffs if possible.
262 let ready = diffs.push_into_skip_buf(self.ready_values, |diff| {
263 let count = *self.count;
264 let previous_length = self.buffered_vector.len();
265
266 // Update the `buffered_vector`. It's a replica of the original observed
267 // `Vector`. We need to maintain it in order to be able to produce valid
268 // `VectorDiff`s when items are missing.
269 diff.clone().apply(self.buffered_vector);
270
271 // Handle the `diff` if and only if there is a count.
272 if let Some(count) = count {
273 handle_diff(diff, count, previous_length, self.buffered_vector)
274 } else {
275 SmallVec::new()
276 }
277 });
278
279 if let Some(diff) = ready {
280 return Poll::Ready(Some(diff));
281 }
282
283 // Else loop and poll the streams again.
284 }
285 }
286
287 /// Update the count value if necessary.
288 ///
289 /// * If the buffered vector is empty, it returns `None`.
290 /// * If the count decreases, `VectorDiff::PushFront`s are produced if any
291 /// items exist.
292 /// * If the count increases, `VectorDiff::PopFront`s are produced.
293 ///
294 /// It's OK to have a `new_count` larger than the length of the `Vector`.
295 /// The `new_count` won't be capped.
296 fn update_count(
297 &mut self,
298 new_count: usize,
299 ) -> Option<Vec<VectorDiff<VectorDiffContainerStreamElement<S>>>> {
300 // Let's update the count.
301 let old_count = self.count.replace(new_count);
302
303 if self.buffered_vector.is_empty() {
304 // If empty, nothing to do.
305 return None;
306 }
307
308 let old_count = match old_count {
309 // First time `count` is initialized.
310 None => {
311 return Some(vec![VectorDiff::Append {
312 values: self.buffered_vector.clone().skip(new_count),
313 }])
314 }
315
316 // Other updates of `count`.
317 Some(old_count) => old_count,
318 };
319
320 // Clamp `old_count` and `new_count` to the length of `buffered_vector` in case
321 // it is larger.
322 let buffered_vector_length = self.buffered_vector.len();
323 let old_count = min(old_count, buffered_vector_length);
324 let new_count = min(new_count, buffered_vector_length);
325
326 match old_count.cmp(&new_count) {
327 // old < new, count is shifting to the right
328 Ordering::Less => {
329 // Skip more items than the buffer contains: we must clear all existing items.
330 if buffered_vector_length <= new_count {
331 Some(vec![VectorDiff::Clear])
332 } else {
333 // Let's remove the extra items.
334 Some(repeat(VectorDiff::PopFront).take(new_count - old_count).collect())
335 }
336 }
337
338 // old > new, count is shifting to the left
339 Ordering::Greater => {
340 // Optimisation: when `old_count` is at the end of `buffered_vector`, and
341 // `new_count` is zero, we can emit a single `VectorDiff::Append` instead of
342 // many `VectorDiff::PushFront`.
343 if old_count == buffered_vector_length && new_count == 0 {
344 Some(vec![VectorDiff::Append { values: self.buffered_vector.clone() }])
345 } else {
346 let mut missing_items = self
347 .buffered_vector
348 .iter()
349 .rev()
350 .skip(buffered_vector_length - old_count)
351 .take(old_count - new_count)
352 .cloned()
353 .peekable();
354
355 if missing_items.peek().is_none() {
356 None
357 } else {
358 // Let's add the missing items.
359 Some(
360 missing_items
361 .map(|missing_item| VectorDiff::PushFront { value: missing_item })
362 .collect(),
363 )
364 }
365 }
366 }
367
368 // old == new
369 Ordering::Equal => {
370 // Nothing to do.
371 None
372 }
373 }
374 }
375}
376
377fn handle_diff<T: Clone>(
378 diff: VectorDiff<T>,
379 count: usize,
380 previous_length: usize,
381 buffered_vector: &Vector<T>,
382) -> SmallVec<[VectorDiff<T>; 2]> {
383 let mut res = SmallVec::new();
384
385 match diff {
386 VectorDiff::Append { values } => {
387 // Some values are appended after `count`.
388 if buffered_vector.len() > count {
389 // Cut `values` if they aren't all appended after `count`.
390 //
391 // Note: Do a `<` instead of `<=` to avoid calling `skip` with 0.
392
393 let values = if previous_length < count {
394 values.skip(count - previous_length)
395 } else {
396 values
397 };
398
399 res.push(VectorDiff::Append { values });
400 }
401 }
402
403 VectorDiff::Clear => {
404 res.push(VectorDiff::Clear);
405 }
406
407 VectorDiff::PushFront { value } => {
408 // The push shifts values after `count`.
409 if previous_length >= count {
410 // The value at `count` is the one that must be pushed front.
411 //
412 // If `count` is zero, let's avoid a look up + clone by forwarding `value`.
413 // Otherwise, let's look up at `count`.
414 if count == 0 {
415 res.push(VectorDiff::PushFront { value });
416 } else if let Some(value) = buffered_vector.get(count) {
417 res.push(VectorDiff::PushFront { value: value.clone() });
418 }
419 }
420 }
421
422 VectorDiff::PushBack { value } => {
423 // The push happens after `count`.
424 if previous_length >= count {
425 res.push(VectorDiff::PushBack { value });
426 }
427 }
428
429 VectorDiff::PopFront => {
430 // The pop shifts values after `count`.
431 if previous_length > count {
432 res.push(VectorDiff::PopFront);
433 }
434 }
435
436 VectorDiff::PopBack => {
437 // The pop happens after `count`.
438 if previous_length > count {
439 res.push(VectorDiff::PopBack);
440 }
441 }
442
443 VectorDiff::Insert { index, value } => {
444 // The insert shifts values after `count`.
445 if previous_length >= count {
446 // The insert happens before `count`, we need to insert a new
447 // value with `PushFront`.
448 if count > 0 && index < count {
449 // The value at `count` is the one that must be
450 // pushed front.
451 if let Some(value) = buffered_vector.get(count) {
452 res.push(VectorDiff::PushFront { value: value.clone() });
453 }
454 }
455 // The insert happens after `count`, we need to re-map `index`.
456 else {
457 res.push(VectorDiff::Insert { index: index - count, value });
458 }
459 }
460 }
461
462 VectorDiff::Set { index, value } => {
463 // The update happens after `count`.
464 if index >= count {
465 res.push(VectorDiff::Set { index: index - count, value });
466 }
467 }
468
469 VectorDiff::Remove { index } => {
470 // The removal happens inside the view.
471 if previous_length > count {
472 // The removal happens before `count`, we need to pop a value at
473 // the front.
474 if index < count {
475 res.push(VectorDiff::PopFront);
476 }
477 // The removal happens after `count`, we need to re-map `index`.
478 else {
479 res.push(VectorDiff::Remove { index: index - count });
480 }
481 }
482 }
483
484 VectorDiff::Truncate { length: new_length } => {
485 // The truncation removes some values after `count`.
486 if previous_length > count {
487 // All values removed by the truncation are after `count`.
488 if new_length > count {
489 res.push(VectorDiff::Truncate { length: new_length - count });
490 }
491 // Some values removed by the truncation are before `count` or exactly at `count`.
492 // It means that all values must be cleared.
493 else {
494 res.push(VectorDiff::Clear);
495 }
496 }
497 }
498
499 VectorDiff::Reset { values } => {
500 res.push(VectorDiff::Reset { values: values.skip(count) });
501 }
502 }
503
504 res
505}
506
507/// An empty stream with an item type of `usize`.
508#[derive(Debug)]
509#[non_exhaustive]
510pub struct EmptyCountStream;
511
512impl Stream for EmptyCountStream {
513 type Item = usize;
514
515 fn poll_next(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
516 Poll::Ready(None)
517 }
518}