multipart_async/server/
mod.rs1extern crate httparse;
15extern crate twoway;
16
17use futures::{Poll, Stream};
18use futures::task::{self, Task};
19
20use mime::Mime;
21
22use tempdir::TempDir;
23
24use std::borrow::Borrow;
25use std::cell::Cell;
26use std::collections::VecDeque;
27use std::fs::{self, File};
28use std::io::prelude::*;
29use std::path::{Path, PathBuf};
30use std::rc::Rc;
31use std::str::Utf8Error;
32use std::{fmt, io, mem, ptr};
33
34use self::boundary::BoundaryFinder;
35
36macro_rules! try_opt (
37 ($expr:expr) => (
38 match $expr {
39 Some(val) => val,
40 None => return None,
41 }
42 )
43);
44
45macro_rules! ret_err (
46 ($string:expr) => (
47 return ::helpers::error($string);
48 );
49 ($string:expr, $($args:tt)*) => (
50 return ::helpers::error(format!($string, $($args)*));
51 );
52);
53
54mod boundary;
55mod field;
56
57use helpers::*;
58
59use self::field::ReadHeaders;
60
61pub use self::field::{Field, FieldHeaders, FieldData, ReadTextField, TextField};
62
63pub struct Multipart<S: Stream> {
77 internal: Rc<Internal<S>>,
78 read_hdr: ReadHeaders
79}
80
81impl<S: Stream> Multipart<S> where S::Item: BodyChunk, S::Error: StreamError {
85 pub fn with_body<B: Into<String>>(stream: S, boundary: B) -> Self {
90 let mut boundary = boundary.into();
91 boundary.insert_str(0, "--");
92
93 debug!("Boundary: {}", boundary);
94
95 Multipart {
96 internal: Rc::new(Internal::new(stream, boundary)),
97 read_hdr: ReadHeaders::default(),
98 }
99 }
100}
101
102impl<S: Stream> Stream for Multipart<S> where S::Item: BodyChunk, S::Error: StreamError {
103 type Item = Field<S>;
104 type Error = S::Error;
105
106 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
107 if Rc::get_mut(&mut self.internal).is_none() {
110 self.internal.park_curr_task();
111 return not_ready();
112 }
113
114 let headers = {
116 let stream = Rc::get_mut(&mut self.internal).unwrap().stream.get_mut();
117
118 match try_ready!(self.read_hdr.read_headers(stream)) {
119 Some(headers) => headers,
120 None => return ready(None),
121 }
122 };
123
124 ready(field::new_field(headers, self.internal.clone()))
125 }
126}
127
128struct Internal<S: Stream> {
129 stream: Cell<BoundaryFinder<S>>,
130 waiting_task: Cell<Option<Task>>,
131}
132
133impl<S: Stream> Internal<S> {
134 fn new(stream: S, boundary: String) -> Self {
135 debug_assert!(boundary.starts_with("--"), "Boundary must start with --");
136
137 Internal {
138 stream: BoundaryFinder::new(stream, boundary).into(),
139 waiting_task: None.into(),
140 }
141 }
142
143 fn park_curr_task(&self) {
144 self.waiting_task.set(Some(task::current()));
145 }
146
147 fn notify_task(&self) {
148 self.waiting_task.take().map(|t| t.notify());
149 }
150}
151
152pub trait BodyChunk: Sized {
154 fn split_at(self, idx: usize) -> (Self, Self);
156
157 fn as_slice(&self) -> &[u8];
159
160 #[inline(always)]
162 fn len(&self) -> usize {
163 self.as_slice().len()
164 }
165
166 #[inline(always)]
168 fn is_empty(&self) -> bool {
169 self.as_slice().is_empty()
170 }
171
172 #[inline(always)]
176 fn into_vec(self) -> Vec<u8> {
177 self.as_slice().to_owned()
178 }
179}
180
181impl BodyChunk for Vec<u8> {
182 fn split_at(mut self, idx: usize) -> (Self, Self) {
183 let other = self.split_off(idx);
184 (self, other)
185 }
186
187 fn as_slice(&self) -> &[u8] {
188 self
189 }
190
191 fn into_vec(self) -> Vec<u8> { self }
192}
193
194impl<'a> BodyChunk for &'a [u8] {
195 fn split_at(self, idx: usize) -> (Self, Self) {
196 self.split_at(idx)
197 }
198
199 fn as_slice(&self) -> &[u8] {
200 self
201 }
202}
203
204pub trait StreamError: From<io::Error> {
206 fn from_str(str: &'static str) -> Self {
210 io::Error::new(io::ErrorKind::InvalidData, str).into()
211 }
212
213 fn from_string(string: String) -> Self {
217 io::Error::new(io::ErrorKind::InvalidData, string).into()
218 }
219
220 fn from_utf8(err: Utf8Error) -> Self {
224 io::Error::new(io::ErrorKind::InvalidData, err).into()
225 }
226}
227
228impl<E> StreamError for E where E: From<io::Error> {}
229
230