1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use super::maybe_done::MaybeDone;
use finchers_core::endpoint::{Context, Endpoint, IntoEndpoint};
use finchers_core::task::{self, Task};
use finchers_core::{Error, Poll, PollResult};
use std::{fmt, mem};
/// Builds an `All` endpoint from every item yielded by `iter`.
///
/// Each item is converted with `IntoEndpoint::into_endpoint` and the resulting
/// endpoints are stored in iteration order inside the returned `All`.
pub fn all<I>(iter: I) -> All<<I::Item as IntoEndpoint>::Endpoint>
where
    I: IntoIterator,
    I::Item: IntoEndpoint,
    <I::Item as IntoEndpoint>::Output: Send,
{
    let inner: Vec<_> = iter
        .into_iter()
        .map(|item| item.into_endpoint())
        .collect();
    All { inner }
}
/// An endpoint holding a list of endpoints of the same type `E`.
///
/// Its `Endpoint` implementation applies every stored endpoint and produces a
/// task whose output is a `Vec` of the individual outputs.
#[allow(missing_docs)]
#[derive(Clone, Debug)]
pub struct All<E> {
// The wrapped endpoints, in the order they were supplied to `all`.
inner: Vec<E>,
}
impl<E> Endpoint for All<E>
where
    E: Endpoint,
    E::Output: Send,
{
    type Output = Vec<E::Output>;
    type Task = AllTask<E::Task>;

    /// Applies every inner endpoint, in order, to `cx`.
    ///
    /// Returns `None` as soon as one inner endpoint returns `None`; the
    /// remaining endpoints are then not applied. Otherwise every produced
    /// task is wrapped in `MaybeDone::Pending` and bundled into an `AllTask`.
    fn apply(&self, cx: &mut Context) -> Option<Self::Task> {
        // Collecting into `Option<Vec<_>>` stops at the first `None`.
        let elems = self
            .inner
            .iter()
            .map(|endpoint| endpoint.apply(cx).map(MaybeDone::Pending))
            .collect::<Option<Vec<_>>>()?;
        Some(AllTask { elems })
    }
}
/// The task produced by `All`: a list of sub-tasks that are polled together.
///
/// It resolves to a `Vec` of the sub-tasks' outputs (see the `Task` impl).
#[allow(missing_docs)]
pub struct AllTask<T: Task> {
// One `MaybeDone` slot per sub-task; replaced by an empty `Vec` once the
// task has returned a result or an error.
elems: Vec<MaybeDone<T>>,
}
impl<T> fmt::Debug for AllTask<T>
where
    T: Task + fmt::Debug,
    T::Output: fmt::Debug,
{
    /// Formats the task as a struct named `AllTask` with its `elems` field.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_struct("AllTask");
        repr.field("elems", &self.elems);
        repr.finish()
    }
}
impl<T> Task for AllTask<T>
where
    T: Task,
    T::Output: Send,
{
    type Output = Vec<T::Output>;

    /// Polls every sub-task once, in order.
    ///
    /// * If a sub-task reports an error, polling stops there, the stored
    ///   sub-tasks are dropped and the error is returned as ready.
    /// * If every `poll_done` call returned `true`, the outputs are taken out
    ///   of the slots (in order) and returned as a ready `Vec`.
    /// * Otherwise the task is still pending.
    fn poll_task(&mut self, cx: &mut task::Context) -> PollResult<Self::Output, Error> {
        let mut finished = true;
        let mut failure = None;

        for slot in self.elems.iter_mut() {
            match slot.poll_done(cx) {
                // Keep polling the remaining slots even if this one is not done.
                Ok(done) => finished &= done,
                Err(err) => {
                    failure = Some(err);
                    break;
                }
            }
        }

        if let Some(err) = failure {
            // Drop all sub-tasks before reporting the failure.
            self.elems = Vec::new();
            return Poll::Ready(Err(err));
        }

        if !finished {
            return Poll::Pending;
        }

        // Everything is done: move the slots out and extract their outputs.
        let slots = mem::replace(&mut self.elems, Vec::new());
        let outputs: Vec<T::Output> = slots
            .into_iter()
            .map(|mut slot| slot.take_item())
            .collect();
        Into::into(Ok(outputs))
    }
}