// multipart_async/server/field/collect.rs

use futures::{Future, Stream};
use futures::Async::*;

use std::rc::Rc;
use std::str::Utf8Error;
use std::{fmt, str};


use server::boundary::BoundaryFinder;
use server::{Internal, BodyChunk, StreamError};

use super::{FieldHeaders, FieldData};

use helpers::*;
15
/// The result of reading a `Field` to text.
///
/// Produced by the `ReadTextField` future once the field body has been
/// read to completion and successfully decoded as UTF-8.
#[derive(Clone, Debug)]
pub struct TextField {
    /// The headers for the original field, provided as a convenience.
    pub headers: Rc<FieldHeaders>,
    /// The text of the field.
    pub text: String,
}
24
/// A `Future` which attempts to read a field's data to a string.
///
/// ### Charset
/// For simplicity, the default UTF-8 character set is assumed, as defined in
/// [IETF RFC 7578 Section 5.1.2](https://tools.ietf.org/html/rfc7578#section-5.1.2).
/// If the field body cannot be decoded as UTF-8, an error is returned.
///
/// Decoding text in a different charset (except ASCII which is compatible with UTF-8) is,
/// currently, beyond the scope of this crate. However, as a convention, web browsers will send
/// `multipart/form-data` requests in the same charset as that of the document (page or frame)
/// containing the form, so if you only serve ASCII/UTF-8 pages then you won't have to worry
/// too much about decoding strange charsets.
///
/// ### Warning About Leaks
/// If this value or the contained `FieldData` is leaked (via `mem::forget()` or some
/// other mechanism), then the parent `Multipart` will never be able to yield the next field in the
/// stream. The task waiting on the `Multipart` will also never be notified, which, depending on the
/// event loop/reactor/executor implementation, may cause a deadlock.
// NOTE(review): the derived `Default` impl adds an `S: Default` bound and leaves
// `limit` at 0, which would reject the very first non-empty chunk. `read_text()`
// is the constructor that sets a sane limit — confirm the derive is actually
// needed/used anywhere.
#[derive(Default)]
pub struct ReadTextField<S: Stream> {
    // `Some` while the field is still being read; set to `None` once reading
    // completes so the parent `Multipart` can immediately yield the next field.
    data: Option<FieldData<S>>,
    // Accumulator for the text decoded so far.
    accum: String,
    /// The headers for the original field, provided as a convenience.
    pub headers: Rc<FieldHeaders>,
    /// The limit for the string, in bytes, to avoid potential DoS attacks from
    /// attackers running the server out of memory. If an incoming chunk is expected to push the
    /// string over this limit, an error is returned and the offending chunk is pushed back
    /// to the head of the stream.
    pub limit: usize,
}
55
// RFC on these numbers, they're pretty much arbitrary
const DEFAULT_LIMIT: usize = 65536; // 64 KiB--reasonable enough for one field, right?
const MAX_LIMIT: usize = 16_777_216; // 16MiB--highest sane value for one field, IMO
59
60pub fn read_text<S: Stream>(data: FieldData<S>) -> ReadTextField<S> {
61    ReadTextField {
62        headers: data.headers.clone(), data: Some(data), limit: DEFAULT_LIMIT, accum: String::new()
63    }
64}
65
66impl<S: Stream> ReadTextField<S> {
67    /// Set the length limit, in bytes, for the collected text. If an incoming chunk is expected to
68    /// push the string over this limit, an error is returned and the offending chunk is pushed back
69    /// to the head of the stream.
70    ///
71    /// Setting a value higher than a few megabytes is not recommended as it could allow an attacker
72    /// to DoS the server by running it out of memory, causing it to panic on allocation or spend
73    /// forever swapping pagefiles to disk. Remember that this limit is only for a single field
74    /// as well.
75    ///
76    /// Setting this to `usize::MAX` is equivalent to removing the limit as the string
77    /// would overflow its capacity value anyway.
78    pub fn limit(self, limit: usize) -> Self {
79        Self { limit, .. self}
80    }
81
82    /// Soft max limit if the default isn't large enough.
83    ///
84    /// Going higher than this is allowed, but not recommended.
85    pub fn limit_max(self) -> Self {
86        self.limit(MAX_LIMIT)
87    }
88
89    /// Take the text that has been collected so far, leaving an empty string in its place.
90    ///
91    /// If the length limit was hit, this allows the field to continue being read.
92    pub fn take_string(&mut self) -> String {
93        replace_default(&mut self.accum)
94    }
95
96    /// The text that has been collected so far.
97    pub fn ref_text(&self) -> &str {
98        &self.accum
99    }
100
101    /// Destructure this future, taking the internal `FieldData` instance back.
102    ///
103    /// Will be `None` if the field was read to completion, because the internal `FieldData`
104    /// instance is dropped afterwards to allow the parent `Multipart` to immediately start
105    /// working on the next field.
106    pub fn into_data(self) -> Option<FieldData<S>> {
107        self.data
108    }
109}
110
111impl<S: Stream> Future for ReadTextField<S> where S::Item: BodyChunk, S::Error: StreamError {
112    type Item = TextField;
113    type Error = S::Error;
114
115    fn poll(&mut self) -> Poll<Self::Item, S::Error> {
116        loop {
117            let data = match self.data {
118                Some(ref mut data) => data,
119                None => return not_ready(),
120            };
121
122            let mut stream = data.stream_mut();
123
124            let chunk = match try_ready!(stream.body_chunk()) {
125                Some(val) => val,
126                _ => break,
127            };
128
129            // This also catches capacity overflows
130            if self.accum.len().saturating_add(chunk.len()) > self.limit {
131                stream.push_chunk(chunk);
132                ret_err!("Text field {:?} exceeded limit of {} bytes", self.headers, self.limit);
133            }
134
135            // Try to convert the chunk to UTF-8 and append it to the accumulator
136            let split_idx = match str::from_utf8(chunk.as_slice()) {
137                Ok(s) => { self.accum.push_str(s); continue },
138                Err(e) => if should_continue(&e, chunk.as_slice()) {
139                    // this error just means that there was a byte sequence cut off by a
140                    // chunk boundary
141                    e.valid_up_to()
142                } else {
143                    // otherwise, there was an invalid byte sequence
144                    return utf8_err(e);
145                },
146            };
147
148            let (valid, invalid) = chunk.split_at(split_idx);
149
150            self.accum.push_str(str::from_utf8(valid.as_slice())
151                .expect("a `StreamChunk` was UTF-8 before, now it's not"));
152
153            // Recombine the cutoff UTF-8 sequence
154            let needed_len = utf8_char_width(invalid.as_slice()[0]) - invalid.len();
155
156            // Get a second chunk or push the first chunk back
157            let (first, second) = match try_ready!(stream.another_chunk(invalid)) {
158                Some(pair) => pair,
159                None => ret_err!("unexpected end of stream while decoding a UTF-8 sequence"),
160            };
161
162            if second.len() < needed_len {
163                ret_err!("got a chunk smaller than the {} byte(s) needed to finish \
164                          decoding this UTF-8 sequence: {:?}",
165                         needed_len, first.as_slice());
166            }
167
168            if self.accum.len().saturating_add(first.len()).saturating_add(second.len()) > self.limit {
169                // push chunks in reverse order
170                stream.push_chunk(second);
171                stream.push_chunk(first);
172                ret_err!("Text field {:?} exceeded limit of {} bytes", self.headers, self.limit);
173            }
174
175            let mut buf = [0u8; 4];
176
177            // first.len() will be between 1 and 4 as guaranteed by `FromUtf8Error::valid_up_to()`
178            buf[..first.len()].copy_from_slice(first.as_slice());
179            buf[first.len()..].copy_from_slice(&second.as_slice()[.. needed_len]);
180
181            let split_idx = match str::from_utf8(&buf) {
182                Ok(s) => { self.accum.push_str(s); needed_len },
183                Err(e) => if should_continue(&e, &buf) {
184                    e.valid_up_to()
185                } else {
186                    return utf8_err(e);
187                }
188            };
189
190            let (_, rem) = second.split_at(split_idx);
191
192            if !rem.is_empty() {
193                stream.push_chunk(rem);
194            }
195        }
196
197        // Optimization: free the `FieldData` so the parent `Multipart` can yield
198        // the next field.
199        self.data = None;
200
201        ready(TextField {
202            headers: self.headers.clone(),
203            text: self.take_string(),
204        })
205    }
206}
207
208impl<S: Stream> fmt::Debug for ReadTextField<S> {
209    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
210        f.debug_struct("ReadFieldText")
211            .field("accum", &self.accum)
212            .field("headers", &self.headers)
213            .field("limit", &self.limit)
214            .finish()
215    }
216}
217
// Below lifted from https://github.com/rust-lang/rust/blob/1.19.0/src/libcore/str/mod.rs#L1461-L1485
// because they're being selfish with their UTF-8 implementation internals.
//
// Maps every possible lead byte to the total length of its UTF-8 sequence;
// 0 marks bytes that can never start a sequence (continuation bytes and
// the invalid lead bytes 0xC0, 0xC1, 0xF5..=0xFF).
static UTF8_CHAR_WIDTH: [u8; 256] = [
    1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
    1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, // 0x1F
    1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
    1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, // 0x3F
    1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
    1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, // 0x5F
    1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
    1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, // 0x7F
    0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
    0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, // 0x9F
    0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
    0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, // 0xBF
    0,0,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
    2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, // 0xDF
    3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3, // 0xEF
    4,4,4,4,4,0,0,0,0,0,0,0,0,0,0,0, // 0xFF
];

/// The length of the UTF-8 sequence introduced by lead byte `b`,
/// or 0 if `b` cannot start a sequence.
#[inline]
fn utf8_char_width(b: u8) -> usize {
    usize::from(UTF8_CHAR_WIDTH[usize::from(b)])
}

/// Replacement for the reasoning using `Utf8Error::error_len()` which was stabilized in 1.20.
///
/// Returns `true` when the decode error looks like a multibyte sequence that
/// was merely cut off at the end of `buf` (so decoding should resume with the
/// next chunk), and `false` when the bytes are genuinely invalid UTF-8.
fn should_continue(err: &Utf8Error, buf: &[u8]) -> bool {
    let valid_len = err.valid_up_to();

    // The byte where decoding stopped must be a valid multibyte lead byte...
    if utf8_char_width(buf[valid_len]) <= 1 {
        return false;
    }

    // ...and every byte after it (possibly none) must be a continuation byte
    // (0x80..=0xBF); an empty tail means the lead byte sits right at the end.
    buf[valid_len + 1 ..].iter().all(|&b| (0x80 ..= 0xBF).contains(&b))
}