1use std::{
4 cell::RefCell,
5 pin::Pin,
6 rc::Rc,
7 task::{Context, Poll},
8};
9
10use actix_web::{
11 dev,
12 error::{ParseError, PayloadError},
13 http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue},
14 web::{self, Bytes},
15 HttpRequest,
16};
17use futures_core::stream::Stream;
18use mime::Mime;
19
20use crate::{
21 error::Error,
22 field::InnerField,
23 payload::{PayloadBuffer, PayloadRef, DEFAULT_BUFFER_LIMIT},
24 safety::Safety,
25 Field,
26};
27
28const MAX_HEADERS: usize = 32;
29
30pub struct Multipart {
36 flow: Flow,
37 safety: Safety,
38}
39
40enum Flow {
41 InFlight(Inner),
42
43 Error(Option<Error>),
45}
46
47#[derive(Clone, Debug)]
51#[non_exhaustive]
52pub struct MultipartConfig {
53 buffer_limit: usize,
54}
55
56impl MultipartConfig {
57 pub fn new() -> Self {
59 DEFAULT_CONFIG
60 }
61
62 pub fn buffer_limit(mut self, buffer_limit: usize) -> Self {
64 self.buffer_limit = buffer_limit;
65 self
66 }
67
68 fn from_req(req: &HttpRequest) -> &Self {
71 req.app_data::<Self>()
72 .or_else(|| req.app_data::<web::Data<Self>>().map(|d| d.as_ref()))
73 .unwrap_or(&DEFAULT_CONFIG)
74 }
75}
76
77const DEFAULT_CONFIG: MultipartConfig = MultipartConfig {
78 buffer_limit: DEFAULT_BUFFER_LIMIT,
79};
80
81impl Default for MultipartConfig {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl Multipart {
88 pub fn new<S>(headers: &HeaderMap, stream: S) -> Self
90 where
91 S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
92 {
93 match Self::find_ct_and_boundary(headers) {
94 Ok((ct, boundary)) => Self::from_ct_and_boundary(ct, boundary, stream),
95 Err(err) => Self::from_error(err),
96 }
97 }
98
99 pub(crate) fn from_req(req: &HttpRequest, payload: &mut dev::Payload) -> Self {
101 let config = MultipartConfig::from_req(req);
102
103 match Self::find_ct_and_boundary(req.headers()) {
104 Ok((ct, boundary)) => Self::from_ct_and_boundary_with_buffer_limit(
105 ct,
106 boundary,
107 payload.take(),
108 config.buffer_limit,
109 ),
110 Err(err) => Self::from_error(err),
111 }
112 }
113
114 pub(crate) fn find_ct_and_boundary(headers: &HeaderMap) -> Result<(Mime, String), Error> {
116 let content_type = headers
117 .get(&header::CONTENT_TYPE)
118 .ok_or(Error::ContentTypeMissing)?
119 .to_str()
120 .ok()
121 .and_then(|content_type| content_type.parse::<Mime>().ok())
122 .ok_or(Error::ContentTypeParse)?;
123
124 if content_type.type_() != mime::MULTIPART {
125 return Err(Error::ContentTypeIncompatible);
126 }
127
128 let boundary = content_type
129 .get_param(mime::BOUNDARY)
130 .ok_or(Error::BoundaryMissing)?
131 .as_str()
132 .to_owned();
133
134 if boundary.is_empty() {
135 return Err(Error::BoundaryMissing);
136 }
137
138 Ok((content_type, boundary))
139 }
140
141 pub(crate) fn from_ct_and_boundary<S>(ct: Mime, boundary: String, stream: S) -> Multipart
143 where
144 S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
145 {
146 Self::from_ct_and_boundary_with_buffer_limit(
147 ct,
148 boundary,
149 stream,
150 DEFAULT_CONFIG.buffer_limit,
151 )
152 }
153
154 fn from_ct_and_boundary_with_buffer_limit<S>(
155 ct: Mime,
156 boundary: String,
157 stream: S,
158 buffer_limit: usize,
159 ) -> Multipart
160 where
161 S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
162 {
163 Multipart {
164 safety: Safety::new(),
165 flow: Flow::InFlight(Inner {
166 payload: PayloadRef::new(PayloadBuffer::new_with_limit(stream, buffer_limit)),
167 content_type: ct,
168 boundary,
169 state: State::FirstBoundary,
170 item: Item::None,
171 }),
172 }
173 }
174
175 pub(crate) fn from_error(err: Error) -> Multipart {
177 Multipart {
178 flow: Flow::Error(Some(err)),
179 safety: Safety::new(),
180 }
181 }
182
183 pub(crate) fn content_type_or_bail(&mut self) -> Result<mime::Mime, Error> {
185 match self.flow {
186 Flow::InFlight(ref inner) => Ok(inner.content_type.clone()),
187 Flow::Error(ref mut err) => Err(err
188 .take()
189 .expect("error should not be taken after it was returned")),
190 }
191 }
192}
193
194impl Stream for Multipart {
195 type Item = Result<Field, Error>;
196
197 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
198 let this = self.get_mut();
199
200 match this.flow {
201 Flow::InFlight(ref mut inner) => {
202 if let Some(mut buffer) = inner.payload.get_mut(&this.safety) {
203 buffer.poll_stream(cx)?;
205 } else if !this.safety.is_clean() {
206 return Poll::Ready(Some(Err(Error::NotConsumed)));
208 } else {
209 return Poll::Pending;
210 }
211
212 inner.poll(&this.safety, cx)
213 }
214
215 Flow::Error(ref mut err) => Poll::Ready(Some(Err(err
216 .take()
217 .expect("Multipart polled after finish")))),
218 }
219 }
220}
221
222#[derive(PartialEq, Debug)]
223enum State {
224 FirstBoundary,
226
227 Boundary,
229
230 Headers,
232
233 Eof,
235}
236
237enum Item {
238 None,
239 Field(Rc<RefCell<InnerField>>),
240}
241
242struct Inner {
243 payload: PayloadRef,
245
246 content_type: Mime,
250
251 boundary: String,
253
254 state: State,
255 item: Item,
256}
257
258impl Inner {
259 fn read_field_headers(payload: &mut PayloadBuffer) -> Result<Option<HeaderMap>, Error> {
260 match payload.read_until(b"\r\n\r\n")? {
261 None => {
262 if payload.eof {
263 Err(Error::Incomplete)
264 } else {
265 Ok(None)
266 }
267 }
268
269 Some(bytes) => {
270 let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
271
272 match httparse::parse_headers(&bytes, &mut hdrs).map_err(ParseError::from)? {
273 httparse::Status::Complete((_, hdrs)) => {
274 let mut headers = HeaderMap::with_capacity(hdrs.len());
276
277 for h in hdrs {
278 let name =
279 HeaderName::try_from(h.name).map_err(|_| ParseError::Header)?;
280 let value =
281 HeaderValue::try_from(h.value).map_err(|_| ParseError::Header)?;
282 headers.append(name, value);
283 }
284
285 Ok(Some(headers))
286 }
287
288 httparse::Status::Partial => Err(ParseError::Header.into()),
289 }
290 }
291 }
292 }
293
294 fn read_boundary(payload: &mut PayloadBuffer, boundary: &str) -> Result<Option<bool>, Error> {
310 if boundary.is_empty() {
311 return Err(Error::BoundaryMissing);
312 }
313
314 let chunk = match payload.readline_or_eof()? {
316 None => return Ok(payload.eof.then_some(true)),
318 Some(chunk) => chunk,
319 };
320
321 const BOUNDARY_MARKER: &[u8] = b"--";
322 const LINE_BREAK: &[u8] = b"\r\n";
323
324 let Some(chunk) = chunk.as_ref().strip_prefix(BOUNDARY_MARKER) else {
325 return Err(Error::BoundaryMissing);
326 };
327
328 let Some(chunk) = chunk.strip_prefix(boundary.as_bytes()) else {
329 return Err(Error::BoundaryMissing);
330 };
331
332 if chunk == LINE_BREAK {
333 return Ok(Some(false));
335 }
336
337 if chunk == BOUNDARY_MARKER || chunk == b"--\r\n" {
339 return Ok(Some(true));
340 }
341
342 Err(Error::BoundaryMissing)
343 }
344
345 fn skip_until_boundary(
346 payload: &mut PayloadBuffer,
347 boundary: &str,
348 ) -> Result<Option<bool>, Error> {
349 if boundary.is_empty() {
350 return Err(Error::BoundaryMissing);
351 }
352
353 let mut eof = false;
354 let boundary = boundary.as_bytes();
355
356 loop {
357 match payload.readline()? {
358 Some(chunk) => {
359 if chunk.is_empty() {
360 return Err(Error::BoundaryMissing);
361 }
362
363 let Some(line) = chunk.as_ref().strip_suffix(b"\r\n") else {
364 continue;
365 };
366
367 if let Some(line) = line.strip_prefix(b"--") {
368 if line == boundary {
369 break;
370 }
371
372 if line.strip_suffix(b"--") == Some(boundary) {
373 eof = true;
374 break;
375 }
376 }
377 }
378 None => {
379 return if payload.eof {
380 Err(Error::Incomplete)
381 } else {
382 Ok(None)
383 };
384 }
385 }
386 }
387 Ok(Some(eof))
388 }
389
390 fn poll(&mut self, safety: &Safety, cx: &Context<'_>) -> Poll<Option<Result<Field, Error>>> {
391 if self.state == State::Eof {
392 Poll::Ready(None)
393 } else {
394 loop {
396 if safety.current() {
399 let stop = match self.item {
400 Item::Field(ref mut field) => match field.borrow_mut().poll(safety) {
401 Poll::Pending => return Poll::Pending,
402 Poll::Ready(Some(Ok(_))) => continue,
403 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
404 Poll::Ready(None) => true,
405 },
406 Item::None => false,
407 };
408 if stop {
409 self.item = Item::None;
410 }
411 if let Item::None = self.item {
412 break;
413 }
414 }
415 }
416
417 let field_headers = if let Some(mut payload) = self.payload.get_mut(safety) {
418 match self.state {
419 State::FirstBoundary => {
421 match Inner::skip_until_boundary(&mut payload, &self.boundary)? {
422 None => return Poll::Pending,
423 Some(eof) => {
424 if eof {
425 self.state = State::Eof;
426 return Poll::Ready(None);
427 } else {
428 self.state = State::Headers;
429 }
430 }
431 }
432 }
433
434 State::Boundary => match Inner::read_boundary(&mut payload, &self.boundary)? {
436 None => return Poll::Pending,
437 Some(eof) => {
438 if eof {
439 self.state = State::Eof;
440 return Poll::Ready(None);
441 } else {
442 self.state = State::Headers;
443 }
444 }
445 },
446
447 _ => {}
448 }
449
450 if self.state == State::Headers {
452 if let Some(headers) = Inner::read_field_headers(&mut payload)? {
453 self.state = State::Boundary;
454 headers
455 } else {
456 return Poll::Pending;
457 }
458 } else {
459 unreachable!()
460 }
461 } else {
462 log::debug!("NotReady: field is in flight");
463 return Poll::Pending;
464 };
465
466 let field_content_disposition = field_headers
467 .get(&header::CONTENT_DISPOSITION)
468 .and_then(|cd| ContentDisposition::from_raw(cd).ok())
469 .filter(|content_disposition| {
470 matches!(
471 content_disposition.disposition,
472 header::DispositionType::FormData,
473 )
474 });
475
476 let form_field_name = if self.content_type.subtype() == mime::FORM_DATA {
477 let Some(cd) = &field_content_disposition else {
482 return Poll::Ready(Some(Err(Error::ContentDispositionMissing)));
483 };
484
485 let Some(field_name) = cd.get_name() else {
486 return Poll::Ready(Some(Err(Error::ContentDispositionNameMissing)));
487 };
488
489 Some(field_name.to_owned())
490 } else {
491 None
492 };
493
494 let field_content_type: Option<Mime> = field_headers
497 .get(&header::CONTENT_TYPE)
498 .and_then(|ct| ct.to_str().ok())
499 .and_then(|ct| ct.parse().ok());
500
501 self.state = State::Boundary;
502
503 if let Some(mime) = &field_content_type {
505 if mime.type_() == mime::MULTIPART {
506 return Poll::Ready(Some(Err(Error::Nested)));
507 }
508 }
509
510 let field_inner =
511 InnerField::new_in_rc(self.payload.clone(), self.boundary.clone(), &field_headers)?;
512
513 self.item = Item::Field(Rc::clone(&field_inner));
514
515 Poll::Ready(Some(Ok(Field::new(
516 field_content_type,
517 field_content_disposition,
518 form_field_name,
519 field_headers,
520 safety.clone(cx),
521 field_inner,
522 ))))
523 }
524 }
525}
526
527impl Drop for Inner {
528 fn drop(&mut self) {
529 self.item = Item::None;
531 }
532}
533
534#[cfg(test)]
535mod tests {
536 use std::time::Duration;
537
538 use actix_http::h1;
539 use actix_web::{
540 http::header::{DispositionParam, DispositionType},
541 rt,
542 test::TestRequest,
543 web::{BufMut as _, BytesMut},
544 FromRequest,
545 };
546 use assert_matches::assert_matches;
547 use futures_test::stream::StreamTestExt as _;
548 use futures_util::{stream, StreamExt as _};
549 use tokio::sync::mpsc;
550 use tokio_stream::wrappers::UnboundedReceiverStream;
551
552 use super::*;
553
554 const BOUNDARY: &str = "abbc761f78ff4d7cb7573b5a23f96ef0";
555
556 #[actix_rt::test]
557 async fn test_boundary() {
558 let headers = HeaderMap::new();
559 match Multipart::find_ct_and_boundary(&headers) {
560 Err(Error::ContentTypeMissing) => {}
561 _ => unreachable!("should not happen"),
562 }
563
564 let mut headers = HeaderMap::new();
565 headers.insert(
566 header::CONTENT_TYPE,
567 header::HeaderValue::from_static("test"),
568 );
569
570 match Multipart::find_ct_and_boundary(&headers) {
571 Err(Error::ContentTypeParse) => {}
572 _ => unreachable!("should not happen"),
573 }
574
575 let mut headers = HeaderMap::new();
576 headers.insert(
577 header::CONTENT_TYPE,
578 header::HeaderValue::from_static("multipart/mixed"),
579 );
580 match Multipart::find_ct_and_boundary(&headers) {
581 Err(Error::BoundaryMissing) => {}
582 _ => unreachable!("should not happen"),
583 }
584
585 let mut headers = HeaderMap::new();
586 headers.insert(
587 header::CONTENT_TYPE,
588 header::HeaderValue::from_static(
589 "multipart/mixed; boundary=\"5c02368e880e436dab70ed54e1c58209\"",
590 ),
591 );
592
593 assert_eq!(
594 Multipart::find_ct_and_boundary(&headers).unwrap().1,
595 "5c02368e880e436dab70ed54e1c58209",
596 );
597 }
598
599 fn create_stream() -> (
600 mpsc::UnboundedSender<Result<Bytes, PayloadError>>,
601 impl Stream<Item = Result<Bytes, PayloadError>>,
602 ) {
603 let (tx, rx) = mpsc::unbounded_channel();
604
605 (
606 tx,
607 UnboundedReceiverStream::new(rx).map(|res| res.map_err(|_| panic!())),
608 )
609 }
610
611 fn create_simple_request_with_header() -> (Bytes, HeaderMap) {
612 let (body, headers) = crate::test::create_form_data_payload_and_headers_with_boundary(
613 BOUNDARY,
614 "file",
615 Some("fn.txt".to_owned()),
616 Some(mime::TEXT_PLAIN_UTF_8),
617 Bytes::from_static(b"data"),
618 );
619
620 let mut buf = BytesMut::with_capacity(body.len() + 14);
621
622 buf.put("testasdadsad\r\n".as_bytes());
624
625 buf.put(body);
626
627 (buf.freeze(), headers)
628 }
629
630 fn create_double_request_with_header() -> (Bytes, HeaderMap) {
632 let bytes = Bytes::from(
633 "testasdadsad\r\n\
634 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
635 Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
636 Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
637 test\r\n\
638 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
639 Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
640 Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
641 data\r\n\
642 --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
643 );
644 let mut headers = HeaderMap::new();
645 headers.insert(
646 header::CONTENT_TYPE,
647 header::HeaderValue::from_static(
648 "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
649 ),
650 );
651 (bytes, headers)
652 }
653
654 fn create_header(content_type: &'static str) -> HeaderMap {
655 let mut headers = HeaderMap::new();
656 headers.insert(
657 header::CONTENT_TYPE,
658 header::HeaderValue::from_static(content_type),
659 );
660 headers
661 }
662
663 fn create_multipart_with_buffer_limit(
664 body: impl Stream<Item = Result<Bytes, PayloadError>> + 'static,
665 buffer_limit: usize,
666 ) -> Multipart {
667 Multipart::from_ct_and_boundary_with_buffer_limit(
668 "multipart/mixed; boundary=\"a\"".parse().unwrap(),
669 "a".to_owned(),
670 body,
671 buffer_limit,
672 )
673 }
674
675 #[actix_rt::test]
676 async fn empty_boundary_does_not_panic() {
677 let payload = stream::once(async { Ok(Bytes::from_static(b"\n")) });
678 let ct = "multipart/mixed; boundary=\"a\"".parse().unwrap();
679
680 let mut multipart = Multipart::from_ct_and_boundary(ct, String::new(), payload);
681 let res = multipart.next().await.unwrap();
682 assert_matches!(res, Err(Error::BoundaryMissing));
683 }
684
685 #[actix_rt::test]
686 async fn short_line_with_one_byte_boundary_does_not_panic() {
687 let bytes = Bytes::from_static(b"\n");
688 let mut headers = HeaderMap::new();
689 headers.insert(
690 header::CONTENT_TYPE,
691 header::HeaderValue::from_static("multipart/mixed; boundary=\"a\""),
692 );
693 let payload = stream::once(async { Ok(bytes) });
694
695 let mut multipart = Multipart::new(&headers, payload);
696 let res = multipart.next().await.unwrap();
697 assert_matches!(res, Err(Error::Incomplete));
698 }
699
700 #[actix_rt::test]
701 async fn short_final_boundary_with_one_byte_boundary_does_not_panic() {
702 let bytes = Bytes::from_static(b"--\n");
703 let mut headers = HeaderMap::new();
704 headers.insert(
705 header::CONTENT_TYPE,
706 header::HeaderValue::from_static("multipart/mixed; boundary=\"a\""),
707 );
708 let payload = stream::once(async { Ok(bytes) });
709
710 let mut multipart = Multipart::new(&headers, payload);
711 let res = multipart.next().await.unwrap();
712 assert_matches!(res, Err(Error::Incomplete));
713 }
714
715 #[actix_rt::test]
716 async fn one_byte_boundary_parses_valid_body() {
717 let bytes = Bytes::from_static(
718 b"preamble\r\n\
719 --a\r\n\
720 Content-Type: text/plain\r\n\
721 Content-Length: 3\r\n\
722 \r\n\
723 one\r\n\
724 --a\r\n\
725 Content-Type: text/plain\r\n\
726 Content-Length: 3\r\n\
727 \r\n\
728 two\r\n\
729 --a--\r\n",
730 );
731 let headers = create_header("multipart/mixed; boundary=\"a\"");
732 let payload = stream::once(async { Ok(bytes) });
733
734 let mut multipart = Multipart::new(&headers, payload);
735
736 let mut field = multipart.next().await.unwrap().unwrap();
737 assert_eq!(get_whole_field(&mut field).await, "one");
738 drop(field);
739
740 let mut field = multipart.next().await.unwrap().unwrap();
741 assert_eq!(get_whole_field(&mut field).await, "two");
742 drop(field);
743
744 assert!(multipart.next().await.is_none());
745 }
746
747 #[actix_rt::test]
748 async fn one_byte_boundary_parses_when_split_across_chunks() {
749 let bytes = Bytes::from_static(
750 b"x\r\n\
751 --a\r\n\
752 Content-Type: text/plain\r\n\
753 Content-Length: 4\r\n\
754 \r\n\
755 data\r\n\
756 --a--\r\n",
757 );
758 let headers = create_header("multipart/mixed; boundary=\"a\"");
759 let payload = stream::iter(bytes)
760 .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
761 .interleave_pending();
762
763 let mut multipart = Multipart::new(&headers, payload);
764
765 let mut field = multipart.next().await.unwrap().unwrap();
766 assert_eq!(get_whole_field(&mut field).await, "data");
767 drop(field);
768
769 assert!(multipart.next().await.is_none());
770 }
771
772 #[actix_rt::test]
773 async fn short_preamble_lines_before_boundary_are_skipped() {
774 let bytes = Bytes::from_static(
775 b"\n\
776 -\r\n\
777 --a\r\n\
778 Content-Type: text/plain\r\n\
779 Content-Length: 4\r\n\
780 \r\n\
781 data\r\n\
782 --a--\r\n",
783 );
784 let headers = create_header("multipart/mixed; boundary=\"a\"");
785 let payload = stream::once(async { Ok(bytes) });
786
787 let mut multipart = Multipart::new(&headers, payload);
788
789 let mut field = multipart.next().await.unwrap().unwrap();
790 assert_eq!(get_whole_field(&mut field).await, "data");
791 drop(field);
792
793 assert!(multipart.next().await.is_none());
794 }
795
796 #[actix_rt::test]
797 async fn first_boundary_can_be_final() {
798 let bytes = Bytes::from_static(b"--a--\r\n");
799 let headers = create_header("multipart/mixed; boundary=\"a\"");
800 let payload = stream::once(async { Ok(bytes) });
801
802 let mut multipart = Multipart::new(&headers, payload);
803 assert!(multipart.next().await.is_none());
804 }
805
806 #[actix_rt::test]
807 async fn malformed_preamble_over_buffer_limit_errors() {
808 let body = stream::iter(
809 [b"aaaaaaaa", b"bbbbbbbb", b"cccccccc"].map(|chunk| Ok(Bytes::from_static(chunk))),
810 );
811
812 let mut multipart = create_multipart_with_buffer_limit(body, 16);
813 let res = multipart.next().await.unwrap();
814
815 assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
816 }
817
818 #[actix_rt::test]
819 async fn malformed_headers_over_buffer_limit_errors() {
820 let body = stream::iter(
821 [
822 Bytes::from_static(b"--a\r\n"),
823 Bytes::from_static(b"X-Long: 12345678"),
824 Bytes::from_static(b"9012345678901234"),
825 Bytes::from_static(b"5678901234567890"),
826 ]
827 .map(Ok),
828 );
829
830 let mut multipart = create_multipart_with_buffer_limit(body, 24);
831 let res = multipart.next().await.unwrap();
832
833 assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
834 }
835
836 #[actix_rt::test]
837 async fn raw_extractor_uses_configured_buffer_limit() {
838 let (req, mut payload) = TestRequest::default()
839 .insert_header((header::CONTENT_TYPE, "multipart/mixed; boundary=\"a\""))
840 .app_data(MultipartConfig::default().buffer_limit(16))
841 .set_payload(Bytes::from_static(b"aaaaaaaabbbbbbbbcccccccc"))
842 .to_http_parts();
843
844 let mut multipart = Multipart::from_request(&req, &mut payload).await.unwrap();
845 let res = multipart.next().await.unwrap();
846
847 assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
848 }
849
850 #[actix_rt::test]
851 async fn valid_large_field_streams_through_small_parser_buffer() {
852 let mut bytes = BytesMut::new();
853 bytes.put(&b"--a\r\nContent-Length: 100\r\n\r\n"[..]);
854 bytes.put(&[b'x'; 100][..]);
855 bytes.put(&b"\r\n--a--\r\n"[..]);
856 let body = stream::once(async { Ok(bytes.freeze()) });
857
858 let mut multipart = create_multipart_with_buffer_limit(body, 32);
859 let mut field = multipart.next().await.unwrap().unwrap();
860
861 assert_eq!(
862 get_whole_field(&mut field).await,
863 Bytes::from(vec![b'x'; 100])
864 );
865 drop(field);
866 assert!(multipart.next().await.is_none());
867 }
868
869 #[actix_rt::test]
870 async fn test_multipart_no_end_crlf() {
871 let (sender, payload) = create_stream();
872 let (mut bytes, headers) = create_double_request_with_header();
873 let bytes_stripped = bytes.split_to(bytes.len() - 2); sender.send(Ok(bytes_stripped)).unwrap();
876 drop(sender); let mut multipart = Multipart::new(&headers, payload);
879
880 match multipart.next().await.unwrap() {
881 Ok(_) => {}
882 _ => unreachable!(),
883 }
884
885 match multipart.next().await.unwrap() {
886 Ok(_) => {}
887 _ => unreachable!(),
888 }
889
890 match multipart.next().await {
891 None => {}
892 _ => unreachable!(),
893 }
894 }
895
896 #[actix_rt::test]
897 async fn test_multipart() {
898 let (sender, payload) = create_stream();
899 let (bytes, headers) = create_double_request_with_header();
900
901 sender.send(Ok(bytes)).unwrap();
902
903 let mut multipart = Multipart::new(&headers, payload);
904 match multipart.next().await {
905 Some(Ok(mut field)) => {
906 let cd = field.content_disposition().unwrap();
907 assert_eq!(cd.disposition, DispositionType::FormData);
908 assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));
909
910 assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
911 assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
912
913 match field.next().await.unwrap() {
914 Ok(chunk) => assert_eq!(chunk, "test"),
915 _ => unreachable!(),
916 }
917 match field.next().await {
918 None => {}
919 _ => unreachable!(),
920 }
921 }
922 _ => unreachable!(),
923 }
924
925 match multipart.next().await.unwrap() {
926 Ok(mut field) => {
927 assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
928 assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
929
930 match field.next().await {
931 Some(Ok(chunk)) => assert_eq!(chunk, "data"),
932 _ => unreachable!(),
933 }
934 match field.next().await {
935 None => {}
936 _ => unreachable!(),
937 }
938 }
939 _ => unreachable!(),
940 }
941
942 match multipart.next().await {
943 None => {}
944 _ => unreachable!(),
945 }
946 }
947
948 async fn get_whole_field(field: &mut Field) -> BytesMut {
950 let mut b = BytesMut::new();
951 loop {
952 match field.next().await {
953 Some(Ok(chunk)) => b.extend_from_slice(&chunk),
954 None => return b,
955 _ => unreachable!(),
956 }
957 }
958 }
959
960 #[actix_rt::test]
961 async fn test_stream() {
962 let (bytes, headers) = create_double_request_with_header();
963 let payload = stream::iter(bytes)
964 .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
965 .interleave_pending();
966
967 let mut multipart = Multipart::new(&headers, payload);
968 match multipart.next().await.unwrap() {
969 Ok(mut field) => {
970 let cd = field.content_disposition().unwrap();
971 assert_eq!(cd.disposition, DispositionType::FormData);
972 assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));
973
974 assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
975 assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
976
977 assert_eq!(get_whole_field(&mut field).await, "test");
978 }
979 _ => unreachable!(),
980 }
981
982 match multipart.next().await {
983 Some(Ok(mut field)) => {
984 assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
985 assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
986
987 assert_eq!(get_whole_field(&mut field).await, "data");
988 }
989 _ => unreachable!(),
990 }
991
992 match multipart.next().await {
993 None => {}
994 _ => unreachable!(),
995 }
996 }
997
998 #[actix_rt::test]
999 async fn test_multipart_from_error() {
1000 let err = Error::ContentTypeMissing;
1001 let mut multipart = Multipart::from_error(err);
1002 assert!(multipart.next().await.unwrap().is_err())
1003 }
1004
1005 #[actix_rt::test]
1006 async fn test_multipart_from_boundary() {
1007 let (_, payload) = create_stream();
1008 let (_, headers) = create_simple_request_with_header();
1009 let (ct, boundary) = Multipart::find_ct_and_boundary(&headers).unwrap();
1010 let _ = Multipart::from_ct_and_boundary(ct, boundary, payload);
1011 }
1012
1013 #[actix_rt::test]
1014 async fn test_multipart_payload_consumption() {
1015 let (_sender, inner_payload) = h1::Payload::create(false);
1017 let mut payload = actix_web::dev::Payload::from(inner_payload);
1018 let req = TestRequest::default().to_http_request();
1019
1020 let mut mp = Multipart::from_request(&req, &mut payload).await.unwrap();
1022 assert!(mp.next().await.unwrap().is_err());
1023
1024 match payload {
1026 actix_web::dev::Payload::H1 { .. } => {} _ => unreachable!(),
1028 }
1029 }
1030
1031 #[actix_rt::test]
1032 async fn no_content_disposition_form_data() {
1033 let bytes = Bytes::from(
1034 "testasdadsad\r\n\
1035 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
1036 Content-Type: text/plain; charset=utf-8\r\n\
1037 Content-Length: 4\r\n\
1038 \r\n\
1039 test\r\n\
1040 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
1041 );
1042 let mut headers = HeaderMap::new();
1043 headers.insert(
1044 header::CONTENT_TYPE,
1045 header::HeaderValue::from_static(
1046 "multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
1047 ),
1048 );
1049 let payload = stream::iter(bytes)
1050 .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
1051 .interleave_pending();
1052
1053 let mut multipart = Multipart::new(&headers, payload);
1054 let res = multipart.next().await.unwrap();
1055 assert_matches!(
1056 res.expect_err(
1057 "according to RFC 7578, form-data fields require a content-disposition header"
1058 ),
1059 Error::ContentDispositionMissing
1060 );
1061 }
1062
1063 #[actix_rt::test]
1064 async fn no_content_disposition_non_form_data() {
1065 let bytes = Bytes::from(
1066 "testasdadsad\r\n\
1067 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
1068 Content-Type: text/plain; charset=utf-8\r\n\
1069 Content-Length: 4\r\n\
1070 \r\n\
1071 test\r\n\
1072 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
1073 );
1074 let mut headers = HeaderMap::new();
1075 headers.insert(
1076 header::CONTENT_TYPE,
1077 header::HeaderValue::from_static(
1078 "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
1079 ),
1080 );
1081 let payload = stream::iter(bytes)
1082 .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
1083 .interleave_pending();
1084
1085 let mut multipart = Multipart::new(&headers, payload);
1086 let res = multipart.next().await.unwrap();
1087 res.unwrap();
1088 }
1089
1090 #[actix_rt::test]
1091 async fn no_name_in_form_data_content_disposition() {
1092 let bytes = Bytes::from(
1093 "testasdadsad\r\n\
1094 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
1095 Content-Disposition: form-data; filename=\"fn.txt\"\r\n\
1096 Content-Type: text/plain; charset=utf-8\r\n\
1097 Content-Length: 4\r\n\
1098 \r\n\
1099 test\r\n\
1100 --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
1101 );
1102 let mut headers = HeaderMap::new();
1103 headers.insert(
1104 header::CONTENT_TYPE,
1105 header::HeaderValue::from_static(
1106 "multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
1107 ),
1108 );
1109 let payload = stream::iter(bytes)
1110 .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
1111 .interleave_pending();
1112
1113 let mut multipart = Multipart::new(&headers, payload);
1114 let res = multipart.next().await.unwrap();
1115 assert_matches!(
1116 res.expect_err("according to RFC 7578, form-data fields require a name attribute"),
1117 Error::ContentDispositionNameMissing
1118 );
1119 }
1120
1121 #[actix_rt::test]
1122 async fn test_drop_multipart_dont_hang() {
1123 let (sender, payload) = create_stream();
1124 let (bytes, headers) = create_simple_request_with_header();
1125 sender.send(Ok(bytes)).unwrap();
1126 drop(sender); let mut multipart = Multipart::new(&headers, payload);
1129 let mut field = multipart.next().await.unwrap().unwrap();
1130
1131 drop(multipart);
1132
1133 match field.next().await {
1135 Some(Err(Error::NotConsumed)) => {}
1136 _ => panic!(),
1137 };
1138 }
1139
1140 #[actix_rt::test]
1141 async fn test_drop_field_awaken_multipart() {
1142 let (sender, payload) = create_stream();
1143 let (bytes, headers) = create_double_request_with_header();
1144 sender.send(Ok(bytes)).unwrap();
1145 drop(sender); let mut multipart = Multipart::new(&headers, payload);
1148 let mut field = multipart.next().await.unwrap().unwrap();
1149
1150 let task = rt::spawn(async move {
1151 rt::time::sleep(Duration::from_millis(500)).await;
1152 assert_eq!(field.next().await.unwrap().unwrap(), "test");
1153 drop(field);
1154 });
1155
1156 let _ = multipart.next().await.unwrap().unwrap();
1158 task.await.unwrap();
1159 }
1160}