tower_web/extract/
bytes.rs1use codegen::CallSite;
2use extract::{Context, Error, Extract, ExtractFuture};
3use util::buf_stream::{self, BufStream};
4
5use futures::{Future, Poll};
6
7#[derive(Debug)]
9pub struct ExtractBytes<T, B> {
10 state: State<T, B>,
11}
12
13#[derive(Debug)]
14enum State<T, B> {
15 Complete(Result<T, Option<Error>>),
16 Body(buf_stream::Collect<B, Vec<u8>>),
17}
18
19impl<B: BufStream> Extract<B> for Vec<u8> {
20 type Future = ExtractBytes<Self, B>;
21
22 fn extract(ctx: &Context) -> Self::Future {
23 use codegen::Source::*;
24
25 match ctx.callsite().source() {
26 Capture(idx) => {
27 let path = ctx.request().uri().path();
28 let value = ctx.captures().get(*idx, path)
29 .into();
30
31 let state = State::Complete(Ok(value));
32 ExtractBytes { state }
33 }
34 Header(header_name) => {
35 let value = match ctx.request().headers().get(header_name) {
36 Some(value) => value,
37 None => {
38 return ExtractBytes::err(Error::missing_argument());
39 }
40 };
41
42 ExtractBytes::ok(value.as_bytes().into())
43 }
44 QueryString => {
45 let query = ctx.request().uri()
46 .path_and_query()
47 .and_then(|path_and_query| path_and_query.query())
48 .unwrap_or("");
49
50 ExtractBytes::ok(query.into())
51 }
52 Body => {
53 panic!("called `extract` but `body` is required");
54 }
55 Unknown => {
56 unimplemented!();
57 }
58 }
59 }
60
61 fn extract_body(ctx: &Context, body: B) -> Self::Future {
62 use codegen::Source::*;
63
64 match ctx.callsite().source() {
65 Body => {
66 let state = State::Body(body.collect());
67 ExtractBytes { state }
68 }
69 _ => panic!("called `extract_body` but not extracting from body"),
70 }
71 }
72
73 fn requires_body(callsite: &CallSite) -> bool {
74 callsite.requires_body()
75 }
76}
77
78impl<T, B> ExtractBytes<T, B> {
79 fn ok(value: T) -> Self {
81 let state = State::Complete(Ok(value));
82 ExtractBytes { state }
83 }
84
85 fn err(err: Error) -> Self {
87 let state = State::Complete(Err(Some(err)));
88 ExtractBytes { state }
89 }
90}
91
92impl<T, B> ExtractFuture for ExtractBytes<T, B>
93where T: From<Vec<u8>>,
94 B: BufStream,
95{
96 type Item = T;
97
98 fn poll(&mut self) -> Poll<(), Error> {
99 use self::State::*;
100
101 loop {
102 let res = match self.state {
103 Complete(Err(ref mut e)) => {
104 return Err(e.take().unwrap());
105 }
106 Complete(Ok(_)) => {
107 return Ok(().into());
108 }
109 Body(ref mut collect) => {
110 let res = collect.poll()
111 .map_err(|_| Error::internal_error());
113
114 try_ready!(res).into()
115 }
116 };
117
118 self.state = State::Complete(Ok(res));
119 }
120 }
121
122 fn extract(self) -> T {
123 use self::State::Complete;
124
125 match self.state {
126 Complete(Ok(res)) => res,
127 _ => panic!("invalid state"),
128 }
129 }
130}