futures_signals_ext/
flatten.rs

1use std::{
2    collections::VecDeque,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use futures_signals::signal_vec::{SignalVec, SignalVecExt, VecDiff};
8use pin_project_lite::pin_project;
9
10pub(crate) struct FlattenState<A> {
11    signal_vec: Option<Pin<Box<A>>>,
12    len: usize,
13}
14
15impl<A> FlattenState<A>
16where
17    A: SignalVec,
18{
19    fn new(signal_vec: A) -> Self {
20        Self {
21            signal_vec: Some(Box::pin(signal_vec)),
22            len: 0,
23        }
24    }
25
26    fn update_len(&mut self, diff: &VecDiff<A::Item>) {
27        match diff {
28            VecDiff::Replace { values } => {
29                self.len = values.len();
30            }
31            VecDiff::InsertAt { .. } | VecDiff::Push { .. } => {
32                self.len += 1;
33            }
34            VecDiff::RemoveAt { .. } | VecDiff::Pop {} => {
35                self.len -= 1;
36            }
37            VecDiff::Clear {} => {
38                self.len = 0;
39            }
40            VecDiff::UpdateAt { .. } | VecDiff::Move { .. } => {}
41        }
42    }
43
44    fn poll(&mut self, cx: &mut Context) -> Option<Poll<Option<VecDiff<A::Item>>>> {
45        self.signal_vec
46            .as_mut()
47            .map(|s| s.poll_vec_change_unpin(cx))
48    }
49
50    fn poll_values(&mut self, cx: &mut Context) -> Vec<A::Item> {
51        let mut output = vec![];
52
53        loop {
54            match self.poll(cx) {
55                Some(Poll::Ready(Some(diff))) => {
56                    self.update_len(&diff);
57                    diff.apply_to_vec(&mut output);
58                }
59                Some(Poll::Ready(None)) => {
60                    self.signal_vec = None;
61                    break;
62                }
63                Some(Poll::Pending) | None => {
64                    break;
65                }
66            }
67        }
68
69        output
70    }
71
72    fn poll_pending(
73        &mut self,
74        cx: &mut Context,
75        prev_len: usize,
76        pending: &mut PendingBuilder<VecDiff<A::Item>>,
77    ) -> bool {
78        loop {
79            return match self.poll(cx) {
80                Some(Poll::Ready(Some(diff))) => {
81                    let old_len = self.len;
82
83                    self.update_len(&diff);
84
85                    match diff {
86                        VecDiff::Replace { values } => {
87                            for index in (0..old_len).rev() {
88                                pending.push(VecDiff::RemoveAt {
89                                    index: prev_len + index,
90                                });
91                            }
92
93                            for (index, value) in values.into_iter().enumerate() {
94                                pending.push(VecDiff::InsertAt {
95                                    index: prev_len + index,
96                                    value,
97                                });
98                            }
99                        }
100                        VecDiff::InsertAt { index, value } => {
101                            pending.push(VecDiff::InsertAt {
102                                index: prev_len + index,
103                                value,
104                            });
105                        }
106                        VecDiff::UpdateAt { index, value } => {
107                            pending.push(VecDiff::UpdateAt {
108                                index: prev_len + index,
109                                value,
110                            });
111                        }
112                        VecDiff::RemoveAt { index } => {
113                            pending.push(VecDiff::RemoveAt {
114                                index: prev_len + index,
115                            });
116                        }
117                        VecDiff::Move {
118                            old_index,
119                            new_index,
120                        } => {
121                            pending.push(VecDiff::Move {
122                                old_index: prev_len + old_index,
123                                new_index: prev_len + new_index,
124                            });
125                        }
126                        VecDiff::Push { value } => {
127                            pending.push(VecDiff::InsertAt {
128                                index: prev_len + old_len,
129                                value,
130                            });
131                        }
132                        VecDiff::Pop {} => {
133                            pending.push(VecDiff::RemoveAt {
134                                index: prev_len + (old_len - 1),
135                            });
136                        }
137                        VecDiff::Clear {} => {
138                            for index in (0..old_len).rev() {
139                                pending.push(VecDiff::RemoveAt {
140                                    index: prev_len + index,
141                                });
142                            }
143                        }
144                    }
145
146                    continue;
147                }
148                Some(Poll::Ready(None)) => {
149                    self.signal_vec = None;
150                    true
151                }
152                Some(Poll::Pending) => false,
153                None => true,
154            };
155        }
156    }
157}
158
159pin_project! {
160    #[must_use = "SignalVecs do nothing unless polled"]
161    pub struct Flatten<A>
162    where
163        A: SignalVec,
164        A::Item: SignalVec,
165    {
166        #[pin]
167        pub(crate) signal: Option<A>,
168        pub(crate) inner: Vec<FlattenState<A::Item>>,
169        pub(crate) pending: VecDeque<VecDiff<<A::Item as SignalVec>::Item>>,
170    }
171}
172
173fn fill_removals<A>(
174    inner: &[FlattenState<A>],
175    index: usize,
176    pending: &mut PendingBuilder<VecDiff<A::Item>>,
177) where
178    A: SignalVec,
179{
180    let removed_len = inner[index].len;
181    let prev_len: usize = inner[..index].iter().map(|state| state.len).sum();
182    for index in (0..removed_len).rev() {
183        pending.push(VecDiff::RemoveAt {
184            index: prev_len + index,
185        });
186    }
187}
188fn fill_moves<A>(
189    inner: &[FlattenState<A>],
190    old_index: usize,
191    new_index: usize,
192    pending: &mut PendingBuilder<VecDiff<A::Item>>,
193) where
194    A: SignalVec,
195{
196    let moved_len = inner[old_index].len;
197    let old_prev_len: usize = inner[..old_index].iter().map(|state| state.len).sum();
198    let new_prev_len: usize = inner[..new_index].iter().map(|state| state.len).sum();
199    if new_index < old_index {
200        (0..moved_len).for_each(|_| {
201            pending.push(VecDiff::Move {
202                old_index: old_prev_len + moved_len - 1,
203                new_index: new_prev_len,
204            })
205        });
206    } else {
207        (0..moved_len).for_each(|_| {
208            pending.push(VecDiff::Move {
209                old_index: old_prev_len,
210                new_index: new_prev_len + moved_len - 1,
211            })
212        });
213    }
214}
215
216impl<A> SignalVec for Flatten<A>
217where
218    A: SignalVec,
219    A::Item: SignalVec,
220{
221    type Item = <A::Item as SignalVec>::Item;
222
223    fn poll_vec_change(
224        self: Pin<&mut Self>,
225        cx: &mut Context,
226    ) -> Poll<Option<VecDiff<Self::Item>>> {
227        let mut this = self.project();
228
229        if let Some(diff) = this.pending.pop_front() {
230            return Poll::Ready(Some(diff));
231        }
232
233        let mut pending: PendingBuilder<VecDiff<Self::Item>> = PendingBuilder::new();
234
235        let top_done = loop {
236            break match this
237                .signal
238                .as_mut()
239                .as_pin_mut()
240                .map(|signal| signal.poll_vec_change(cx))
241            {
242                Some(Poll::Ready(Some(diff))) => {
243                    match diff {
244                        VecDiff::Replace { values } => {
245                            *this.inner = values.into_iter().map(FlattenState::new).collect();
246
247                            let values = this
248                                .inner
249                                .iter_mut()
250                                .flat_map(|state| state.poll_values(cx))
251                                .collect();
252
253                            return Poll::Ready(Some(VecDiff::Replace { values }));
254                        }
255                        VecDiff::InsertAt { index, value } => {
256                            this.inner.insert(index, FlattenState::new(value));
257                        }
258                        VecDiff::UpdateAt { index, value } => {
259                            fill_removals(&this.inner, index, &mut pending);
260                            this.inner[index] = FlattenState::new(value);
261                        }
262                        VecDiff::RemoveAt { index } => {
263                            fill_removals(&this.inner, index, &mut pending);
264                            this.inner.remove(index);
265                        }
266                        VecDiff::Move {
267                            old_index,
268                            new_index,
269                        } => {
270                            if old_index != new_index {
271                                fill_moves(&this.inner, old_index, new_index, &mut pending);
272                                let value = this.inner.remove(old_index);
273                                this.inner.insert(new_index, value);
274                            }
275                        }
276                        VecDiff::Push { value } => {
277                            this.inner.push(FlattenState::new(value));
278                        }
279                        VecDiff::Pop {} => {
280                            let len = this.inner.pop().unwrap().len;
281                            (0..len).for_each(|_| pending.push(VecDiff::Pop {}));
282                        }
283                        VecDiff::Clear {} => {
284                            this.inner.clear();
285                            return Poll::Ready(Some(VecDiff::Clear {}));
286                        }
287                    }
288
289                    continue;
290                }
291                Some(Poll::Ready(None)) => {
292                    this.signal.set(None);
293                    true
294                }
295                Some(Poll::Pending) => false,
296                None => true,
297            };
298        };
299
300        let mut inner_done = true;
301        let mut prev_len = 0;
302        for state in this.inner.iter_mut() {
303            inner_done &= state.poll_pending(cx, prev_len, &mut pending);
304            prev_len += state.len;
305        }
306
307        if let Some(first) = pending.first {
308            *this.pending = pending.rest;
309            Poll::Ready(Some(first))
310        } else if inner_done && top_done {
311            Poll::Ready(None)
312        } else {
313            Poll::Pending
314        }
315    }
316}
317
318struct PendingBuilder<A> {
319    first: Option<A>,
320    rest: VecDeque<A>,
321}
322
323impl<A> PendingBuilder<A> {
324    fn new() -> Self {
325        Self {
326            first: None,
327            rest: VecDeque::new(),
328        }
329    }
330
331    fn push(&mut self, value: A) {
332        if let None = self.first {
333            self.first = Some(value);
334        } else {
335            self.rest.push_back(value);
336        }
337    }
338}