1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
//! Components for constructing asynchronous computations which will be returned from `Endpoint`s.
//!
//! The main trait in this module is `Task`.
//! This trait is an abstraction of asynchronous computations which will be returned from endpoints.
//! The role of this trait is very close to `futures` and hence its design intentionally resembles
//! `Future`. However, some differences exist to specialize it for the purpose of HTTP handling.
//!
//! This trait does not provide any combinators for composing complicated computations.
//! Such combinations are usually performed indirectly by the endpoints or by wrapping the value of
//! `Future`.

use either::Either;
use futures::{Async, Future, IntoFuture};

use error::Error;
use input::{Input, RequestBody};
use never::Never;
use poll::{Poll, PollResult};

/// The contextual information supplied to a task while it is being polled.
///
/// A `Context` borrows, for the lifetime `'a`, a shared reference to the `Input` of the current
/// request and a mutable slot which holds the `RequestBody` until it is taken.
#[derive(Debug)]
pub struct Context<'a> {
    /// The `Input` at the current request.
    input: &'a Input,
    /// The slot holding the request body; it is left as `None` once the body has been taken.
    body: &'a mut Option<RequestBody>,
    // FIXME: add `futures::task::Context`
}

impl<'a> Context<'a> {
    /// Create an instance of `Context` from components.
    #[inline]
    pub fn new(input: &'a Input, body: &'a mut Option<RequestBody>) -> Context<'a> {
        Context { input, body }
    }

    /// Return the reference to `Input` at the current request.
    ///
    /// The returned reference lives for `'a` and is not tied to the borrow of `self`, so it can
    /// be kept around while calling `body()` on the same context.
    #[inline]
    pub fn input(&self) -> &'a Input {
        self.input
    }

    /// Take the instance of `RequestBody` at the current request if available.
    ///
    /// The slot is emptied by this call: once a body has been returned, every subsequent call
    /// returns `None`.
    #[inline]
    pub fn body(&mut self) -> Option<RequestBody> {
        self.body.take()
    }
}

/// Trait representing the asynchronous computation after applying the endpoints.
///
/// See the module level documentation for details.
///
/// Every implementor is required to be `Send`.
pub trait Task: Send {
    /// The *inner* type of an output which will be returned from this task.
    type Output;

    /// Perform polling this task and get its result.
    ///
    /// `cx` provides access to the `Input` and the `RequestBody` of the current request.
    ///
    /// The implementations in this module use the return value as follows:
    ///
    /// * `Poll::Pending` - the computation has not finished yet.
    /// * `Poll::Ready(Ok(output))` - the computation finished with a value of `Self::Output`.
    /// * `Poll::Ready(Err(error))` - the computation was aborted with an `Error`.
    fn poll_task(&mut self, cx: &mut Context) -> PollResult<Self::Output, Error>;
}

impl<L, R> Task for Either<L, R>
where
    L: Task,
    R: Task<Output = L::Output>,
{
    type Output = L::Output;

    #[inline(always)]
    fn poll_task(&mut self, cx: &mut Context) -> PollResult<Self::Output, Error> {
        match *self {
            Either::Left(ref mut t) => t.poll_task(cx),
            Either::Right(ref mut t) => t.poll_task(cx),
        }
    }
}

/// Trait representing the conversion to a `Task`.
pub trait IntoTask {
    /// The type of *output* value.
    ///
    /// This is the same type as the `Output` of the converted task.
    type Output;

    /// The type of `Task` into which the value is converted.
    type Task: Task<Output = Self::Output>;

    /// Perform conversion itself into a `Task`.
    ///
    /// The value is consumed by the conversion.
    fn into_task(self) -> Self::Task;
}

// FIXME: replace the trait bound with `core::ops::Async`
impl<F> IntoTask for F
where
    F: IntoFuture,
    F::Future: Send,
{
    type Output = Result<F::Item, F::Error>;
    type Task = TaskFuture<F::Future>;

    #[inline(always)]
    fn into_task(self) -> Self::Task {
        future(self)
    }
}

/// An adapter which makes a `Future` usable as a `Task`.
///
/// The wrapped future is polled as it is; the `Context` passed to the task is not used.
#[derive(Debug)]
pub struct TaskFuture<F>(F);

impl<F: Future + Send> From<F> for TaskFuture<F> {
    /// Wrap the given future without modifying it.
    fn from(inner: F) -> Self {
        TaskFuture(inner)
    }
}

impl<F: Future + Send> Task for TaskFuture<F> {
    type Output = Result<F::Item, F::Error>;

    /// Poll the wrapped future and translate its state.
    ///
    /// An error of the future does not abort the task: it is returned as the `Err` variant
    /// of the *output*, wrapped in `Poll::Ready(Ok(..))`.
    #[inline(always)]
    fn poll_task(&mut self, _: &mut Context) -> PollResult<Self::Output, Error> {
        let outcome = match Future::poll(&mut self.0) {
            Ok(Async::NotReady) => return Poll::Pending,
            Ok(Async::Ready(item)) => Ok(item),
            Err(err) => Err(err),
        };
        Poll::Ready(Ok(outcome))
    }
}

/// Create a task from a value which can be converted into a `Future`.
///
/// The value is first converted with `IntoFuture::into_future` and the resulting future is
/// then wrapped into a `TaskFuture`.
pub fn future<F>(future: F) -> TaskFuture<F::Future>
where
    F: IntoFuture,
    F::Future: Send,
{
    let inner = future.into_future();
    TaskFuture::from(inner)
}

/// A `Task` which is resolved to a value of `T` at the first poll.
///
/// The value is stored in an `Option` so that it can be moved out when the task is polled.
#[derive(Debug)]
pub struct Ready<T>(Option<T>);

impl<T> From<T> for Ready<T>
where
    T: Send,
{
    /// Store the value until the task is polled.
    fn from(value: T) -> Self {
        Ready(Some(value))
    }
}

impl<T> Task for Ready<T>
where
    T: Send,
{
    type Output = T;

    /// Move the stored value out and return it as the output.
    ///
    /// # Panics
    ///
    /// Panics if the task is polled again after the value has been returned.
    #[inline(always)]
    fn poll_task(&mut self, _: &mut Context) -> PollResult<Self::Output, Error> {
        let stored = self.0.take();
        Poll::Ready(Ok(stored.expect("The task cannot resolve twice")))
    }
}

/// Create a task which will immediately return a value of `T`.
pub fn ready<T: Send>(val: T) -> Ready<T> {
    Ready::from(val)
}

/// A `Task` which will immediately abort with an error value of `E`.
#[derive(Debug)]
pub struct Abort<E> {
    cause: Option<E>,
}

impl<E> From<E> for Abort<E>
where
    E: Into<Error> + Send,
{
    fn from(cause: E) -> Self {
        Abort { cause: Some(cause) }
    }
}

impl<E> Task for Abort<E>
where
    E: Into<Error> + Send,
{
    type Output = Never;

    #[inline(always)]
    fn poll_task(&mut self, _: &mut Context) -> PollResult<Self::Output, Error> {
        let cause = self.cause.take().expect("The task cannot reject twice");
        Poll::Ready(Err(Into::into(cause)))
    }
}

/// Create a task which will immediately abort the computation with an error value of `E`.
pub fn abort<E>(cause: E) -> Abort<E>
where
    E: Into<Error> + Send,
{
    Abort::from(cause)
}