finchers_ext/
map_async.rs

1#![allow(missing_docs)]
2
3use finchers_core::endpoint::{Context, Endpoint};
4use finchers_core::task::{self, IntoTask, Task};
5use finchers_core::{Error, Poll, PollResult};
6use std::mem;
7
8pub fn new<E, F, R>(endpoint: E, f: F) -> MapAsync<E, F>
9where
10    E: Endpoint,
11    F: FnOnce(E::Output) -> R + Clone + Send + Sync,
12    R: IntoTask,
13    R::Task: Send,
14{
15    MapAsync { endpoint, f }
16}
17
/// Endpoint combinator pairing an inner endpoint with a function that turns
/// the inner endpoint's output into a follow-up task.
#[derive(Debug, Clone, Copy)]
pub struct MapAsync<E, F> {
    /// Inner endpoint whose output is passed to `f`.
    endpoint: E,
    /// Function mapping the inner output to a value convertible into a task.
    f: F,
}
23
24impl<E, F, R> Endpoint for MapAsync<E, F>
25where
26    E: Endpoint,
27    F: FnOnce(E::Output) -> R + Clone + Send + Sync,
28    R: IntoTask,
29    R::Task: Send,
30{
31    type Output = R::Output;
32    type Task = MapAsyncTask<E::Task, F, R>;
33
34    fn apply(&self, cx: &mut Context) -> Option<Self::Task> {
35        let task = self.endpoint.apply(cx)?;
36        Some(MapAsyncTask::First(task, self.f.clone()))
37    }
38}
39
40#[derive(Debug)]
41pub enum MapAsyncTask<T, F, R>
42where
43    T: Task,
44    F: FnOnce(T::Output) -> R + Send,
45    R: IntoTask,
46    R::Task: Send,
47{
48    First(T, F),
49    Second(R::Task),
50    Done,
51}
52
53impl<T, F, R> Task for MapAsyncTask<T, F, R>
54where
55    T: Task,
56    F: FnOnce(T::Output) -> R + Send,
57    R: IntoTask,
58    R::Task: Send,
59{
60    type Output = R::Output;
61
62    fn poll_task(&mut self, cx: &mut task::Context) -> PollResult<Self::Output, Error> {
63        use self::MapAsyncTask::*;
64        loop {
65            // TODO: optimize
66            match mem::replace(self, Done) {
67                First(mut task, f) => match task.poll_task(cx) {
68                    Poll::Pending => {
69                        *self = First(task, f);
70                        return Poll::Pending;
71                    }
72                    Poll::Ready(Ok(r)) => {
73                        cx.input().enter_scope(|| {
74                            *self = Second(f(r).into_task());
75                        });
76                        continue;
77                    }
78                    Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
79                },
80                Second(mut fut) => {
81                    return match fut.poll_task(cx) {
82                        Poll::Pending => {
83                            *self = Second(fut);
84                            Poll::Pending
85                        }
86                        polled => polled,
87                    }
88                }
89                Done => panic!(),
90            }
91        }
92    }
93}