Skip to main content

actix_multipart/
multipart.rs

//! Multipart request payload support.
2
3use std::{
4    cell::RefCell,
5    pin::Pin,
6    rc::Rc,
7    task::{Context, Poll},
8};
9
10use actix_web::{
11    dev,
12    error::{ParseError, PayloadError},
13    http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue},
14    web::{self, Bytes},
15    HttpRequest,
16};
17use futures_core::stream::Stream;
18use mime::Mime;
19
20use crate::{
21    error::Error,
22    field::InnerField,
23    payload::{PayloadBuffer, PayloadRef, DEFAULT_BUFFER_LIMIT},
24    safety::Safety,
25    Field,
26};
27
/// Size of the scratch header array handed to `httparse` when parsing the header section of a
/// single multipart field.
const MAX_HEADERS: usize = 32;
29
/// The server-side implementation of `multipart/form-data` requests.
///
/// This will parse the incoming stream into [`Field`] instances via its `Stream` implementation,
/// yielding one `Result<Field, Error>` per part. Nested multipart streams are not supported: a
/// part whose own Content-Type is `multipart/*` is reported as [`Error::Nested`].
pub struct Multipart {
    /// Either the in-flight parser state or an error captured while constructing the reader.
    flow: Flow,
    /// Consumption guard checked before the payload is touched; a violation surfaces as
    /// [`Error::NotConsumed`].
    safety: Safety,
}
39
/// Top-level state of a [`Multipart`] stream.
enum Flow {
    /// The reader was constructed successfully; holds the parser state.
    InFlight(Inner),

    /// Error container is Some until an error is returned out of the flow.
    Error(Option<Error>),
}
46
/// [`Multipart`] extractor configuration.
///
/// Add to your app data to have it picked up by [`Multipart`] extractors.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct MultipartConfig {
    /// Maximum internal parser buffer size; set via [`MultipartConfig::buffer_limit`].
    buffer_limit: usize,
}
55
56impl MultipartConfig {
57    /// Creates a default multipart extractor configuration.
58    pub fn new() -> Self {
59        DEFAULT_CONFIG
60    }
61
62    /// Sets maximum internal parser buffer size. By default this limit is 64 KiB.
63    pub fn buffer_limit(mut self, buffer_limit: usize) -> Self {
64        self.buffer_limit = buffer_limit;
65        self
66    }
67
68    /// Extracts multipart config from app data. Check both `T` and `Data<T>`, in that order, and
69    /// fall back to the default multipart config.
70    fn from_req(req: &HttpRequest) -> &Self {
71        req.app_data::<Self>()
72            .or_else(|| req.app_data::<web::Data<Self>>().map(|d| d.as_ref()))
73            .unwrap_or(&DEFAULT_CONFIG)
74    }
75}
76
/// Baseline configuration: returned by `MultipartConfig::new` and used as the fallback when no
/// config is registered in app data.
const DEFAULT_CONFIG: MultipartConfig = MultipartConfig {
    buffer_limit: DEFAULT_BUFFER_LIMIT,
};
80
81impl Default for MultipartConfig {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl Multipart {
88    /// Creates multipart instance from parts.
89    pub fn new<S>(headers: &HeaderMap, stream: S) -> Self
90    where
91        S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
92    {
93        match Self::find_ct_and_boundary(headers) {
94            Ok((ct, boundary)) => Self::from_ct_and_boundary(ct, boundary, stream),
95            Err(err) => Self::from_error(err),
96        }
97    }
98
99    /// Creates multipart instance from parts.
100    pub(crate) fn from_req(req: &HttpRequest, payload: &mut dev::Payload) -> Self {
101        let config = MultipartConfig::from_req(req);
102
103        match Self::find_ct_and_boundary(req.headers()) {
104            Ok((ct, boundary)) => Self::from_ct_and_boundary_with_buffer_limit(
105                ct,
106                boundary,
107                payload.take(),
108                config.buffer_limit,
109            ),
110            Err(err) => Self::from_error(err),
111        }
112    }
113
114    /// Extract Content-Type and boundary info from headers.
115    pub(crate) fn find_ct_and_boundary(headers: &HeaderMap) -> Result<(Mime, String), Error> {
116        let content_type = headers
117            .get(&header::CONTENT_TYPE)
118            .ok_or(Error::ContentTypeMissing)?
119            .to_str()
120            .ok()
121            .and_then(|content_type| content_type.parse::<Mime>().ok())
122            .ok_or(Error::ContentTypeParse)?;
123
124        if content_type.type_() != mime::MULTIPART {
125            return Err(Error::ContentTypeIncompatible);
126        }
127
128        let boundary = content_type
129            .get_param(mime::BOUNDARY)
130            .ok_or(Error::BoundaryMissing)?
131            .as_str()
132            .to_owned();
133
134        if boundary.is_empty() {
135            return Err(Error::BoundaryMissing);
136        }
137
138        Ok((content_type, boundary))
139    }
140
141    /// Constructs a new multipart reader from given Content-Type, boundary, and stream.
142    pub(crate) fn from_ct_and_boundary<S>(ct: Mime, boundary: String, stream: S) -> Multipart
143    where
144        S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
145    {
146        Self::from_ct_and_boundary_with_buffer_limit(
147            ct,
148            boundary,
149            stream,
150            DEFAULT_CONFIG.buffer_limit,
151        )
152    }
153
154    fn from_ct_and_boundary_with_buffer_limit<S>(
155        ct: Mime,
156        boundary: String,
157        stream: S,
158        buffer_limit: usize,
159    ) -> Multipart
160    where
161        S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
162    {
163        Multipart {
164            safety: Safety::new(),
165            flow: Flow::InFlight(Inner {
166                payload: PayloadRef::new(PayloadBuffer::new_with_limit(stream, buffer_limit)),
167                content_type: ct,
168                boundary,
169                state: State::FirstBoundary,
170                item: Item::None,
171            }),
172        }
173    }
174
175    /// Constructs a new multipart reader from given `MultipartError`.
176    pub(crate) fn from_error(err: Error) -> Multipart {
177        Multipart {
178            flow: Flow::Error(Some(err)),
179            safety: Safety::new(),
180        }
181    }
182
183    /// Return requests parsed Content-Type or raise the stored error.
184    pub(crate) fn content_type_or_bail(&mut self) -> Result<mime::Mime, Error> {
185        match self.flow {
186            Flow::InFlight(ref inner) => Ok(inner.content_type.clone()),
187            Flow::Error(ref mut err) => Err(err
188                .take()
189                .expect("error should not be taken after it was returned")),
190        }
191    }
192}
193
194impl Stream for Multipart {
195    type Item = Result<Field, Error>;
196
197    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
198        let this = self.get_mut();
199
200        match this.flow {
201            Flow::InFlight(ref mut inner) => {
202                if let Some(mut buffer) = inner.payload.get_mut(&this.safety) {
203                    // check safety and poll read payload to buffer.
204                    buffer.poll_stream(cx)?;
205                } else if !this.safety.is_clean() {
206                    // safety violation
207                    return Poll::Ready(Some(Err(Error::NotConsumed)));
208                } else {
209                    return Poll::Pending;
210                }
211
212                inner.poll(&this.safety, cx)
213            }
214
215            Flow::Error(ref mut err) => Poll::Ready(Some(Err(err
216                .take()
217                .expect("Multipart polled after finish")))),
218        }
219    }
220}
221
/// Position of the parser within the multipart body.
#[derive(PartialEq, Debug)]
enum State {
    /// Skip data until first boundary.
    FirstBoundary,

    /// Reading a boundary that follows a field (either an in-between or the final boundary).
    Boundary,

    /// Reading the headers of the next field.
    Headers,

    /// Stream EOF; the parser yields no further fields once this state is reached.
    Eof,
}
236
/// The field most recently handed out by the parser, if any.
enum Item {
    /// No field is outstanding.
    None,
    /// Field that is polled to completion before the parser moves on to the next boundary.
    Field(Rc<RefCell<InnerField>>),
}
241
/// Parser state behind an in-flight [`Multipart`] stream.
struct Inner {
    /// Request's payload stream & buffer.
    payload: PayloadRef,

    /// Request's Content-Type.
    ///
    /// Guaranteed to have "multipart" top-level media type, i.e., `multipart/*`.
    content_type: Mime,

    /// Field boundary.
    boundary: String,

    /// Current position of the parser within the body.
    state: State,

    /// Field most recently handed out; it is drained before the next boundary is read.
    item: Item,
}
257
258impl Inner {
259    fn read_field_headers(payload: &mut PayloadBuffer) -> Result<Option<HeaderMap>, Error> {
260        match payload.read_until(b"\r\n\r\n")? {
261            None => {
262                if payload.eof {
263                    Err(Error::Incomplete)
264                } else {
265                    Ok(None)
266                }
267            }
268
269            Some(bytes) => {
270                let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
271
272                match httparse::parse_headers(&bytes, &mut hdrs).map_err(ParseError::from)? {
273                    httparse::Status::Complete((_, hdrs)) => {
274                        // convert headers
275                        let mut headers = HeaderMap::with_capacity(hdrs.len());
276
277                        for h in hdrs {
278                            let name =
279                                HeaderName::try_from(h.name).map_err(|_| ParseError::Header)?;
280                            let value =
281                                HeaderValue::try_from(h.value).map_err(|_| ParseError::Header)?;
282                            headers.append(name, value);
283                        }
284
285                        Ok(Some(headers))
286                    }
287
288                    httparse::Status::Partial => Err(ParseError::Header.into()),
289                }
290            }
291        }
292    }
293
294    /// Reads a field boundary from the payload buffer (and discards it).
295    ///
296    /// Reads "in-between" and "final" boundaries. E.g. for boundary = "foo":
297    ///
298    /// ```plain
299    /// --foo    <-- in-between fields
300    /// --foo--  <-- end of request body, should be followed by EOF
301    /// ```
302    ///
303    /// Returns:
304    ///
305    /// - `Ok(Some(true))` - final field boundary read (EOF)
306    /// - `Ok(Some(false))` - field boundary read
307    /// - `Ok(None)` - boundary not found, more data needs reading
308    /// - `Err(BoundaryMissing)` - multipart boundary is missing
309    fn read_boundary(payload: &mut PayloadBuffer, boundary: &str) -> Result<Option<bool>, Error> {
310        if boundary.is_empty() {
311            return Err(Error::BoundaryMissing);
312        }
313
314        // TODO: need to read epilogue
315        let chunk = match payload.readline_or_eof()? {
316            // TODO: this might be okay as a let Some() else return Ok(None)
317            None => return Ok(payload.eof.then_some(true)),
318            Some(chunk) => chunk,
319        };
320
321        const BOUNDARY_MARKER: &[u8] = b"--";
322        const LINE_BREAK: &[u8] = b"\r\n";
323
324        let Some(chunk) = chunk.as_ref().strip_prefix(BOUNDARY_MARKER) else {
325            return Err(Error::BoundaryMissing);
326        };
327
328        let Some(chunk) = chunk.strip_prefix(boundary.as_bytes()) else {
329            return Err(Error::BoundaryMissing);
330        };
331
332        if chunk == LINE_BREAK {
333            // boundary is followed by line-break, indicating more fields to come
334            return Ok(Some(false));
335        }
336
337        // boundary is followed by marker
338        if chunk == BOUNDARY_MARKER || chunk == b"--\r\n" {
339            return Ok(Some(true));
340        }
341
342        Err(Error::BoundaryMissing)
343    }
344
345    fn skip_until_boundary(
346        payload: &mut PayloadBuffer,
347        boundary: &str,
348    ) -> Result<Option<bool>, Error> {
349        if boundary.is_empty() {
350            return Err(Error::BoundaryMissing);
351        }
352
353        let mut eof = false;
354        let boundary = boundary.as_bytes();
355
356        loop {
357            match payload.readline()? {
358                Some(chunk) => {
359                    if chunk.is_empty() {
360                        return Err(Error::BoundaryMissing);
361                    }
362
363                    let Some(line) = chunk.as_ref().strip_suffix(b"\r\n") else {
364                        continue;
365                    };
366
367                    if let Some(line) = line.strip_prefix(b"--") {
368                        if line == boundary {
369                            break;
370                        }
371
372                        if line.strip_suffix(b"--") == Some(boundary) {
373                            eof = true;
374                            break;
375                        }
376                    }
377                }
378                None => {
379                    return if payload.eof {
380                        Err(Error::Incomplete)
381                    } else {
382                        Ok(None)
383                    };
384                }
385            }
386        }
387        Ok(Some(eof))
388    }
389
390    fn poll(&mut self, safety: &Safety, cx: &Context<'_>) -> Poll<Option<Result<Field, Error>>> {
391        if self.state == State::Eof {
392            Poll::Ready(None)
393        } else {
394            // release field
395            loop {
396                // Nested multipart streams of fields has to be consumed
397                // before switching to next
398                if safety.current() {
399                    let stop = match self.item {
400                        Item::Field(ref mut field) => match field.borrow_mut().poll(safety) {
401                            Poll::Pending => return Poll::Pending,
402                            Poll::Ready(Some(Ok(_))) => continue,
403                            Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
404                            Poll::Ready(None) => true,
405                        },
406                        Item::None => false,
407                    };
408                    if stop {
409                        self.item = Item::None;
410                    }
411                    if let Item::None = self.item {
412                        break;
413                    }
414                }
415            }
416
417            let field_headers = if let Some(mut payload) = self.payload.get_mut(safety) {
418                match self.state {
419                    // read until first boundary
420                    State::FirstBoundary => {
421                        match Inner::skip_until_boundary(&mut payload, &self.boundary)? {
422                            None => return Poll::Pending,
423                            Some(eof) => {
424                                if eof {
425                                    self.state = State::Eof;
426                                    return Poll::Ready(None);
427                                } else {
428                                    self.state = State::Headers;
429                                }
430                            }
431                        }
432                    }
433
434                    // read boundary
435                    State::Boundary => match Inner::read_boundary(&mut payload, &self.boundary)? {
436                        None => return Poll::Pending,
437                        Some(eof) => {
438                            if eof {
439                                self.state = State::Eof;
440                                return Poll::Ready(None);
441                            } else {
442                                self.state = State::Headers;
443                            }
444                        }
445                    },
446
447                    _ => {}
448                }
449
450                // read field headers for next field
451                if self.state == State::Headers {
452                    if let Some(headers) = Inner::read_field_headers(&mut payload)? {
453                        self.state = State::Boundary;
454                        headers
455                    } else {
456                        return Poll::Pending;
457                    }
458                } else {
459                    unreachable!()
460                }
461            } else {
462                log::debug!("NotReady: field is in flight");
463                return Poll::Pending;
464            };
465
466            let field_content_disposition = field_headers
467                .get(&header::CONTENT_DISPOSITION)
468                .and_then(|cd| ContentDisposition::from_raw(cd).ok())
469                .filter(|content_disposition| {
470                    matches!(
471                        content_disposition.disposition,
472                        header::DispositionType::FormData,
473                    )
474                });
475
476            let form_field_name = if self.content_type.subtype() == mime::FORM_DATA {
477                // According to RFC 7578 §4.2, which relates to "multipart/form-data" requests
478                // specifically, fields must have a Content-Disposition header, its disposition
479                // type must be set as "form-data", and it must have a name parameter.
480
481                let Some(cd) = &field_content_disposition else {
482                    return Poll::Ready(Some(Err(Error::ContentDispositionMissing)));
483                };
484
485                let Some(field_name) = cd.get_name() else {
486                    return Poll::Ready(Some(Err(Error::ContentDispositionNameMissing)));
487                };
488
489                Some(field_name.to_owned())
490            } else {
491                None
492            };
493
494            // TODO: check out other multipart/* RFCs for specific requirements
495
496            let field_content_type: Option<Mime> = field_headers
497                .get(&header::CONTENT_TYPE)
498                .and_then(|ct| ct.to_str().ok())
499                .and_then(|ct| ct.parse().ok());
500
501            self.state = State::Boundary;
502
503            // nested multipart stream is not supported
504            if let Some(mime) = &field_content_type {
505                if mime.type_() == mime::MULTIPART {
506                    return Poll::Ready(Some(Err(Error::Nested)));
507                }
508            }
509
510            let field_inner =
511                InnerField::new_in_rc(self.payload.clone(), self.boundary.clone(), &field_headers)?;
512
513            self.item = Item::Field(Rc::clone(&field_inner));
514
515            Poll::Ready(Some(Ok(Field::new(
516                field_content_type,
517                field_content_disposition,
518                form_field_name,
519                field_headers,
520                safety.clone(cx),
521                field_inner,
522            ))))
523        }
524    }
525}
526
527impl Drop for Inner {
528    fn drop(&mut self) {
529        // InnerMultipartItem::Field has to be dropped first because of Safety.
530        self.item = Item::None;
531    }
532}
533
534#[cfg(test)]
535mod tests {
536    use std::time::Duration;
537
538    use actix_http::h1;
539    use actix_web::{
540        http::header::{DispositionParam, DispositionType},
541        rt,
542        test::TestRequest,
543        web::{BufMut as _, BytesMut},
544        FromRequest,
545    };
546    use assert_matches::assert_matches;
547    use futures_test::stream::StreamTestExt as _;
548    use futures_util::{stream, StreamExt as _};
549    use tokio::sync::mpsc;
550    use tokio_stream::wrappers::UnboundedReceiverStream;
551
552    use super::*;
553
554    const BOUNDARY: &str = "abbc761f78ff4d7cb7573b5a23f96ef0";
555
556    #[actix_rt::test]
557    async fn test_boundary() {
558        let headers = HeaderMap::new();
559        match Multipart::find_ct_and_boundary(&headers) {
560            Err(Error::ContentTypeMissing) => {}
561            _ => unreachable!("should not happen"),
562        }
563
564        let mut headers = HeaderMap::new();
565        headers.insert(
566            header::CONTENT_TYPE,
567            header::HeaderValue::from_static("test"),
568        );
569
570        match Multipart::find_ct_and_boundary(&headers) {
571            Err(Error::ContentTypeParse) => {}
572            _ => unreachable!("should not happen"),
573        }
574
575        let mut headers = HeaderMap::new();
576        headers.insert(
577            header::CONTENT_TYPE,
578            header::HeaderValue::from_static("multipart/mixed"),
579        );
580        match Multipart::find_ct_and_boundary(&headers) {
581            Err(Error::BoundaryMissing) => {}
582            _ => unreachable!("should not happen"),
583        }
584
585        let mut headers = HeaderMap::new();
586        headers.insert(
587            header::CONTENT_TYPE,
588            header::HeaderValue::from_static(
589                "multipart/mixed; boundary=\"5c02368e880e436dab70ed54e1c58209\"",
590            ),
591        );
592
593        assert_eq!(
594            Multipart::find_ct_and_boundary(&headers).unwrap().1,
595            "5c02368e880e436dab70ed54e1c58209",
596        );
597    }
598
599    fn create_stream() -> (
600        mpsc::UnboundedSender<Result<Bytes, PayloadError>>,
601        impl Stream<Item = Result<Bytes, PayloadError>>,
602    ) {
603        let (tx, rx) = mpsc::unbounded_channel();
604
605        (
606            tx,
607            UnboundedReceiverStream::new(rx).map(|res| res.map_err(|_| panic!())),
608        )
609    }
610
611    fn create_simple_request_with_header() -> (Bytes, HeaderMap) {
612        let (body, headers) = crate::test::create_form_data_payload_and_headers_with_boundary(
613            BOUNDARY,
614            "file",
615            Some("fn.txt".to_owned()),
616            Some(mime::TEXT_PLAIN_UTF_8),
617            Bytes::from_static(b"data"),
618        );
619
620        let mut buf = BytesMut::with_capacity(body.len() + 14);
621
622        // add junk before form to test pre-boundary data rejection
623        buf.put("testasdadsad\r\n".as_bytes());
624
625        buf.put(body);
626
627        (buf.freeze(), headers)
628    }
629
630    // TODO: use test utility when multi-file support is introduced
631    fn create_double_request_with_header() -> (Bytes, HeaderMap) {
632        let bytes = Bytes::from(
633            "testasdadsad\r\n\
634             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
635             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
636             Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
637             test\r\n\
638             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
639             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
640             Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
641             data\r\n\
642             --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
643        );
644        let mut headers = HeaderMap::new();
645        headers.insert(
646            header::CONTENT_TYPE,
647            header::HeaderValue::from_static(
648                "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
649            ),
650        );
651        (bytes, headers)
652    }
653
654    fn create_header(content_type: &'static str) -> HeaderMap {
655        let mut headers = HeaderMap::new();
656        headers.insert(
657            header::CONTENT_TYPE,
658            header::HeaderValue::from_static(content_type),
659        );
660        headers
661    }
662
663    fn create_multipart_with_buffer_limit(
664        body: impl Stream<Item = Result<Bytes, PayloadError>> + 'static,
665        buffer_limit: usize,
666    ) -> Multipart {
667        Multipart::from_ct_and_boundary_with_buffer_limit(
668            "multipart/mixed; boundary=\"a\"".parse().unwrap(),
669            "a".to_owned(),
670            body,
671            buffer_limit,
672        )
673    }
674
675    #[actix_rt::test]
676    async fn empty_boundary_does_not_panic() {
677        let payload = stream::once(async { Ok(Bytes::from_static(b"\n")) });
678        let ct = "multipart/mixed; boundary=\"a\"".parse().unwrap();
679
680        let mut multipart = Multipart::from_ct_and_boundary(ct, String::new(), payload);
681        let res = multipart.next().await.unwrap();
682        assert_matches!(res, Err(Error::BoundaryMissing));
683    }
684
685    #[actix_rt::test]
686    async fn short_line_with_one_byte_boundary_does_not_panic() {
687        let bytes = Bytes::from_static(b"\n");
688        let mut headers = HeaderMap::new();
689        headers.insert(
690            header::CONTENT_TYPE,
691            header::HeaderValue::from_static("multipart/mixed; boundary=\"a\""),
692        );
693        let payload = stream::once(async { Ok(bytes) });
694
695        let mut multipart = Multipart::new(&headers, payload);
696        let res = multipart.next().await.unwrap();
697        assert_matches!(res, Err(Error::Incomplete));
698    }
699
700    #[actix_rt::test]
701    async fn short_final_boundary_with_one_byte_boundary_does_not_panic() {
702        let bytes = Bytes::from_static(b"--\n");
703        let mut headers = HeaderMap::new();
704        headers.insert(
705            header::CONTENT_TYPE,
706            header::HeaderValue::from_static("multipart/mixed; boundary=\"a\""),
707        );
708        let payload = stream::once(async { Ok(bytes) });
709
710        let mut multipart = Multipart::new(&headers, payload);
711        let res = multipart.next().await.unwrap();
712        assert_matches!(res, Err(Error::Incomplete));
713    }
714
715    #[actix_rt::test]
716    async fn one_byte_boundary_parses_valid_body() {
717        let bytes = Bytes::from_static(
718            b"preamble\r\n\
719            --a\r\n\
720            Content-Type: text/plain\r\n\
721            Content-Length: 3\r\n\
722            \r\n\
723            one\r\n\
724            --a\r\n\
725            Content-Type: text/plain\r\n\
726            Content-Length: 3\r\n\
727            \r\n\
728            two\r\n\
729            --a--\r\n",
730        );
731        let headers = create_header("multipart/mixed; boundary=\"a\"");
732        let payload = stream::once(async { Ok(bytes) });
733
734        let mut multipart = Multipart::new(&headers, payload);
735
736        let mut field = multipart.next().await.unwrap().unwrap();
737        assert_eq!(get_whole_field(&mut field).await, "one");
738        drop(field);
739
740        let mut field = multipart.next().await.unwrap().unwrap();
741        assert_eq!(get_whole_field(&mut field).await, "two");
742        drop(field);
743
744        assert!(multipart.next().await.is_none());
745    }
746
747    #[actix_rt::test]
748    async fn one_byte_boundary_parses_when_split_across_chunks() {
749        let bytes = Bytes::from_static(
750            b"x\r\n\
751            --a\r\n\
752            Content-Type: text/plain\r\n\
753            Content-Length: 4\r\n\
754            \r\n\
755            data\r\n\
756            --a--\r\n",
757        );
758        let headers = create_header("multipart/mixed; boundary=\"a\"");
759        let payload = stream::iter(bytes)
760            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
761            .interleave_pending();
762
763        let mut multipart = Multipart::new(&headers, payload);
764
765        let mut field = multipart.next().await.unwrap().unwrap();
766        assert_eq!(get_whole_field(&mut field).await, "data");
767        drop(field);
768
769        assert!(multipart.next().await.is_none());
770    }
771
772    #[actix_rt::test]
773    async fn short_preamble_lines_before_boundary_are_skipped() {
774        let bytes = Bytes::from_static(
775            b"\n\
776            -\r\n\
777            --a\r\n\
778            Content-Type: text/plain\r\n\
779            Content-Length: 4\r\n\
780            \r\n\
781            data\r\n\
782            --a--\r\n",
783        );
784        let headers = create_header("multipart/mixed; boundary=\"a\"");
785        let payload = stream::once(async { Ok(bytes) });
786
787        let mut multipart = Multipart::new(&headers, payload);
788
789        let mut field = multipart.next().await.unwrap().unwrap();
790        assert_eq!(get_whole_field(&mut field).await, "data");
791        drop(field);
792
793        assert!(multipart.next().await.is_none());
794    }
795
796    #[actix_rt::test]
797    async fn first_boundary_can_be_final() {
798        let bytes = Bytes::from_static(b"--a--\r\n");
799        let headers = create_header("multipart/mixed; boundary=\"a\"");
800        let payload = stream::once(async { Ok(bytes) });
801
802        let mut multipart = Multipart::new(&headers, payload);
803        assert!(multipart.next().await.is_none());
804    }
805
806    #[actix_rt::test]
807    async fn malformed_preamble_over_buffer_limit_errors() {
808        let body = stream::iter(
809            [b"aaaaaaaa", b"bbbbbbbb", b"cccccccc"].map(|chunk| Ok(Bytes::from_static(chunk))),
810        );
811
812        let mut multipart = create_multipart_with_buffer_limit(body, 16);
813        let res = multipart.next().await.unwrap();
814
815        assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
816    }
817
818    #[actix_rt::test]
819    async fn malformed_headers_over_buffer_limit_errors() {
820        let body = stream::iter(
821            [
822                Bytes::from_static(b"--a\r\n"),
823                Bytes::from_static(b"X-Long: 12345678"),
824                Bytes::from_static(b"9012345678901234"),
825                Bytes::from_static(b"5678901234567890"),
826            ]
827            .map(Ok),
828        );
829
830        let mut multipart = create_multipart_with_buffer_limit(body, 24);
831        let res = multipart.next().await.unwrap();
832
833        assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
834    }
835
836    #[actix_rt::test]
837    async fn raw_extractor_uses_configured_buffer_limit() {
838        let (req, mut payload) = TestRequest::default()
839            .insert_header((header::CONTENT_TYPE, "multipart/mixed; boundary=\"a\""))
840            .app_data(MultipartConfig::default().buffer_limit(16))
841            .set_payload(Bytes::from_static(b"aaaaaaaabbbbbbbbcccccccc"))
842            .to_http_parts();
843
844        let mut multipart = Multipart::from_request(&req, &mut payload).await.unwrap();
845        let res = multipart.next().await.unwrap();
846
847        assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
848    }
849
850    #[actix_rt::test]
851    async fn valid_large_field_streams_through_small_parser_buffer() {
852        let mut bytes = BytesMut::new();
853        bytes.put(&b"--a\r\nContent-Length: 100\r\n\r\n"[..]);
854        bytes.put(&[b'x'; 100][..]);
855        bytes.put(&b"\r\n--a--\r\n"[..]);
856        let body = stream::once(async { Ok(bytes.freeze()) });
857
858        let mut multipart = create_multipart_with_buffer_limit(body, 32);
859        let mut field = multipart.next().await.unwrap().unwrap();
860
861        assert_eq!(
862            get_whole_field(&mut field).await,
863            Bytes::from(vec![b'x'; 100])
864        );
865        drop(field);
866        assert!(multipart.next().await.is_none());
867    }
868
869    #[actix_rt::test]
870    async fn test_multipart_no_end_crlf() {
871        let (sender, payload) = create_stream();
872        let (mut bytes, headers) = create_double_request_with_header();
873        let bytes_stripped = bytes.split_to(bytes.len() - 2); // strip crlf
874
875        sender.send(Ok(bytes_stripped)).unwrap();
876        drop(sender); // eof
877
878        let mut multipart = Multipart::new(&headers, payload);
879
880        match multipart.next().await.unwrap() {
881            Ok(_) => {}
882            _ => unreachable!(),
883        }
884
885        match multipart.next().await.unwrap() {
886            Ok(_) => {}
887            _ => unreachable!(),
888        }
889
890        match multipart.next().await {
891            None => {}
892            _ => unreachable!(),
893        }
894    }
895
896    #[actix_rt::test]
897    async fn test_multipart() {
898        let (sender, payload) = create_stream();
899        let (bytes, headers) = create_double_request_with_header();
900
901        sender.send(Ok(bytes)).unwrap();
902
903        let mut multipart = Multipart::new(&headers, payload);
904        match multipart.next().await {
905            Some(Ok(mut field)) => {
906                let cd = field.content_disposition().unwrap();
907                assert_eq!(cd.disposition, DispositionType::FormData);
908                assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));
909
910                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
911                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
912
913                match field.next().await.unwrap() {
914                    Ok(chunk) => assert_eq!(chunk, "test"),
915                    _ => unreachable!(),
916                }
917                match field.next().await {
918                    None => {}
919                    _ => unreachable!(),
920                }
921            }
922            _ => unreachable!(),
923        }
924
925        match multipart.next().await.unwrap() {
926            Ok(mut field) => {
927                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
928                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
929
930                match field.next().await {
931                    Some(Ok(chunk)) => assert_eq!(chunk, "data"),
932                    _ => unreachable!(),
933                }
934                match field.next().await {
935                    None => {}
936                    _ => unreachable!(),
937                }
938            }
939            _ => unreachable!(),
940        }
941
942        match multipart.next().await {
943            None => {}
944            _ => unreachable!(),
945        }
946    }
947
948    // Loops, collecting all bytes until end-of-field
949    async fn get_whole_field(field: &mut Field) -> BytesMut {
950        let mut b = BytesMut::new();
951        loop {
952            match field.next().await {
953                Some(Ok(chunk)) => b.extend_from_slice(&chunk),
954                None => return b,
955                _ => unreachable!(),
956            }
957        }
958    }
959
960    #[actix_rt::test]
961    async fn test_stream() {
962        let (bytes, headers) = create_double_request_with_header();
963        let payload = stream::iter(bytes)
964            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
965            .interleave_pending();
966
967        let mut multipart = Multipart::new(&headers, payload);
968        match multipart.next().await.unwrap() {
969            Ok(mut field) => {
970                let cd = field.content_disposition().unwrap();
971                assert_eq!(cd.disposition, DispositionType::FormData);
972                assert_eq!(cd.parameters[0], DispositionParam::Name("file".into()));
973
974                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
975                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
976
977                assert_eq!(get_whole_field(&mut field).await, "test");
978            }
979            _ => unreachable!(),
980        }
981
982        match multipart.next().await {
983            Some(Ok(mut field)) => {
984                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
985                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
986
987                assert_eq!(get_whole_field(&mut field).await, "data");
988            }
989            _ => unreachable!(),
990        }
991
992        match multipart.next().await {
993            None => {}
994            _ => unreachable!(),
995        }
996    }
997
998    #[actix_rt::test]
999    async fn test_multipart_from_error() {
1000        let err = Error::ContentTypeMissing;
1001        let mut multipart = Multipart::from_error(err);
1002        assert!(multipart.next().await.unwrap().is_err())
1003    }
1004
1005    #[actix_rt::test]
1006    async fn test_multipart_from_boundary() {
1007        let (_, payload) = create_stream();
1008        let (_, headers) = create_simple_request_with_header();
1009        let (ct, boundary) = Multipart::find_ct_and_boundary(&headers).unwrap();
1010        let _ = Multipart::from_ct_and_boundary(ct, boundary, payload);
1011    }
1012
1013    #[actix_rt::test]
1014    async fn test_multipart_payload_consumption() {
1015        // with sample payload and HttpRequest with no headers
1016        let (_sender, inner_payload) = h1::Payload::create(false);
1017        let mut payload = actix_web::dev::Payload::from(inner_payload);
1018        let req = TestRequest::default().to_http_request();
1019
1020        // multipart should generate an error
1021        let mut mp = Multipart::from_request(&req, &mut payload).await.unwrap();
1022        assert!(mp.next().await.unwrap().is_err());
1023
1024        // and should not consume the payload
1025        match payload {
1026            actix_web::dev::Payload::H1 { .. } => {} //expected
1027            _ => unreachable!(),
1028        }
1029    }
1030
1031    #[actix_rt::test]
1032    async fn no_content_disposition_form_data() {
1033        let bytes = Bytes::from(
1034            "testasdadsad\r\n\
1035             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
1036             Content-Type: text/plain; charset=utf-8\r\n\
1037             Content-Length: 4\r\n\
1038             \r\n\
1039             test\r\n\
1040             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
1041        );
1042        let mut headers = HeaderMap::new();
1043        headers.insert(
1044            header::CONTENT_TYPE,
1045            header::HeaderValue::from_static(
1046                "multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
1047            ),
1048        );
1049        let payload = stream::iter(bytes)
1050            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
1051            .interleave_pending();
1052
1053        let mut multipart = Multipart::new(&headers, payload);
1054        let res = multipart.next().await.unwrap();
1055        assert_matches!(
1056            res.expect_err(
1057                "according to RFC 7578, form-data fields require a content-disposition header"
1058            ),
1059            Error::ContentDispositionMissing
1060        );
1061    }
1062
1063    #[actix_rt::test]
1064    async fn no_content_disposition_non_form_data() {
1065        let bytes = Bytes::from(
1066            "testasdadsad\r\n\
1067             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
1068             Content-Type: text/plain; charset=utf-8\r\n\
1069             Content-Length: 4\r\n\
1070             \r\n\
1071             test\r\n\
1072             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
1073        );
1074        let mut headers = HeaderMap::new();
1075        headers.insert(
1076            header::CONTENT_TYPE,
1077            header::HeaderValue::from_static(
1078                "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
1079            ),
1080        );
1081        let payload = stream::iter(bytes)
1082            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
1083            .interleave_pending();
1084
1085        let mut multipart = Multipart::new(&headers, payload);
1086        let res = multipart.next().await.unwrap();
1087        res.unwrap();
1088    }
1089
1090    #[actix_rt::test]
1091    async fn no_name_in_form_data_content_disposition() {
1092        let bytes = Bytes::from(
1093            "testasdadsad\r\n\
1094             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
1095             Content-Disposition: form-data; filename=\"fn.txt\"\r\n\
1096             Content-Type: text/plain; charset=utf-8\r\n\
1097             Content-Length: 4\r\n\
1098             \r\n\
1099             test\r\n\
1100             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n",
1101        );
1102        let mut headers = HeaderMap::new();
1103        headers.insert(
1104            header::CONTENT_TYPE,
1105            header::HeaderValue::from_static(
1106                "multipart/form-data; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
1107            ),
1108        );
1109        let payload = stream::iter(bytes)
1110            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
1111            .interleave_pending();
1112
1113        let mut multipart = Multipart::new(&headers, payload);
1114        let res = multipart.next().await.unwrap();
1115        assert_matches!(
1116            res.expect_err("according to RFC 7578, form-data fields require a name attribute"),
1117            Error::ContentDispositionNameMissing
1118        );
1119    }
1120
1121    #[actix_rt::test]
1122    async fn test_drop_multipart_dont_hang() {
1123        let (sender, payload) = create_stream();
1124        let (bytes, headers) = create_simple_request_with_header();
1125        sender.send(Ok(bytes)).unwrap();
1126        drop(sender); // eof
1127
1128        let mut multipart = Multipart::new(&headers, payload);
1129        let mut field = multipart.next().await.unwrap().unwrap();
1130
1131        drop(multipart);
1132
1133        // should fail immediately
1134        match field.next().await {
1135            Some(Err(Error::NotConsumed)) => {}
1136            _ => panic!(),
1137        };
1138    }
1139
1140    #[actix_rt::test]
1141    async fn test_drop_field_awaken_multipart() {
1142        let (sender, payload) = create_stream();
1143        let (bytes, headers) = create_double_request_with_header();
1144        sender.send(Ok(bytes)).unwrap();
1145        drop(sender); // eof
1146
1147        let mut multipart = Multipart::new(&headers, payload);
1148        let mut field = multipart.next().await.unwrap().unwrap();
1149
1150        let task = rt::spawn(async move {
1151            rt::time::sleep(Duration::from_millis(500)).await;
1152            assert_eq!(field.next().await.unwrap().unwrap(), "test");
1153            drop(field);
1154        });
1155
1156        // dropping field should awaken current task
1157        let _ = multipart.next().await.unwrap().unwrap();
1158        task.await.unwrap();
1159    }
1160}