Skip to main content

http_multipart/
lib.rs

1#![forbid(unsafe_code)]
2
3mod content_disposition;
4mod error;
5mod field;
6mod form;
7mod header;
8
9pub use self::{
10    error::MultipartError,
11    field::Field,
12    form::{Form, Part},
13};
14
15use core::{future::poll_fn, pin::Pin};
16
17use bytes::{Buf, BytesMut};
18use field::FieldDecoder;
19use futures_core::stream::Stream;
20use http::{header::HeaderMap, Method, Request};
21use memchr::memmem;
22use pin_project_lite::pin_project;
23
24use self::{content_disposition::ContentDisposition, error::PayloadError};
25
26/// Multipart protocol using high level API that operate over [Stream] trait.
27///
28/// `http` crate is used as Http request input. It provides necessary header information needed for
29/// [Multipart].
30///
31/// # Examples:
32/// ```rust
33/// use std::{convert::Infallible, error, pin::pin};
34///
35/// use futures_core::stream::Stream;
36/// use http::Request;
37///
38/// async fn handle<B>(req: Request<B>) -> Result<(), Box<dyn error::Error + Send + Sync>>
39/// where
40///     B: Stream<Item = Result<Vec<u8>, Infallible>>
41/// {
42///     // destruct request type.
43///     let (parts, body) = req.into_parts();
44///     let req = Request::from_parts(parts, ());
45///
46///     // prepare multipart handling.
47///     let mut multipart = http_multipart::multipart(&req, body)?;
48///
49///     // pin multipart and start await on the request body.
50///     let mut multipart = pin!(multipart);
51///
52///     // try async iterate through fields of the multipart.
53///     while let Some(mut field) = multipart.try_next().await? {
54///         // try async iterate through single field's bytes data.
55///         while let Some(chunk) = field.try_next().await? {
56///             // handle bytes data.
57///         }
58///     }
59///
60///     Ok(())
61/// }
62/// ```
63pub fn multipart<Ext, B, T, E>(req: &Request<Ext>, body: B) -> Result<Multipart<B>, MultipartError>
64where
65    B: Stream<Item = Result<T, E>>,
66    T: AsRef<[u8]>,
67    E: Into<PayloadError>,
68{
69    multipart_with_config(req, body, Config::default())
70}
71
72/// [multipart] with [Config] that used for customize behavior of [Multipart].
73pub fn multipart_with_config<Ext, B, T, E>(
74    req: &Request<Ext>,
75    body: B,
76    config: Config,
77) -> Result<Multipart<B>, MultipartError>
78where
79    B: Stream<Item = Result<T, E>>,
80    T: AsRef<[u8]>,
81    E: Into<PayloadError>,
82{
83    if req.method() != Method::POST {
84        return Err(MultipartError::NoPostMethod);
85    }
86
87    let boundary = header::boundary(req.headers())?;
88
89    Ok(Multipart {
90        stream: body,
91        buf: BytesMut::new(),
92        boundary: boundary.into(),
93        headers: HeaderMap::new(),
94        pending_field: false,
95        config,
96    })
97}
98
/// Settings that tune how a [Multipart] decoder behaves.
#[derive(Debug, Copy, Clone)]
pub struct Config {
    /// Upper bound, in bytes, for the decoder's internal buffer.
    ///
    /// The internal buffer holds data that is still being scanned, such as chunks overlapping a
    /// boundary and field headers. Defaults to 1MB.
    pub buf_limit: usize,
}

impl Default for Config {
    /// Stock configuration: a 1MB (`1024 * 1024` bytes) buffer limit.
    fn default() -> Self {
        Config { buf_limit: 1 << 20 }
    }
}
113
pin_project! {
    /// Streaming multipart decoder produced by [multipart] and [multipart_with_config].
    ///
    /// Fields of the body are pulled one at a time with `try_next` on a pinned instance.
    pub struct Multipart<S> {
        // request body stream; the only structurally pinned field.
        #[pin]
        stream: S,
        // bytes read from `stream` that have not been consumed yet.
        buf: BytesMut,
        // boundary value extracted from the request headers.
        boundary: Box<[u8]>,
        // header map the field header parser writes into.
        headers: HeaderMap,
        // set when a field is handed out; cleared once the rest of that field has been drained.
        pending_field: bool,
        // user supplied limits.
        config: Config
    }
}
125
/// `--` marker expected in front of the boundary value on a boundary line, and again behind it
/// on the closing boundary line.
const DOUBLE_HYPHEN: &[u8; 2] = b"--";
/// Line feed searched for when locating the end of a boundary line.
const LF: &[u8; 1] = b"\n";
/// Blank line (`CRLF CRLF`) that terminates the header section of a field.
const DOUBLE_CR_LF: &[u8; 4] = b"\r\n\r\n";
/// `CRLF` followed by `--`.
// NOTE(review): not referenced in this file; presumably used by the field body decoder to
// locate the delimiter in front of the next boundary - confirm against the `field` module.
const FIELD_DELIMITER: &[u8; 4] = b"\r\n--";
130
131impl<S, T, E> Multipart<S>
132where
133    S: Stream<Item = Result<T, E>>,
134    T: AsRef<[u8]>,
135    E: Into<PayloadError>,
136{
137    // take in &mut Pin<&mut Self> so Field can borrow it as Pin<&mut Multipart>.
138    // this avoid another explicit stack pin when operating on Field type.
139    pub async fn try_next<'s>(self: &'s mut Pin<&mut Self>) -> Result<Option<Field<'s, S>>, MultipartError> {
140        let boundary_len = self.boundary.len();
141
142        if self.pending_field {
143            self.as_mut().consume_pending_field().await?;
144        }
145
146        loop {
147            let this = self.as_mut().project();
148            if let Some(idx) = memmem::find(this.buf, LF) {
149                // backtrack one byte to exclude CR
150                let slice = match idx.checked_sub(1) {
151                    Some(idx) => &this.buf[..idx],
152                    // no CR before LF.
153                    None => return Err(MultipartError::Boundary),
154                };
155
156                match slice.len() {
157                    // empty line. skip.
158                    0 => {
159                        // forward one byte to include LF and remove the empty line.
160                        this.buf.advance(idx + 1);
161                        continue;
162                    }
163                    // not enough data to operate.
164                    len if len < (boundary_len + 2) => {}
165                    // not boundary.
166                    _ if &slice[..2] != DOUBLE_HYPHEN => return Err(MultipartError::Boundary),
167                    // non last boundary
168                    _ if this.boundary.as_ref().eq(&slice[2..]) => {
169                        // forward one byte to include CRLF and remove the boundary line.
170                        this.buf.advance(idx + 1);
171
172                        let field = self.as_mut().parse_field().await?;
173                        return Ok(Some(field));
174                    }
175                    // last boundary.
176                    len if len == (boundary_len + 4) => {
177                        let at = boundary_len + 2;
178                        // TODO: add log for ill formed ending boundary?;
179                        let _ = this.boundary.as_ref().eq(&slice[2..at]) && &slice[at..] == DOUBLE_HYPHEN;
180                        return Ok(None);
181                    }
182                    // boundary line exceed expected length.
183                    _ => return Err(MultipartError::Boundary),
184                }
185            }
186
187            if self.buf_overflow() {
188                return Err(MultipartError::BufferOverflow);
189            }
190
191            self.as_mut().try_read_stream_to_buf().await?;
192        }
193    }
194
195    async fn parse_field(mut self: Pin<&mut Self>) -> Result<Field<'_, S>, MultipartError> {
196        loop {
197            let this = self.as_mut().project();
198
199            if let Some(idx) = memmem::find(this.buf, DOUBLE_CR_LF) {
200                let slice = &this.buf[..idx + 4];
201
202                header::parse_headers(this.headers, slice)?;
203                this.buf.advance(slice.len());
204
205                let cp = ContentDisposition::try_from_header(this.headers)?;
206
207                header::check_headers(this.headers)?;
208
209                let length = header::content_length_opt(this.headers)?;
210
211                *this.pending_field = true;
212
213                return Ok(Field::new(length, cp, self));
214            }
215
216            if self.buf_overflow() {
217                return Err(MultipartError::Header(httparse::Error::TooManyHeaders));
218            }
219
220            self.as_mut().try_read_stream_to_buf().await?;
221        }
222    }
223
224    #[cold]
225    #[inline(never)]
226    async fn consume_pending_field(mut self: Pin<&mut Self>) -> Result<(), MultipartError> {
227        let mut field_ty = FieldDecoder::default();
228
229        loop {
230            let this = self.as_mut().project();
231            if let Some(idx) = field_ty.try_find_split_idx(this.buf, this.boundary)? {
232                this.buf.advance(idx);
233            }
234            if matches!(field_ty, FieldDecoder::StreamEnd) {
235                *this.pending_field = false;
236                return Ok(());
237            }
238            self.as_mut().try_read_stream_to_buf().await?;
239        }
240    }
241
242    async fn try_read_stream_to_buf(mut self: Pin<&mut Self>) -> Result<(), MultipartError> {
243        let bytes = self.as_mut().try_read_stream().await?;
244        self.project().buf.extend_from_slice(bytes.as_ref());
245        Ok(())
246    }
247
248    async fn try_read_stream(mut self: Pin<&mut Self>) -> Result<T, MultipartError> {
249        match poll_fn(move |cx| self.as_mut().project().stream.poll_next(cx)).await {
250            Some(Ok(bytes)) => Ok(bytes),
251            Some(Err(e)) => Err(MultipartError::Payload(e.into())),
252            None => Err(MultipartError::UnexpectedEof),
253        }
254    }
255
256    pub(crate) fn buf_overflow(&self) -> bool {
257        self.buf.len() > self.config.buf_limit
258    }
259}
260
#[cfg(test)]
mod test {
    use std::{convert::Infallible, pin::pin};

    use bytes::Bytes;
    use futures_util::FutureExt;
    use http::header::{HeaderValue, CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE};

    use super::*;

    /// Body stream that yields `b` as one single chunk.
    fn once_body(b: impl Into<Bytes>) -> impl Stream<Item = Result<Bytes, Infallible>> {
        futures_util::stream::once(async { Ok(b.into()) })
    }

    /// POST request with an empty body type and `content_type` as its Content-Type header.
    fn post_request(content_type: &'static str) -> Request<()> {
        let mut req = Request::new(());
        *req.method_mut() = Method::POST;
        req.headers_mut()
            .insert(CONTENT_TYPE, HeaderValue::from_static(content_type));
        req
    }

    #[test]
    fn method() {
        // the request keeps its default method, which must be rejected.
        let req = Request::new(());
        let res = multipart(&req, once_body(Bytes::new()));
        assert!(matches!(res.err(), Some(MultipartError::NoPostMethod)));
    }

    #[test]
    fn basic() {
        let raw = b"\
            --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
            Content-Disposition: form-data; name=\"file\"; filename=\"foo.txt\"\r\n\
            Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
            test\r\n\
            --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
            Content-Disposition: form-data; name=\"file\"; filename=\"bar.txt\"\r\n\
            Content-Type: text/plain\r\n\r\n\
            testdata\r\n\
            --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
            Content-Disposition: form-data; name=\"file\"; filename=\"bar.txt\"\r\n\
            Content-Type: text/plain\r\n\r\n\
            testdata\r\n\
            --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
            Content-Disposition: form-data; name=\"file\"; filename=\"bar.txt\"\r\n\
            Content-Type: text/plain\r\nContent-Length: 9\r\n\r\n\
            testdata2\r\n\
            --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n\
            ";

        let req = post_request("multipart/mixed; boundary=abbc761f78ff4d7cb7573b5a23f96ef0");

        let parser = multipart(&req, once_body(Bytes::copy_from_slice(raw))).unwrap();
        let mut parser = pin!(parser);

        // first field: explicit content length.
        {
            let mut field = parser.try_next().now_or_never().unwrap().unwrap().unwrap();

            assert_eq!(
                field.headers().get(CONTENT_DISPOSITION).unwrap(),
                HeaderValue::from_static("form-data; name=\"file\"; filename=\"foo.txt\"")
            );
            assert_eq!(field.name().unwrap(), "file");
            assert_eq!(field.file_name().unwrap(), "foo.txt");
            assert_eq!(
                field.headers().get(CONTENT_TYPE).unwrap(),
                HeaderValue::from_static("text/plain; charset=utf-8")
            );
            assert_eq!(
                field.headers().get(CONTENT_LENGTH).unwrap(),
                HeaderValue::from_static("4")
            );

            let chunk = field.try_next().now_or_never().unwrap().unwrap().unwrap();
            assert_eq!(chunk.chunk(), b"test");
            assert!(field.try_next().now_or_never().unwrap().unwrap().is_none());
        }

        // second field: no content length header.
        {
            let mut field = parser.try_next().now_or_never().unwrap().unwrap().unwrap();

            assert_eq!(
                field.headers().get(CONTENT_DISPOSITION).unwrap(),
                HeaderValue::from_static("form-data; name=\"file\"; filename=\"bar.txt\"")
            );
            assert_eq!(field.name().unwrap(), "file");
            assert_eq!(field.file_name().unwrap(), "bar.txt");
            assert_eq!(
                field.headers().get(CONTENT_TYPE).unwrap(),
                HeaderValue::from_static("text/plain")
            );
            assert!(field.headers().get(CONTENT_LENGTH).is_none());

            let chunk = field.try_next().now_or_never().unwrap().unwrap().unwrap();
            assert_eq!(chunk.chunk(), b"testdata");
            assert!(field.try_next().now_or_never().unwrap().unwrap().is_none());
        }

        // third field: dropped right away without reading any of its data.
        drop(parser.try_next().now_or_never().unwrap().unwrap().unwrap());

        // fourth field must still be reachable after the untouched third one.
        {
            let mut field = parser.try_next().now_or_never().unwrap().unwrap().unwrap();

            assert_eq!(
                field.headers().get(CONTENT_DISPOSITION).unwrap(),
                HeaderValue::from_static("form-data; name=\"file\"; filename=\"bar.txt\"")
            );
            assert_eq!(field.name().unwrap(), "file");
            assert_eq!(field.file_name().unwrap(), "bar.txt");
            assert_eq!(
                field.headers().get(CONTENT_TYPE).unwrap(),
                HeaderValue::from_static("text/plain")
            );
            assert_eq!(
                field.headers().get(CONTENT_LENGTH).unwrap(),
                HeaderValue::from_static("9")
            );

            let chunk = field.try_next().now_or_never().unwrap().unwrap().unwrap();
            assert_eq!(chunk.chunk(), b"testdata2");
            assert!(field.try_next().now_or_never().unwrap().unwrap().is_none());
        }

        // the end of the body is reported, and keeps being reported on later calls.
        assert!(parser.try_next().now_or_never().unwrap().unwrap().is_none());
        assert!(parser.try_next().now_or_never().unwrap().unwrap().is_none());
    }

    #[test]
    fn field_header_overflow() {
        let raw = b"\
            --12345\r\n\
            Content-Disposition: form-data; name=\"file\"; filename=\"foo.txt\"\r\n\
            Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4";

        let req = post_request("multipart/mixed; boundary=12345");

        // a limit of 7 still lets the first boundary line through.
        let limit = Config { buf_limit: 7 };
        let parser = multipart_with_config(&req, once_body(Bytes::copy_from_slice(raw)), limit).unwrap();
        let mut parser = pin!(parser);

        let err = parser.try_next().now_or_never().unwrap().err().unwrap();
        assert!(matches!(
            err,
            MultipartError::Header(httparse::Error::TooManyHeaders)
        ));
    }

    // Regression: boundary() in header.rs computed `end` as a subslice-relative
    // index rather than an absolute one, so &header[start..end] panicked when
    // Content-Type contained extra parameters after the boundary value.
    #[test]
    fn boundary_with_trailing_content_type_param() {
        let req = post_request("multipart/form-data; boundary=abc; charset=utf-8");
        // boundary "abc" should be extracted without a panic.
        let res = multipart(&req, once_body(Bytes::new()));
        assert!(res.is_ok());
    }

    #[test]
    fn boundary_quoted() {
        // RFC 2045 section 5.1: boundary may be a quoted-string.
        let req = post_request("multipart/form-data; boundary=\"abc def\"");
        // "abc def" (quotes stripped) should be extracted instead of failing.
        assert!(multipart(&req, once_body(Bytes::new())).is_ok());
    }

    #[test]
    fn boundary_unquoted_whitespace() {
        // Unquoted boundary values may carry surrounding OWS from header folding.
        let req = post_request("multipart/form-data; boundary= abc ");
        // "abc" (whitespace trimmed) should be extracted instead of failing.
        assert!(multipart(&req, once_body(Bytes::new())).is_ok());
    }

    #[test]
    fn boundary_quoted_with_leading_whitespace() {
        // OWS in front of the opening quote has to be accepted as well.
        let req = post_request("multipart/form-data; boundary= \"abc def\"");
        assert!(multipart(&req, once_body(Bytes::new())).is_ok());
    }

    #[test]
    fn boundary_quoted_unclosed() {
        // A quote that is never closed is malformed and has to be an error.
        let req = post_request("multipart/form-data; boundary=\"unclosed");
        let res = multipart(&req, once_body(Bytes::new()));
        assert!(matches!(res, Err(MultipartError::Boundary)));
    }

    #[test]
    fn consume_field_boundary_split_across_chunks() {
        // Full logical body (boundary = "abc"):
        //   --abc\r\n<headers-f1>\r\n\r\nsomedata\r\n--abc\r\n<headers-f2>\r\n\r\nhello\r\n--abc--\r\n
        //
        // The stream is cut right before the second '-' of "--abc":
        let head = Bytes::from_static(b"--abc\r\nContent-Disposition: form-data; name=\"f1\"\r\n\r\nsomedata\r\n-");
        let tail =
            Bytes::from_static(b"-abc\r\nContent-Disposition: form-data; name=\"f2\"\r\n\r\nhello\r\n--abc--\r\n");

        let req = post_request("multipart/mixed; boundary=abc");

        let chunks = futures_util::stream::iter(vec![Ok::<_, Infallible>(head), Ok(tail)]);
        let parser = multipart(&req, chunks).unwrap();
        let mut parser = pin!(parser);

        // Take field1 and drop it without reading a single byte. The following
        // try_next() call then goes through consume_pending_field(), which is
        // where the split-boundary bug showed up.
        {
            let _field1 = parser.try_next().now_or_never().unwrap().unwrap().unwrap();
        }

        // field2 has to be reachable regardless.
        let field2 = parser.try_next().now_or_never().unwrap().unwrap();
        assert!(
            field2.is_some(),
            "field2 was skipped because '--' was split across two stream chunks"
        );
    }

    #[test]
    fn boundary_overflow() {
        let raw = b"--123456";

        let req = post_request("multipart/mixed; boundary=12345");

        // with a limit of 7 the first boundary line can not be parsed.
        let limit = Config { buf_limit: 7 };
        let parser = multipart_with_config(&req, once_body(Bytes::copy_from_slice(raw)), limit).unwrap();
        let mut parser = pin!(parser);

        let err = parser.try_next().now_or_never().unwrap().err().unwrap();
        assert!(matches!(err, MultipartError::BufferOverflow));
    }
}
549}