effective/impls/
collect.rs1use std::{convert::Infallible, pin::Pin, task::Context};
4
5use futures_util::task::noop_waker_ref;
6
7use crate::{Async, Asynchronous, EffectResult, Effective, Fails, Multiple, Single};
8
9pin_project_lite::pin_project!(
10 pub struct Collect<E, C> {
12 #[pin]
13 pub(super) inner: E,
14 pub(super) into: C,
15 }
16);
17
18impl<E, C> Effective for Collect<E, C>
19where
20 E: Effective<Produces = Multiple>,
21 C: Default + Extend<E::Item>,
22{
23 type Item = C;
24 type Failure = E::Failure;
25 type Produces = Single;
26 type Async = E::Async;
27
28 fn poll_effect(
29 self: Pin<&mut Self>,
30 cx: &mut Context<'_>,
31 ) -> EffectResult<Self::Item, Self::Failure, Self::Produces, Self::Async> {
32 let mut this = self.project();
33
34 if !<Self::Async as Asynchronous>::IS_ASYNC && !<Self::Failure as Fails>::FALLIBLE {
35 this.into.extend(CollectIterator { inner: this.inner });
36 return EffectResult::Item(std::mem::take(this.into));
37 }
38
39 loop {
40 match this.inner.as_mut().poll_effect(cx) {
41 EffectResult::Item(x) => this.into.extend(Some(x)),
42 EffectResult::Failure(x) => return EffectResult::Failure(x),
43 EffectResult::Done(Multiple) => {
44 return EffectResult::Item(std::mem::take(this.into))
45 }
46 EffectResult::Pending(x) => return EffectResult::Pending(x),
47 }
48 }
49 }
50}
51
/// Borrowing adapter that presents a pinned multi-item effective as a
/// standard [`Iterator`].
///
/// Used by `Collect` only when the effective is neither asynchronous nor
/// fallible, so every poll yields either an item or completion.
struct CollectIterator<'eff, E> {
    /// The effective being drained, pinned for the duration of the borrow.
    inner: Pin<&'eff mut E>,
}
55
56impl<E> Iterator for CollectIterator<'_, E>
57where
58 E: Effective<Produces = Multiple>,
59{
60 type Item = E::Item;
61
62 fn next(&mut self) -> Option<Self::Item> {
63 match self
64 .inner
65 .as_mut()
66 .poll_effect(&mut Context::from_waker(noop_waker_ref()))
67 {
68 EffectResult::Item(x) => Some(x),
69 EffectResult::Failure(_) => unreachable!("FALLIBLE is false"),
70 EffectResult::Done(Multiple) => None,
71 EffectResult::Pending(_) => unreachable!("IS_ASYNC is false"),
72 }
73 }
74
75 fn size_hint(&self) -> (usize, Option<usize>) {
76 self.inner.size_hint()
77 }
78}
79
80impl<E, C> std::future::Future for Collect<E, C>
81where
82 E: Effective<Produces = Multiple, Async = Async, Failure = Infallible>,
83 C: Default + Extend<E::Item>,
84{
85 type Output = C;
86
87 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
88 match self.poll_effect(cx) {
89 EffectResult::Item(value) => std::task::Poll::Ready(value),
90 EffectResult::Failure(_) => unreachable!(),
91 EffectResult::Done(_) => unreachable!(),
92 EffectResult::Pending(_) => std::task::Poll::Pending,
93 }
94 }
95}