multipart_async/server/
mod.rs

1// Copyright 2017 `multipart-async` Crate Developers
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7//! The server-side abstraction for multipart requests. Enabled with the `server` feature (on by
8//! default).
9//!
10//! Use this when you are implementing an HTTP server and want to
11//! to accept, parse, and serve HTTP `multipart/form-data` requests (file uploads).
12//!
13//! See the `Multipart` struct for more info.
14extern crate httparse;
15extern crate twoway;
16
17use futures::{Poll, Stream};
18use futures::task::{self, Task};
19
20use mime::Mime;
21
22use tempdir::TempDir;
23
24use std::borrow::Borrow;
25use std::cell::Cell;
26use std::collections::VecDeque;
27use std::fs::{self, File};
28use std::io::prelude::*;
29use std::path::{Path, PathBuf};
30use std::rc::Rc;
31use std::str::Utf8Error;
32use std::{fmt, io, mem, ptr};
33
34use self::boundary::BoundaryFinder;
35
36macro_rules! try_opt (
37    ($expr:expr) => (
38        match $expr {
39            Some(val) => val,
40            None => return None,
41        }
42    )
43);
44
45macro_rules! ret_err (
46    ($string:expr) => (
47        return ::helpers::error($string);
48    );
49    ($string:expr, $($args:tt)*) => (
50        return ::helpers::error(format!($string, $($args)*));
51    );
52);
53
54mod boundary;
55mod field;
56
57use helpers::*;
58
59use self::field::ReadHeaders;
60
61pub use self::field::{Field, FieldHeaders, FieldData, ReadTextField, TextField};
62
63// FIXME: hyper integration once API is in place
64// #[cfg(feature = "hyper")]
65// mod hyper;
66
67/// The server-side implementation of `multipart/form-data` requests.
68///
69/// This will parse the incoming stream into `Field` instances via its
70/// `Stream` implementation.
71///
72/// To maintain consistency in the underlying stream, this will not yield more than one
73/// `Field` at a time. A `Drop` implementation on `FieldData` is used to signal
74/// when it's time to move forward, so do avoid leaking that type or anything which contains it
75/// (`Field`, `ReadTextField`, or any stream combinators).
76pub struct Multipart<S: Stream> {
77    internal: Rc<Internal<S>>,
78    read_hdr: ReadHeaders
79}
80
81// Q: why can't we just wrap up these bounds into a trait?
82// A: https://github.com/rust-lang/rust/issues/24616#issuecomment-112065997
83// (The workaround mentioned in a later comment doesn't seem to be worth the added complexity)
84impl<S: Stream> Multipart<S> where S::Item: BodyChunk, S::Error: StreamError {
85    /// Construct a new `Multipart` with the given body reader and boundary.
86    ///
87    /// This will add the requisite `--` and CRLF (`\r\n`) to the boundary as per
88    /// [IETF RFC 7578 section 4.1](https://tools.ietf.org/html/rfc7578#section-4.1).
89    pub fn with_body<B: Into<String>>(stream: S, boundary: B) -> Self {
90        let mut boundary = boundary.into();
91        boundary.insert_str(0, "--");
92
93        debug!("Boundary: {}", boundary);
94
95        Multipart { 
96            internal: Rc::new(Internal::new(stream, boundary)),
97            read_hdr: ReadHeaders::default(),
98        }
99    }
100}
101
102impl<S: Stream> Stream for Multipart<S> where S::Item: BodyChunk, S::Error: StreamError {
103    type Item = Field<S>;
104    type Error = S::Error;
105
106    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
107        // FIXME: combine this with the next statement when non-lexical lifetimes are added
108        // shouldn't be an issue anyway because the optimizer can fold these checks together
109        if Rc::get_mut(&mut self.internal).is_none() {
110            self.internal.park_curr_task();
111            return not_ready();
112        }
113
114        // We don't want to return another `Field` unless we have exclusive access.
115        let headers = {
116            let stream = Rc::get_mut(&mut self.internal).unwrap().stream.get_mut();
117
118            match try_ready!(self.read_hdr.read_headers(stream)) {
119                Some(headers) => headers,
120                None => return ready(None),
121            }
122        };
123
124        ready(field::new_field(headers, self.internal.clone()))
125    }
126}
127
128struct Internal<S: Stream> {
129    stream: Cell<BoundaryFinder<S>>,
130    waiting_task: Cell<Option<Task>>,
131}
132
133impl<S: Stream> Internal<S> {
134    fn new(stream: S, boundary: String) -> Self {
135        debug_assert!(boundary.starts_with("--"), "Boundary must start with --");
136
137        Internal {
138            stream: BoundaryFinder::new(stream, boundary).into(),
139            waiting_task: None.into(),
140        }
141    }
142
143    fn park_curr_task(&self) {
144        self.waiting_task.set(Some(task::current()));
145    }
146
147    fn notify_task(&self) {
148        self.waiting_task.take().map(|t| t.notify());
149    }
150}
151
152/// The operations required from a body stream's `Item` type.
153pub trait BodyChunk: Sized {
154    /// Split the chunk at `idx`, returning `(self[..idx], self[idx..])`.
155    fn split_at(self, idx: usize) -> (Self, Self);
156
157    /// Get the slice representing the data of this chunk.
158    fn as_slice(&self) -> &[u8];
159
160    /// Equivalent to `self.as_slice().len()`
161    #[inline(always)]
162    fn len(&self) -> usize {
163        self.as_slice().len()
164    }
165
166    /// Equivalent to `self.as_slice().is_empty()`
167    #[inline(always)]
168    fn is_empty(&self) -> bool {
169        self.as_slice().is_empty()
170    }
171
172    /// Equivalent to `self.as_slice().to_owned()`
173    ///
174    /// Implementors are welcome to override this if they can provide a cheaper conversion.
175    #[inline(always)]
176    fn into_vec(self) -> Vec<u8> {
177        self.as_slice().to_owned()
178    }
179}
180
181impl BodyChunk for Vec<u8> {
182    fn split_at(mut self, idx: usize) -> (Self, Self) {
183        let other = self.split_off(idx);
184        (self, other)
185    }
186
187    fn as_slice(&self) -> &[u8] {
188        self
189    }
190
191    fn into_vec(self) -> Vec<u8> { self }
192}
193
194impl<'a> BodyChunk for &'a [u8] {
195    fn split_at(self, idx: usize) -> (Self, Self) {
196        self.split_at(idx)
197    }
198
199    fn as_slice(&self) -> &[u8] {
200        self
201    }
202}
203
204/// The operations required from a body stream's `Error` type.
205pub trait StreamError: From<io::Error> {
206    /// Wrap a static string into this error type.
207    ///
208    /// Goes through `io::Error` by default.
209    fn from_str(str: &'static str) -> Self {
210        io::Error::new(io::ErrorKind::InvalidData, str).into()
211    }
212
213    /// Wrap a dynamic string into this error type.
214    ///
215    /// Goes through `io::Error` by default.
216    fn from_string(string: String) -> Self {
217        io::Error::new(io::ErrorKind::InvalidData, string).into()
218    }
219
220    /// Wrap a `std::str::Utf8Error` into this error type.
221    ///
222    /// Goes through `io::Error` by default.
223    fn from_utf8(err: Utf8Error) -> Self {
224        io::Error::new(io::ErrorKind::InvalidData, err).into()
225    }
226}
227
228impl<E> StreamError for E where E: From<io::Error> {}
229
230//impl StreamError for String {
231//    fn from_string(string: String) -> Self { string }
232//}