1#![forbid(unsafe_code)]
2
3mod content_disposition;
4mod error;
5mod field;
6mod form;
7mod header;
8
9pub use self::{
10 error::MultipartError,
11 field::Field,
12 form::{Form, Part},
13};
14
15use core::{future::poll_fn, pin::Pin};
16
17use bytes::{Buf, BytesMut};
18use field::FieldDecoder;
19use futures_core::stream::Stream;
20use http::{header::HeaderMap, Method, Request};
21use memchr::memmem;
22use pin_project_lite::pin_project;
23
24use self::{content_disposition::ContentDisposition, error::PayloadError};
25
26pub fn multipart<Ext, B, T, E>(req: &Request<Ext>, body: B) -> Result<Multipart<B>, MultipartError>
64where
65 B: Stream<Item = Result<T, E>>,
66 T: AsRef<[u8]>,
67 E: Into<PayloadError>,
68{
69 multipart_with_config(req, body, Config::default())
70}
71
72pub fn multipart_with_config<Ext, B, T, E>(
74 req: &Request<Ext>,
75 body: B,
76 config: Config,
77) -> Result<Multipart<B>, MultipartError>
78where
79 B: Stream<Item = Result<T, E>>,
80 T: AsRef<[u8]>,
81 E: Into<PayloadError>,
82{
83 if req.method() != Method::POST {
84 return Err(MultipartError::NoPostMethod);
85 }
86
87 let boundary = header::boundary(req.headers())?;
88
89 Ok(Multipart {
90 stream: body,
91 buf: BytesMut::new(),
92 boundary: boundary.into(),
93 headers: HeaderMap::new(),
94 pending_field: false,
95 config,
96 })
97}
98
99#[derive(Debug, Copy, Clone)]
101pub struct Config {
102 pub buf_limit: usize,
106}
107
108impl Default for Config {
109 fn default() -> Self {
110 Self { buf_limit: 1024 * 1024 }
111 }
112}
113
114pin_project! {
115 pub struct Multipart<S> {
116 #[pin]
117 stream: S,
118 buf: BytesMut,
119 boundary: Box<[u8]>,
120 headers: HeaderMap,
121 pending_field: bool,
122 config: Config
123 }
124}
125
126const DOUBLE_HYPHEN: &[u8; 2] = b"--";
127const LF: &[u8; 1] = b"\n";
128const DOUBLE_CR_LF: &[u8; 4] = b"\r\n\r\n";
129const FIELD_DELIMITER: &[u8; 4] = b"\r\n--";
130
131impl<S, T, E> Multipart<S>
132where
133 S: Stream<Item = Result<T, E>>,
134 T: AsRef<[u8]>,
135 E: Into<PayloadError>,
136{
137 pub async fn try_next<'s>(self: &'s mut Pin<&mut Self>) -> Result<Option<Field<'s, S>>, MultipartError> {
140 let boundary_len = self.boundary.len();
141
142 if self.pending_field {
143 self.as_mut().consume_pending_field().await?;
144 }
145
146 loop {
147 let this = self.as_mut().project();
148 if let Some(idx) = memmem::find(this.buf, LF) {
149 let slice = match idx.checked_sub(1) {
151 Some(idx) => &this.buf[..idx],
152 None => return Err(MultipartError::Boundary),
154 };
155
156 match slice.len() {
157 0 => {
159 this.buf.advance(idx + 1);
161 continue;
162 }
163 len if len < (boundary_len + 2) => {}
165 _ if &slice[..2] != DOUBLE_HYPHEN => return Err(MultipartError::Boundary),
167 _ if this.boundary.as_ref().eq(&slice[2..]) => {
169 this.buf.advance(idx + 1);
171
172 let field = self.as_mut().parse_field().await?;
173 return Ok(Some(field));
174 }
175 len if len == (boundary_len + 4) => {
177 let at = boundary_len + 2;
178 let _ = this.boundary.as_ref().eq(&slice[2..at]) && &slice[at..] == DOUBLE_HYPHEN;
180 return Ok(None);
181 }
182 _ => return Err(MultipartError::Boundary),
184 }
185 }
186
187 if self.buf_overflow() {
188 return Err(MultipartError::BufferOverflow);
189 }
190
191 self.as_mut().try_read_stream_to_buf().await?;
192 }
193 }
194
195 async fn parse_field(mut self: Pin<&mut Self>) -> Result<Field<'_, S>, MultipartError> {
196 loop {
197 let this = self.as_mut().project();
198
199 if let Some(idx) = memmem::find(this.buf, DOUBLE_CR_LF) {
200 let slice = &this.buf[..idx + 4];
201
202 header::parse_headers(this.headers, slice)?;
203 this.buf.advance(slice.len());
204
205 let cp = ContentDisposition::try_from_header(this.headers)?;
206
207 header::check_headers(this.headers)?;
208
209 let length = header::content_length_opt(this.headers)?;
210
211 *this.pending_field = true;
212
213 return Ok(Field::new(length, cp, self));
214 }
215
216 if self.buf_overflow() {
217 return Err(MultipartError::Header(httparse::Error::TooManyHeaders));
218 }
219
220 self.as_mut().try_read_stream_to_buf().await?;
221 }
222 }
223
224 #[cold]
225 #[inline(never)]
226 async fn consume_pending_field(mut self: Pin<&mut Self>) -> Result<(), MultipartError> {
227 let mut field_ty = FieldDecoder::default();
228
229 loop {
230 let this = self.as_mut().project();
231 if let Some(idx) = field_ty.try_find_split_idx(this.buf, this.boundary)? {
232 this.buf.advance(idx);
233 }
234 if matches!(field_ty, FieldDecoder::StreamEnd) {
235 *this.pending_field = false;
236 return Ok(());
237 }
238 self.as_mut().try_read_stream_to_buf().await?;
239 }
240 }
241
242 async fn try_read_stream_to_buf(mut self: Pin<&mut Self>) -> Result<(), MultipartError> {
243 let bytes = self.as_mut().try_read_stream().await?;
244 self.project().buf.extend_from_slice(bytes.as_ref());
245 Ok(())
246 }
247
248 async fn try_read_stream(mut self: Pin<&mut Self>) -> Result<T, MultipartError> {
249 match poll_fn(move |cx| self.as_mut().project().stream.poll_next(cx)).await {
250 Some(Ok(bytes)) => Ok(bytes),
251 Some(Err(e)) => Err(MultipartError::Payload(e.into())),
252 None => Err(MultipartError::UnexpectedEof),
253 }
254 }
255
256 pub(crate) fn buf_overflow(&self) -> bool {
257 self.buf.len() > self.config.buf_limit
258 }
259}
260
261#[cfg(test)]
262mod test {
263 use std::{convert::Infallible, pin::pin};
264
265 use bytes::Bytes;
266 use futures_util::FutureExt;
267 use http::header::{HeaderValue, CONTENT_DISPOSITION, CONTENT_LENGTH, CONTENT_TYPE};
268
269 use super::*;
270
271 fn once_body(b: impl Into<Bytes>) -> impl Stream<Item = Result<Bytes, Infallible>> {
272 futures_util::stream::once(async { Ok(b.into()) })
273 }
274
275 #[test]
276 fn method() {
277 let req = Request::new(());
278 let body = once_body(Bytes::new());
279 let err = multipart(&req, body).err();
280 assert!(matches!(err, Some(MultipartError::NoPostMethod)));
281 }
282
283 #[test]
284 fn basic() {
285 let body = b"\
286 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
287 Content-Disposition: form-data; name=\"file\"; filename=\"foo.txt\"\r\n\
288 Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
289 test\r\n\
290 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
291 Content-Disposition: form-data; name=\"file\"; filename=\"bar.txt\"\r\n\
292 Content-Type: text/plain\r\n\r\n\
293 testdata\r\n\
294 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
295 Content-Disposition: form-data; name=\"file\"; filename=\"bar.txt\"\r\n\
296 Content-Type: text/plain\r\n\r\n\
297 testdata\r\n\
298 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
299 Content-Disposition: form-data; name=\"file\"; filename=\"bar.txt\"\r\n\
300 Content-Type: text/plain\r\nContent-Length: 9\r\n\r\n\
301 testdata2\r\n\
302 --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n\
303 ";
304
305 let mut req = Request::new(());
306 *req.method_mut() = Method::POST;
307 req.headers_mut().insert(
308 CONTENT_TYPE,
309 HeaderValue::from_static("multipart/mixed; boundary=abbc761f78ff4d7cb7573b5a23f96ef0"),
310 );
311
312 let body = once_body(Bytes::copy_from_slice(body));
313
314 let multipart = multipart(&req, body).unwrap();
315
316 let mut multipart = pin!(multipart);
317
318 {
319 let mut field = multipart.try_next().now_or_never().unwrap().unwrap().unwrap();
320
321 assert_eq!(
322 field.headers().get(CONTENT_DISPOSITION).unwrap(),
323 HeaderValue::from_static("form-data; name=\"file\"; filename=\"foo.txt\"")
324 );
325 assert_eq!(field.name().unwrap(), "file");
326 assert_eq!(field.file_name().unwrap(), "foo.txt");
327 assert_eq!(
328 field.headers().get(CONTENT_TYPE).unwrap(),
329 HeaderValue::from_static("text/plain; charset=utf-8")
330 );
331 assert_eq!(
332 field.headers().get(CONTENT_LENGTH).unwrap(),
333 HeaderValue::from_static("4")
334 );
335 assert_eq!(
336 field.try_next().now_or_never().unwrap().unwrap().unwrap().chunk(),
337 b"test"
338 );
339 assert!(field.try_next().now_or_never().unwrap().unwrap().is_none());
340 }
341
342 {
343 let mut field = multipart.try_next().now_or_never().unwrap().unwrap().unwrap();
344
345 assert_eq!(
346 field.headers().get(CONTENT_DISPOSITION).unwrap(),
347 HeaderValue::from_static("form-data; name=\"file\"; filename=\"bar.txt\"")
348 );
349 assert_eq!(field.name().unwrap(), "file");
350 assert_eq!(field.file_name().unwrap(), "bar.txt");
351 assert_eq!(
352 field.headers().get(CONTENT_TYPE).unwrap(),
353 HeaderValue::from_static("text/plain")
354 );
355 assert!(field.headers().get(CONTENT_LENGTH).is_none());
356 assert_eq!(
357 field.try_next().now_or_never().unwrap().unwrap().unwrap().chunk(),
358 b"testdata"
359 );
360 assert!(field.try_next().now_or_never().unwrap().unwrap().is_none());
361 }
362
363 multipart.try_next().now_or_never().unwrap().unwrap().unwrap();
365
366 {
367 let mut field = multipart.try_next().now_or_never().unwrap().unwrap().unwrap();
368
369 assert_eq!(
370 field.headers().get(CONTENT_DISPOSITION).unwrap(),
371 HeaderValue::from_static("form-data; name=\"file\"; filename=\"bar.txt\"")
372 );
373 assert_eq!(field.name().unwrap(), "file");
374 assert_eq!(field.file_name().unwrap(), "bar.txt");
375 assert_eq!(
376 field.headers().get(CONTENT_TYPE).unwrap(),
377 HeaderValue::from_static("text/plain")
378 );
379 assert_eq!(
380 field.headers().get(CONTENT_LENGTH).unwrap(),
381 HeaderValue::from_static("9")
382 );
383 assert_eq!(
384 field.try_next().now_or_never().unwrap().unwrap().unwrap().chunk(),
385 b"testdata2"
386 );
387 assert!(field.try_next().now_or_never().unwrap().unwrap().is_none());
388 }
389
390 assert!(multipart.try_next().now_or_never().unwrap().unwrap().is_none());
391 assert!(multipart.try_next().now_or_never().unwrap().unwrap().is_none());
392 }
393
394 #[test]
395 fn field_header_overflow() {
396 let body = b"\
397 --12345\r\n\
398 Content-Disposition: form-data; name=\"file\"; filename=\"foo.txt\"\r\n\
399 Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4";
400
401 let mut req = Request::new(());
402 *req.method_mut() = Method::POST;
403 req.headers_mut().insert(
404 CONTENT_TYPE,
405 HeaderValue::from_static("multipart/mixed; boundary=12345"),
406 );
407
408 let body = once_body(Bytes::copy_from_slice(body));
409
410 let multipart = multipart_with_config(&req, body, Config { buf_limit: 7 }).unwrap();
412
413 let mut multipart = pin!(multipart);
414
415 assert!(matches!(
416 multipart.try_next().now_or_never().unwrap().err().unwrap(),
417 MultipartError::Header(httparse::Error::TooManyHeaders)
418 ));
419 }
420
421 #[test]
425 fn boundary_with_trailing_content_type_param() {
426 let mut req = Request::new(());
427 *req.method_mut() = Method::POST;
428 req.headers_mut().insert(
429 CONTENT_TYPE,
430 HeaderValue::from_static("multipart/form-data; boundary=abc; charset=utf-8"),
431 );
432 let body = once_body(Bytes::new());
433 let result = multipart(&req, body);
435 assert!(result.is_ok());
436 }
437
438 #[test]
439 fn boundary_quoted() {
440 let mut req = Request::new(());
442 *req.method_mut() = Method::POST;
443 req.headers_mut().insert(
444 CONTENT_TYPE,
445 HeaderValue::from_static("multipart/form-data; boundary=\"abc def\""),
446 );
447 let body = once_body(Bytes::new());
448 assert!(multipart(&req, body).is_ok());
450 }
451
452 #[test]
453 fn boundary_unquoted_whitespace() {
454 let mut req = Request::new(());
456 *req.method_mut() = Method::POST;
457 req.headers_mut().insert(
458 CONTENT_TYPE,
459 HeaderValue::from_static("multipart/form-data; boundary= abc "),
460 );
461 let body = once_body(Bytes::new());
462 assert!(multipart(&req, body).is_ok());
464 }
465
466 #[test]
467 fn boundary_quoted_with_leading_whitespace() {
468 let mut req = Request::new(());
470 *req.method_mut() = Method::POST;
471 req.headers_mut().insert(
472 CONTENT_TYPE,
473 HeaderValue::from_static("multipart/form-data; boundary= \"abc def\""),
474 );
475 let body = once_body(Bytes::new());
476 assert!(multipart(&req, body).is_ok());
477 }
478
479 #[test]
480 fn boundary_quoted_unclosed() {
481 let mut req = Request::new(());
483 *req.method_mut() = Method::POST;
484 req.headers_mut().insert(
485 CONTENT_TYPE,
486 HeaderValue::from_static("multipart/form-data; boundary=\"unclosed"),
487 );
488 let body = once_body(Bytes::new());
489 assert!(matches!(multipart(&req, body), Err(MultipartError::Boundary)));
490 }
491
492 #[test]
493 fn consume_field_boundary_split_across_chunks() {
494 let chunk1 = Bytes::from_static(b"--abc\r\nContent-Disposition: form-data; name=\"f1\"\r\n\r\nsomedata\r\n-");
499 let chunk2 =
500 Bytes::from_static(b"-abc\r\nContent-Disposition: form-data; name=\"f2\"\r\n\r\nhello\r\n--abc--\r\n");
501
502 let mut req = Request::new(());
503 *req.method_mut() = Method::POST;
504 req.headers_mut()
505 .insert(CONTENT_TYPE, HeaderValue::from_static("multipart/mixed; boundary=abc"));
506
507 let body = futures_util::stream::iter(vec![Ok::<_, Infallible>(chunk1), Ok(chunk2)]);
508 let multipart = multipart(&req, body).unwrap();
509 let mut multipart = pin!(multipart);
510
511 {
515 let _field1 = multipart.try_next().now_or_never().unwrap().unwrap().unwrap();
516 }
517
518 let field2 = multipart.try_next().now_or_never().unwrap().unwrap();
520 assert!(
521 field2.is_some(),
522 "field2 was skipped because '--' was split across two stream chunks"
523 );
524 }
525
526 #[test]
527 fn boundary_overflow() {
528 let body = b"--123456";
529
530 let mut req = Request::new(());
531 *req.method_mut() = Method::POST;
532 req.headers_mut().insert(
533 CONTENT_TYPE,
534 HeaderValue::from_static("multipart/mixed; boundary=12345"),
535 );
536
537 let body = once_body(Bytes::copy_from_slice(body));
538
539 let multipart = multipart_with_config(&req, body, Config { buf_limit: 7 }).unwrap();
541
542 let mut multipart = pin!(multipart);
543
544 assert!(matches!(
545 multipart.try_next().now_or_never().unwrap().err().unwrap(),
546 MultipartError::BufferOverflow
547 ));
548 }
549}