finchers_ext/
map_async.rs1#![allow(missing_docs)]
2
3use finchers_core::endpoint::{Context, Endpoint};
4use finchers_core::task::{self, IntoTask, Task};
5use finchers_core::{Error, Poll, PollResult};
6use std::mem;
7
8pub fn new<E, F, R>(endpoint: E, f: F) -> MapAsync<E, F>
9where
10 E: Endpoint,
11 F: FnOnce(E::Output) -> R + Clone + Send + Sync,
12 R: IntoTask,
13 R::Task: Send,
14{
15 MapAsync { endpoint, f }
16}
17
/// An endpoint pairing an inner endpoint with a function that is applied to
/// the inner endpoint's output and yields a follow-up task.
#[derive(Clone, Copy, Debug)]
pub struct MapAsync<E, F> {
    /// The wrapped endpoint whose task is run first.
    endpoint: E,
    /// The function cloned into every task produced by `apply`.
    f: F,
}
23
24impl<E, F, R> Endpoint for MapAsync<E, F>
25where
26 E: Endpoint,
27 F: FnOnce(E::Output) -> R + Clone + Send + Sync,
28 R: IntoTask,
29 R::Task: Send,
30{
31 type Output = R::Output;
32 type Task = MapAsyncTask<E::Task, F, R>;
33
34 fn apply(&self, cx: &mut Context) -> Option<Self::Task> {
35 let task = self.endpoint.apply(cx)?;
36 Some(MapAsyncTask::First(task, self.f.clone()))
37 }
38}
39
40#[derive(Debug)]
41pub enum MapAsyncTask<T, F, R>
42where
43 T: Task,
44 F: FnOnce(T::Output) -> R + Send,
45 R: IntoTask,
46 R::Task: Send,
47{
48 First(T, F),
49 Second(R::Task),
50 Done,
51}
52
53impl<T, F, R> Task for MapAsyncTask<T, F, R>
54where
55 T: Task,
56 F: FnOnce(T::Output) -> R + Send,
57 R: IntoTask,
58 R::Task: Send,
59{
60 type Output = R::Output;
61
62 fn poll_task(&mut self, cx: &mut task::Context) -> PollResult<Self::Output, Error> {
63 use self::MapAsyncTask::*;
64 loop {
65 match mem::replace(self, Done) {
67 First(mut task, f) => match task.poll_task(cx) {
68 Poll::Pending => {
69 *self = First(task, f);
70 return Poll::Pending;
71 }
72 Poll::Ready(Ok(r)) => {
73 cx.input().enter_scope(|| {
74 *self = Second(f(r).into_task());
75 });
76 continue;
77 }
78 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
79 },
80 Second(mut fut) => {
81 return match fut.poll_task(cx) {
82 Poll::Pending => {
83 *self = Second(fut);
84 Poll::Pending
85 }
86 polled => polled,
87 }
88 }
89 Done => panic!(),
90 }
91 }
92 }
93}