effective/impls/
collect.rs

1//! Effect adaptors to subtract the 'iterable' effect
2
3use std::{convert::Infallible, pin::Pin, task::Context};
4
5use futures_util::task::noop_waker_ref;
6
7use crate::{Async, Asynchronous, EffectResult, Effective, Fails, Multiple, Single};
8
9pin_project_lite::pin_project!(
10    /// Produced by the [`collect()`](super::EffectiveExt::collect) method
11    pub struct Collect<E, C> {
12        #[pin]
13        pub(super) inner: E,
14        pub(super) into: C,
15    }
16);
17
18impl<E, C> Effective for Collect<E, C>
19where
20    E: Effective<Produces = Multiple>,
21    C: Default + Extend<E::Item>,
22{
23    type Item = C;
24    type Failure = E::Failure;
25    type Produces = Single;
26    type Async = E::Async;
27
28    fn poll_effect(
29        self: Pin<&mut Self>,
30        cx: &mut Context<'_>,
31    ) -> EffectResult<Self::Item, Self::Failure, Self::Produces, Self::Async> {
32        let mut this = self.project();
33
34        if !<Self::Async as Asynchronous>::IS_ASYNC && !<Self::Failure as Fails>::FALLIBLE {
35            this.into.extend(CollectIterator { inner: this.inner });
36            return EffectResult::Item(std::mem::take(this.into));
37        }
38
39        loop {
40            match this.inner.as_mut().poll_effect(cx) {
41                EffectResult::Item(x) => this.into.extend(Some(x)),
42                EffectResult::Failure(x) => return EffectResult::Failure(x),
43                EffectResult::Done(Multiple) => {
44                    return EffectResult::Item(std::mem::take(this.into))
45                }
46                EffectResult::Pending(x) => return EffectResult::Pending(x),
47            }
48        }
49    }
50}
51
/// Iterator view over a pinned, mutably borrowed effective.
///
/// Built by `Collect::poll_effect` for the case where the effective is neither
/// asynchronous nor fallible.
struct CollectIterator<'e, E> {
    inner: Pin<&'e mut E>,
}
55
56impl<E> Iterator for CollectIterator<'_, E>
57where
58    E: Effective<Produces = Multiple>,
59{
60    type Item = E::Item;
61
62    fn next(&mut self) -> Option<Self::Item> {
63        match self
64            .inner
65            .as_mut()
66            .poll_effect(&mut Context::from_waker(noop_waker_ref()))
67        {
68            EffectResult::Item(x) => Some(x),
69            EffectResult::Failure(_) => unreachable!("FALLIBLE is false"),
70            EffectResult::Done(Multiple) => None,
71            EffectResult::Pending(_) => unreachable!("IS_ASYNC is false"),
72        }
73    }
74
75    fn size_hint(&self) -> (usize, Option<usize>) {
76        self.inner.size_hint()
77    }
78}
79
80impl<E, C> std::future::Future for Collect<E, C>
81where
82    E: Effective<Produces = Multiple, Async = Async, Failure = Infallible>,
83    C: Default + Extend<E::Item>,
84{
85    type Output = C;
86
87    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
88        match self.poll_effect(cx) {
89            EffectResult::Item(value) => std::task::Poll::Ready(value),
90            EffectResult::Failure(_) => unreachable!(),
91            EffectResult::Done(_) => unreachable!(),
92            EffectResult::Pending(_) => std::task::Poll::Pending,
93        }
94    }
95}