eyeball_im_util/vector/head.rs
1use arrayvec::ArrayVec;
2use std::{
3 cmp::{min, Ordering},
4 mem,
5 pin::Pin,
6 task::{self, ready, Poll},
7};
8
9use super::{
10 VectorDiffContainer, VectorDiffContainerOps, VectorDiffContainerStreamElement,
11 VectorDiffContainerStreamHeadBuf, VectorObserver,
12};
13use eyeball_im::VectorDiff;
14use futures_core::Stream;
15use imbl::Vector;
16use pin_project_lite::pin_project;
17
18pin_project! {
19 /// A [`VectorDiff`] stream adapter that presents a limited view of the
20 /// underlying [`ObservableVector`]'s items. The view starts from index 0.
21 /// This is the opposite of [`Tail`](super::Tail), which starts from the
22 /// end.
23 ///
24 /// For example, let `S` be a `Stream<Item = VectorDiff>`. The [`Vector`]
25 /// represented by `S` can have any length, but one may want to virtually
26 /// _limit_ this `Vector` to a certain size. Then this `Head` adapter is
27 /// appropriate.
28 ///
29 /// An internal buffered vector is kept so that the adapter knows which
30 /// values can be added when the limit is increased, or when values are
31 /// removed and new values must be inserted. This fact is important if the
32 /// items of the `Vector` have a non-negligible size.
33 ///
34 /// It's okay to have a limit larger than the length of the observed
35 /// `Vector`.
36 ///
37 /// # Examples
38 ///
39 /// ```rust
40 /// use eyeball_im::{ObservableVector, VectorDiff};
41 /// use eyeball_im_util::vector::VectorObserverExt;
42 /// use imbl::vector;
43 /// use stream_assert::{assert_closed, assert_next_eq, assert_pending};
44 ///
45 /// // Our vector.
46 /// let mut ob = ObservableVector::<char>::new();
47 /// let (values, mut sub) = ob.subscribe().head(3);
48 ///
49 /// assert!(values.is_empty());
50 /// assert_pending!(sub);
51 ///
52 /// // Append multiple values.
53 /// ob.append(vector!['a', 'b', 'c', 'd']);
54 /// // We get a `VectorDiff::Append` with the first 3 values!
55 /// assert_next_eq!(sub, VectorDiff::Append { values: vector!['a', 'b', 'c'] });
56 ///
57 /// // Let's recap what we have. `ob` is our `ObservableVector`,
58 /// // `sub` is the “limited view” of `ob`:
59 /// // | `ob` | a b c d |
60 /// // | `sub` | a b c |
61 ///
62 /// // Front push a value.
63 /// ob.push_front('e');
64 /// // We get three `VectorDiff`s!
65 /// assert_next_eq!(sub, VectorDiff::PopBack);
66 /// assert_next_eq!(sub, VectorDiff::PushFront { value: 'e' });
67 ///
68 /// // Let's recap what we have:
69 /// // | `ob` | e a b c d |
70 /// // | `sub` | e a b |
71 /// // ^ ^
72 /// // | |
73 /// // | removed with `VectorDiff::PopBack`
74 /// // added with `VectorDiff::PushFront`
75 ///
76 /// assert_pending!(sub);
77 /// drop(ob);
78 /// assert_closed!(sub);
79 /// ```
80 ///
81 /// [`ObservableVector`]: eyeball_im::ObservableVector
82 #[project = HeadProj]
83 pub struct Head<S, L>
84 where
85 S: Stream,
86 S::Item: VectorDiffContainer,
87 {
88 // The main stream to poll items from.
89 #[pin]
90 inner_stream: S,
91
92 // The limit stream to poll new limits from.
93 #[pin]
94 limit_stream: L,
95
96 // The buffered vector that is updated with the main stream's items.
97 // It's used to provide missing items, e.g. when the limit increases.
98 buffered_vector: Vector<VectorDiffContainerStreamElement<S>>,
99
100 // The current limit.
101 limit: usize,
102
103 // This adapter is not a basic filter: It can produce up to two items
104 // per item of the underlying stream.
105 //
106 // Thus, if the item type is just `VectorDiff<_>` (non-bached, can't
107 // just add diffs to a poll_next result), we need a buffer to store the
108 // possible extra item in. For example if the vector is [10, 11, 12]
109 // with a limit of 2 on top: if an item is popped at the front then 10
110 // is removed, but 12 has to be pushed back as it "enters" the "view".
111 // That second `PushBack` diff is buffered here.
112 ready_values: VectorDiffContainerStreamHeadBuf<S>,
113 }
114}
115
116impl<S> Head<S, EmptyLimitStream>
117where
118 S: Stream,
119 S::Item: VectorDiffContainer,
120{
121 /// Create a new [`Head`] with the given (unlimited) initial values,
122 /// stream of `VectorDiff` updates for those values, and a fixed limit.
123 ///
124 /// Returns the truncated initial values as well as a stream of updates that
125 /// ensure that the resulting vector never exceeds the given limit.
126 pub fn new(
127 initial_values: Vector<VectorDiffContainerStreamElement<S>>,
128 inner_stream: S,
129 limit: usize,
130 ) -> (Vector<VectorDiffContainerStreamElement<S>>, Self) {
131 Self::dynamic_with_initial_limit(initial_values, inner_stream, limit, EmptyLimitStream)
132 }
133}
134
135impl<S, L> Head<S, L>
136where
137 S: Stream,
138 S::Item: VectorDiffContainer,
139 L: Stream<Item = usize>,
140{
141 /// Create a new [`Head`] with the given (unlimited) initial values, stream
142 /// of `VectorDiff` updates for those values, and a stream of limits.
143 ///
144 /// This is equivalent to `dynamic_with_initial_limit` where the
145 /// `initial_limit` is 0, except that it doesn't return the limited
146 /// vector as it would be empty anyways.
147 ///
148 /// Note that the returned `Head` won't produce anything until the first
149 /// limit is produced by the limit stream.
150 pub fn dynamic(
151 initial_values: Vector<VectorDiffContainerStreamElement<S>>,
152 inner_stream: S,
153 limit_stream: L,
154 ) -> Self {
155 Self {
156 inner_stream,
157 limit_stream,
158 buffered_vector: initial_values,
159 limit: 0,
160 ready_values: Default::default(),
161 }
162 }
163
164 /// Create a new [`Head`] with the given (unlimited) initial values, stream
165 /// of `VectorDiff` updates for those values, and an initial limit as well
166 /// as a stream of new limits.
167 pub fn dynamic_with_initial_limit(
168 mut initial_values: Vector<VectorDiffContainerStreamElement<S>>,
169 inner_stream: S,
170 initial_limit: usize,
171 limit_stream: L,
172 ) -> (Vector<VectorDiffContainerStreamElement<S>>, Self) {
173 let buffered_vector = initial_values.clone();
174 if initial_limit < initial_values.len() {
175 initial_values.truncate(initial_limit);
176 }
177
178 let stream = Self {
179 inner_stream,
180 limit_stream,
181 buffered_vector,
182 limit: initial_limit,
183 ready_values: Default::default(),
184 };
185
186 (initial_values, stream)
187 }
188}
189
190impl<S, L> Stream for Head<S, L>
191where
192 S: Stream,
193 S::Item: VectorDiffContainer,
194 L: Stream<Item = usize>,
195{
196 type Item = S::Item;
197
198 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
199 self.project().poll_next(cx)
200 }
201}
202
203impl<S, L> VectorObserver<VectorDiffContainerStreamElement<S>> for Head<S, L>
204where
205 S: Stream,
206 S::Item: VectorDiffContainer,
207 L: Stream<Item = usize>,
208{
209 type Stream = Self;
210
211 fn into_parts(self) -> (Vector<VectorDiffContainerStreamElement<S>>, Self::Stream) {
212 (self.buffered_vector.clone(), self)
213 }
214}
215
216impl<S, L> HeadProj<'_, S, L>
217where
218 S: Stream,
219 S::Item: VectorDiffContainer,
220 L: Stream<Item = usize>,
221{
222 fn poll_next(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<S::Item>> {
223 loop {
224 // First off, if any values are ready, return them.
225 if let Some(value) = S::Item::pop_from_head_buf(self.ready_values) {
226 return Poll::Ready(Some(value));
227 }
228
229 // Poll a new limit from `limit_stream` before polling `inner_stream`.
230 while let Poll::Ready(Some(next_limit)) = self.limit_stream.as_mut().poll_next(cx) {
231 // We have new `VectorDiff`s after the limit has been updated.
232 // Return them.
233 if let Some(diffs) = self.update_limit(next_limit) {
234 return Poll::Ready(Some(diffs));
235 }
236
237 // If `update_limit` returned `None`, poll the limit stream
238 // again.
239 }
240
241 // Poll `VectorDiff`s from the `inner_stream`.
242 let Some(diffs) = ready!(self.inner_stream.as_mut().poll_next(cx)) else {
243 return Poll::Ready(None);
244 };
245
246 // Consume and apply the diffs if possible.
247 let ready = diffs.push_into_head_buf(self.ready_values, |diff| {
248 let limit = *self.limit;
249 let prev_len = self.buffered_vector.len();
250
251 // Update the `buffered_vector`. It's a replica of the original observed
252 // `Vector`. We need to maintain it in order to be able to produce valid
253 // `VectorDiff`s when items are missing.
254 diff.clone().apply(self.buffered_vector);
255
256 // Handle the `diff`.
257 handle_diff(diff, limit, prev_len, self.buffered_vector)
258 });
259
260 if let Some(diff) = ready {
261 return Poll::Ready(Some(diff));
262 }
263
264 // Else loop and poll the streams again.
265 }
266 }
267
268 /// Update the limit if necessary.
269 ///
270 /// * If the buffered vector is empty, it returns `None`.
271 /// * If the limit increases, a `VectorDiff::Append` is produced if any
272 /// items exist.
273 /// * If the limit decreases below the length of the vector, a
274 /// `VectorDiff::Truncate` is produced.
275 ///
276 /// It's OK to have a `new_limit` larger than the length of the `Vector`.
277 /// The `new_limit` won't be capped.
278 fn update_limit(&mut self, new_limit: usize) -> Option<S::Item> {
279 // Let's update the limit.
280 let old_limit = mem::replace(self.limit, new_limit);
281
282 if self.buffered_vector.is_empty() {
283 // If empty, nothing to do.
284 return None;
285 }
286
287 match old_limit.cmp(&new_limit) {
288 // old < new
289 Ordering::Less => {
290 let missing_items = self
291 .buffered_vector
292 .iter()
293 .skip(old_limit)
294 .take(new_limit - old_limit)
295 .cloned()
296 .collect::<Vector<_>>();
297
298 if missing_items.is_empty() {
299 None
300 } else {
301 // Let's add the missing items.
302 Some(S::Item::from_item(VectorDiff::Append { values: missing_items }))
303 }
304 }
305
306 // old > new
307 Ordering::Greater => {
308 if self.buffered_vector.len() <= new_limit {
309 None
310 } else {
311 // Let's remove the extra items.
312 Some(S::Item::from_item(VectorDiff::Truncate { length: new_limit }))
313 }
314 }
315
316 // old == new
317 Ordering::Equal => {
318 // Nothing to do.
319 None
320 }
321 }
322 }
323}
324
325/// An empty stream with an item type of `usize`.
326#[derive(Debug)]
327#[non_exhaustive]
328pub struct EmptyLimitStream;
329
330impl Stream for EmptyLimitStream {
331 type Item = usize;
332
333 fn poll_next(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
334 Poll::Ready(None)
335 }
336}
337
338fn handle_diff<T: Clone>(
339 diff: VectorDiff<T>,
340 limit: usize,
341 prev_len: usize,
342 buffered_vector: &Vector<T>,
343) -> ArrayVec<VectorDiff<T>, 2> {
344 // If the limit is zero, we have nothing to do.
345 if limit == 0 {
346 return ArrayVec::new();
347 }
348
349 let is_full = prev_len >= limit;
350 let mut res = ArrayVec::new();
351
352 match diff {
353 VectorDiff::Append { mut values } => {
354 if is_full {
355 // Ignore the diff.
356 } else {
357 // Truncate the `values` to fit inside the free space.
358 values.truncate(min(limit - prev_len, values.len()));
359 res.push(VectorDiff::Append { values });
360 }
361 }
362 VectorDiff::Clear => {
363 res.push(VectorDiff::Clear);
364 }
365 VectorDiff::PushFront { value } => {
366 if is_full {
367 // Create 1 free space.
368 res.push(VectorDiff::PopBack);
369 }
370
371 // There is space for this new item.
372 res.push(VectorDiff::PushFront { value });
373 }
374 VectorDiff::PushBack { value } => {
375 if is_full {
376 // Ignore the diff.
377 } else {
378 // There is space for this new item.
379 res.push(VectorDiff::PushBack { value });
380 }
381 }
382 VectorDiff::PopFront => {
383 res.push(VectorDiff::PopFront);
384
385 if let Some(diff) = buffered_vector.get(limit - 1) {
386 // There is a previously-truncated item, push back.
387 res.push(VectorDiff::PushBack { value: diff.clone() });
388 }
389 }
390 VectorDiff::PopBack => {
391 if prev_len > limit {
392 // Pop back outside the limit, ignore the diff.
393 } else {
394 res.push(VectorDiff::PopBack);
395 }
396 }
397 VectorDiff::Insert { index, value } => {
398 if index >= limit {
399 // Insert after `limit`, ignore the diff.
400 } else {
401 if is_full {
402 // Create 1 free space.
403 res.push(VectorDiff::PopBack);
404 }
405
406 // There is space for this new item.
407 res.push(VectorDiff::Insert { index, value });
408 }
409 }
410 VectorDiff::Set { index, value } => {
411 if index >= limit {
412 // Update after `limit`, ignore the diff.
413 } else {
414 res.push(VectorDiff::Set { index, value });
415 }
416 }
417 VectorDiff::Remove { index } => {
418 if index >= limit {
419 // Remove after `limit`, ignore the diff.
420 } else {
421 res.push(VectorDiff::Remove { index });
422
423 if let Some(diff) = buffered_vector.get(limit - 1) {
424 // There is a previously-truncated item, push back.
425 res.push(VectorDiff::PushBack { value: diff.clone() });
426 }
427 }
428 }
429 VectorDiff::Truncate { length: new_length } => {
430 if new_length >= limit {
431 // Truncate items after `limit`, ignore the diff.
432 } else {
433 res.push(VectorDiff::Truncate { length: new_length });
434 }
435 }
436 VectorDiff::Reset { values: mut new_values } => {
437 if new_values.len() > limit {
438 // There are too many values, truncate.
439 new_values.truncate(limit);
440 }
441
442 // There is space for these new items.
443 res.push(VectorDiff::Reset { values: new_values });
444 }
445 }
446
447 res
448}