futures_signals_ext/
flatten.rs1use std::{
2 collections::VecDeque,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use futures_signals::signal_vec::{SignalVec, SignalVecExt, VecDiff};
8use pin_project_lite::pin_project;
9
/// Bookkeeping for one inner signal vec that is being flattened.
pub(crate) struct FlattenState<A> {
    /// The pinned, boxed inner signal vec; reset to `None` once it reports
    /// that it has ended.
    signal_vec: Option<Pin<Box<A>>>,
    /// Number of items the inner signal vec currently holds, as derived from
    /// the diffs observed so far.
    len: usize,
}
14
15impl<A> FlattenState<A>
16where
17 A: SignalVec,
18{
19 fn new(signal_vec: A) -> Self {
20 Self {
21 signal_vec: Some(Box::pin(signal_vec)),
22 len: 0,
23 }
24 }
25
26 fn update_len(&mut self, diff: &VecDiff<A::Item>) {
27 match diff {
28 VecDiff::Replace { values } => {
29 self.len = values.len();
30 }
31 VecDiff::InsertAt { .. } | VecDiff::Push { .. } => {
32 self.len += 1;
33 }
34 VecDiff::RemoveAt { .. } | VecDiff::Pop {} => {
35 self.len -= 1;
36 }
37 VecDiff::Clear {} => {
38 self.len = 0;
39 }
40 VecDiff::UpdateAt { .. } | VecDiff::Move { .. } => {}
41 }
42 }
43
44 fn poll(&mut self, cx: &mut Context) -> Option<Poll<Option<VecDiff<A::Item>>>> {
45 self.signal_vec
46 .as_mut()
47 .map(|s| s.poll_vec_change_unpin(cx))
48 }
49
50 fn poll_values(&mut self, cx: &mut Context) -> Vec<A::Item> {
51 let mut output = vec![];
52
53 loop {
54 match self.poll(cx) {
55 Some(Poll::Ready(Some(diff))) => {
56 self.update_len(&diff);
57 diff.apply_to_vec(&mut output);
58 }
59 Some(Poll::Ready(None)) => {
60 self.signal_vec = None;
61 break;
62 }
63 Some(Poll::Pending) | None => {
64 break;
65 }
66 }
67 }
68
69 output
70 }
71
72 fn poll_pending(
73 &mut self,
74 cx: &mut Context,
75 prev_len: usize,
76 pending: &mut PendingBuilder<VecDiff<A::Item>>,
77 ) -> bool {
78 loop {
79 return match self.poll(cx) {
80 Some(Poll::Ready(Some(diff))) => {
81 let old_len = self.len;
82
83 self.update_len(&diff);
84
85 match diff {
86 VecDiff::Replace { values } => {
87 for index in (0..old_len).rev() {
88 pending.push(VecDiff::RemoveAt {
89 index: prev_len + index,
90 });
91 }
92
93 for (index, value) in values.into_iter().enumerate() {
94 pending.push(VecDiff::InsertAt {
95 index: prev_len + index,
96 value,
97 });
98 }
99 }
100 VecDiff::InsertAt { index, value } => {
101 pending.push(VecDiff::InsertAt {
102 index: prev_len + index,
103 value,
104 });
105 }
106 VecDiff::UpdateAt { index, value } => {
107 pending.push(VecDiff::UpdateAt {
108 index: prev_len + index,
109 value,
110 });
111 }
112 VecDiff::RemoveAt { index } => {
113 pending.push(VecDiff::RemoveAt {
114 index: prev_len + index,
115 });
116 }
117 VecDiff::Move {
118 old_index,
119 new_index,
120 } => {
121 pending.push(VecDiff::Move {
122 old_index: prev_len + old_index,
123 new_index: prev_len + new_index,
124 });
125 }
126 VecDiff::Push { value } => {
127 pending.push(VecDiff::InsertAt {
128 index: prev_len + old_len,
129 value,
130 });
131 }
132 VecDiff::Pop {} => {
133 pending.push(VecDiff::RemoveAt {
134 index: prev_len + (old_len - 1),
135 });
136 }
137 VecDiff::Clear {} => {
138 for index in (0..old_len).rev() {
139 pending.push(VecDiff::RemoveAt {
140 index: prev_len + index,
141 });
142 }
143 }
144 }
145
146 continue;
147 }
148 Some(Poll::Ready(None)) => {
149 self.signal_vec = None;
150 true
151 }
152 Some(Poll::Pending) => false,
153 None => true,
154 };
155 }
156 }
157}
158
159pin_project! {
160 #[must_use = "SignalVecs do nothing unless polled"]
161 pub struct Flatten<A>
162 where
163 A: SignalVec,
164 A::Item: SignalVec,
165 {
166 #[pin]
167 pub(crate) signal: Option<A>,
168 pub(crate) inner: Vec<FlattenState<A::Item>>,
169 pub(crate) pending: VecDeque<VecDiff<<A::Item as SignalVec>::Item>>,
170 }
171}
172
173fn fill_removals<A>(
174 inner: &[FlattenState<A>],
175 index: usize,
176 pending: &mut PendingBuilder<VecDiff<A::Item>>,
177) where
178 A: SignalVec,
179{
180 let removed_len = inner[index].len;
181 let prev_len: usize = inner[..index].iter().map(|state| state.len).sum();
182 for index in (0..removed_len).rev() {
183 pending.push(VecDiff::RemoveAt {
184 index: prev_len + index,
185 });
186 }
187}
188fn fill_moves<A>(
189 inner: &[FlattenState<A>],
190 old_index: usize,
191 new_index: usize,
192 pending: &mut PendingBuilder<VecDiff<A::Item>>,
193) where
194 A: SignalVec,
195{
196 let moved_len = inner[old_index].len;
197 let old_prev_len: usize = inner[..old_index].iter().map(|state| state.len).sum();
198 let new_prev_len: usize = inner[..new_index].iter().map(|state| state.len).sum();
199 if new_index < old_index {
200 (0..moved_len).for_each(|_| {
201 pending.push(VecDiff::Move {
202 old_index: old_prev_len + moved_len - 1,
203 new_index: new_prev_len,
204 })
205 });
206 } else {
207 (0..moved_len).for_each(|_| {
208 pending.push(VecDiff::Move {
209 old_index: old_prev_len,
210 new_index: new_prev_len + moved_len - 1,
211 })
212 });
213 }
214}
215
216impl<A> SignalVec for Flatten<A>
217where
218 A: SignalVec,
219 A::Item: SignalVec,
220{
221 type Item = <A::Item as SignalVec>::Item;
222
223 fn poll_vec_change(
224 self: Pin<&mut Self>,
225 cx: &mut Context,
226 ) -> Poll<Option<VecDiff<Self::Item>>> {
227 let mut this = self.project();
228
229 if let Some(diff) = this.pending.pop_front() {
230 return Poll::Ready(Some(diff));
231 }
232
233 let mut pending: PendingBuilder<VecDiff<Self::Item>> = PendingBuilder::new();
234
235 let top_done = loop {
236 break match this
237 .signal
238 .as_mut()
239 .as_pin_mut()
240 .map(|signal| signal.poll_vec_change(cx))
241 {
242 Some(Poll::Ready(Some(diff))) => {
243 match diff {
244 VecDiff::Replace { values } => {
245 *this.inner = values.into_iter().map(FlattenState::new).collect();
246
247 let values = this
248 .inner
249 .iter_mut()
250 .flat_map(|state| state.poll_values(cx))
251 .collect();
252
253 return Poll::Ready(Some(VecDiff::Replace { values }));
254 }
255 VecDiff::InsertAt { index, value } => {
256 this.inner.insert(index, FlattenState::new(value));
257 }
258 VecDiff::UpdateAt { index, value } => {
259 fill_removals(&this.inner, index, &mut pending);
260 this.inner[index] = FlattenState::new(value);
261 }
262 VecDiff::RemoveAt { index } => {
263 fill_removals(&this.inner, index, &mut pending);
264 this.inner.remove(index);
265 }
266 VecDiff::Move {
267 old_index,
268 new_index,
269 } => {
270 if old_index != new_index {
271 fill_moves(&this.inner, old_index, new_index, &mut pending);
272 let value = this.inner.remove(old_index);
273 this.inner.insert(new_index, value);
274 }
275 }
276 VecDiff::Push { value } => {
277 this.inner.push(FlattenState::new(value));
278 }
279 VecDiff::Pop {} => {
280 let len = this.inner.pop().unwrap().len;
281 (0..len).for_each(|_| pending.push(VecDiff::Pop {}));
282 }
283 VecDiff::Clear {} => {
284 this.inner.clear();
285 return Poll::Ready(Some(VecDiff::Clear {}));
286 }
287 }
288
289 continue;
290 }
291 Some(Poll::Ready(None)) => {
292 this.signal.set(None);
293 true
294 }
295 Some(Poll::Pending) => false,
296 None => true,
297 };
298 };
299
300 let mut inner_done = true;
301 let mut prev_len = 0;
302 for state in this.inner.iter_mut() {
303 inner_done &= state.poll_pending(cx, prev_len, &mut pending);
304 prev_len += state.len;
305 }
306
307 if let Some(first) = pending.first {
308 *this.pending = pending.rest;
309 Poll::Ready(Some(first))
310 } else if inner_done && top_done {
311 Poll::Ready(None)
312 } else {
313 Poll::Pending
314 }
315 }
316}
317
/// Collects values in insertion order while keeping the first one separate
/// from the remainder.
struct PendingBuilder<A> {
    /// The first value pushed, if any.
    first: Option<A>,
    /// Every value pushed after the first, in push order.
    rest: VecDeque<A>,
}

impl<A> PendingBuilder<A> {
    /// Creates an empty builder.
    fn new() -> Self {
        Self {
            first: None,
            rest: VecDeque::new(),
        }
    }

    /// Stores `value` as `first` when nothing has been pushed yet, otherwise
    /// appends it to the back of `rest`.
    fn push(&mut self, value: A) {
        if self.first.is_none() {
            self.first = Some(value);
        } else {
            self.rest.push_back(value);
        }
    }
}
338}