tower_web/extract/
bytes.rs

1use codegen::CallSite;
2use extract::{Context, Error, Extract, ExtractFuture};
3use util::buf_stream::{self, BufStream};
4
5use futures::{Future, Poll};
6
7/// Extract a value using `serde`
8#[derive(Debug)]
9pub struct ExtractBytes<T, B> {
10    state: State<T, B>,
11}
12
13#[derive(Debug)]
14enum State<T, B> {
15    Complete(Result<T, Option<Error>>),
16    Body(buf_stream::Collect<B, Vec<u8>>),
17}
18
19impl<B: BufStream> Extract<B> for Vec<u8> {
20    type Future = ExtractBytes<Self, B>;
21
22    fn extract(ctx: &Context) -> Self::Future {
23        use codegen::Source::*;
24
25        match ctx.callsite().source() {
26            Capture(idx) => {
27                let path = ctx.request().uri().path();
28                let value = ctx.captures().get(*idx, path)
29                    .into();
30
31                let state = State::Complete(Ok(value));
32                ExtractBytes { state }
33            }
34            Header(header_name) => {
35                let value = match ctx.request().headers().get(header_name) {
36                    Some(value) => value,
37                    None => {
38                        return ExtractBytes::err(Error::missing_argument());
39                    }
40                };
41
42                ExtractBytes::ok(value.as_bytes().into())
43            }
44            QueryString => {
45                let query = ctx.request().uri()
46                    .path_and_query()
47                    .and_then(|path_and_query| path_and_query.query())
48                    .unwrap_or("");
49
50                ExtractBytes::ok(query.into())
51            }
52            Body => {
53                panic!("called `extract` but `body` is required");
54            }
55            Unknown => {
56                unimplemented!();
57            }
58        }
59    }
60
61    fn extract_body(ctx: &Context, body: B) -> Self::Future {
62        use codegen::Source::*;
63
64        match ctx.callsite().source() {
65            Body => {
66                let state = State::Body(body.collect());
67                ExtractBytes { state }
68            }
69            _ => panic!("called `extract_body` but not extracting from body"),
70        }
71    }
72
73    fn requires_body(callsite: &CallSite) -> bool {
74        callsite.requires_body()
75    }
76}
77
78impl<T, B> ExtractBytes<T, B> {
79    /// Create an `ExtractBytes` in the completed state
80    fn ok(value: T) -> Self {
81        let state = State::Complete(Ok(value));
82        ExtractBytes { state }
83    }
84
85    /// Create an `ExtractBytes` in the error state
86    fn err(err: Error) -> Self {
87        let state = State::Complete(Err(Some(err)));
88        ExtractBytes { state }
89    }
90}
91
92impl<T, B> ExtractFuture for ExtractBytes<T, B>
93where T: From<Vec<u8>>,
94      B: BufStream,
95{
96    type Item = T;
97
98    fn poll(&mut self) -> Poll<(), Error> {
99        use self::State::*;
100
101        loop {
102            let res = match self.state {
103                Complete(Err(ref mut e)) => {
104                    return Err(e.take().unwrap());
105                }
106                Complete(Ok(_)) => {
107                    return Ok(().into());
108                }
109                Body(ref mut collect) => {
110                    let res = collect.poll()
111                        // TODO: Is there a better way to handle errors?
112                        .map_err(|_| Error::internal_error());
113
114                    try_ready!(res).into()
115                }
116            };
117
118            self.state = State::Complete(Ok(res));
119        }
120    }
121
122    fn extract(self) -> T {
123        use self::State::Complete;
124
125        match self.state {
126            Complete(Ok(res)) => res,
127            _ => panic!("invalid state"),
128        }
129    }
130}