Skip to main content

ntex_multipart/
multipart.rs

1//! Multipart payload support
2
3use crate::Field;
4use crate::error::MultipartError;
5use crate::field::InnerField;
6use crate::payload::{PayloadBuffer, PayloadRef};
7use crate::safety::Safety;
8use futures::stream::Stream;
9use mime::Mime;
10use ntex::http::error::{DecodeError, PayloadError};
11use ntex::http::header::{self, HeaderMap, HeaderName, HeaderValue};
12use ntex::util::Bytes;
13use ntex_files::header::DispositionType;
14use ntex_files::header::{ContentDisposition, Header};
15use std::cell::RefCell;
16use std::task::{Context, Poll};
17use std::{convert::TryFrom, pin::Pin, rc::Rc};
18
/// Maximum number of headers parsed for a single multipart field; it sizes
/// the fixed `httparse` header array used by `InnerMultipart::read_headers`.
const MAX_HEADERS: usize = 32;
20
/// The server-side implementation of `multipart/form-data` requests.
///
/// This will parse the incoming stream into `Field` instances via its
/// `Stream` implementation, yielding one `Result<Field, MultipartError>` per
/// part. Nested multipart streams are not supported: a part whose own
/// content type is `multipart/*` is reported as `MultipartError::Nested`.
pub struct Multipart {
    /// Guard consulted before the payload is touched; a clone of it is handed
    /// to every `Field` produced by this stream.
    safety: Safety,
    /// Error found while extracting the boundary from the request headers.
    /// It is taken (and returned) by `poll_next` or `content_type`.
    error: Option<MultipartError>,
    /// Parser state; `None` when construction failed and `error` was set.
    inner: Option<Rc<RefCell<InnerMultipart>>>,
}
32
/// The field currently being read from the payload, if any.
enum InnerMultipartItem {
    /// No field is in flight.
    None,
    /// The most recently yielded field. `InnerMultipart::poll` polls it to
    /// completion before it starts parsing the next part.
    Field(Rc<RefCell<InnerField>>),
}
37
/// Parser position inside the multipart body.
#[derive(PartialEq, Debug)]
enum InnerState {
    /// End of the multipart stream; no further fields are produced.
    Eof,
    /// Skipping data until the first boundary line is found.
    FirstBoundary,
    /// Expecting a boundary line (entered after a field's headers were read).
    Boundary,
    /// Reading the headers of the next field.
    Headers,
}
49
/// Shared parser state behind a `Multipart` stream.
struct InnerMultipart {
    /// Handle to the buffered request body; a clone is given to every
    /// `InnerField` so fields and the parser read from the same buffer.
    payload: PayloadRef,
    /// Parsed `Content-Type` of the whole request. When its subtype is
    /// `form-data`, every part must carry a named `Content-Disposition`.
    content_type: Mime,
    /// Value of the `boundary` parameter, without the leading `--`.
    boundary: String,
    /// Current position of the parser in the body.
    state: InnerState,
    /// Field that is currently in flight, if any.
    item: InnerMultipartItem,
}
57
58impl Multipart {
59    /// Create multipart instance for boundary.
60    pub fn new<S>(headers: &HeaderMap, stream: S) -> Multipart
61    where
62        S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
63    {
64        match Self::boundary(headers) {
65            Ok((ct, boundary)) => Multipart {
66                error: None,
67                safety: Safety::new(),
68                inner: Some(Rc::new(RefCell::new(InnerMultipart {
69                    boundary,
70                    content_type: ct,
71                    payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))),
72                    state: InnerState::FirstBoundary,
73                    item: InnerMultipartItem::None,
74                }))),
75            },
76            Err(err) => Multipart { error: Some(err), safety: Safety::new(), inner: None },
77        }
78    }
79
80    /// Extract boundary info from headers.
81    pub(crate) fn boundary(headers: &HeaderMap) -> Result<(Mime, String), MultipartError> {
82        if let Some(content_type) = headers.get(&header::CONTENT_TYPE) {
83            if let Ok(content_type) = content_type.to_str() {
84                if let Ok(ct) = content_type.parse::<Mime>() {
85                    if ct.type_() == mime::MULTIPART {
86                        if let Some(boundary) = ct.get_param(mime::BOUNDARY) {
87                            Ok((ct.clone(), boundary.as_str().to_owned()))
88                        } else {
89                            Err(MultipartError::Boundary)
90                        }
91                    } else {
92                        Err(MultipartError::IncompatibleContentType)
93                    }
94                } else {
95                    Err(MultipartError::ParseContentType)
96                }
97            } else {
98                Err(MultipartError::ParseContentType)
99            }
100        } else {
101            Err(MultipartError::NoContentType)
102        }
103    }
104
105    /// Return requests parsed Content-Type or raise the stored error.
106    pub(crate) fn content_type(&mut self) -> Result<Mime, MultipartError> {
107        if let Some(err) = self.error.take() {
108            Err(err)
109        } else {
110            Ok(self.inner.as_ref().unwrap().borrow().content_type.clone())
111        }
112    }
113}
114
115impl Stream for Multipart {
116    type Item = Result<Field, MultipartError>;
117
118    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
119        if let Some(err) = self.error.take() {
120            Poll::Ready(Some(Err(err)))
121        } else if self.safety.current() {
122            let this = self.get_mut();
123            let mut inner = this.inner.as_mut().unwrap().borrow_mut();
124            if let Some(mut payload) = inner.payload.get_mut(&this.safety) {
125                payload.poll_stream(cx)?;
126            }
127            inner.poll(&this.safety, cx)
128        } else if !self.safety.is_clean() {
129            Poll::Ready(Some(Err(MultipartError::NotConsumed)))
130        } else {
131            Poll::Pending
132        }
133    }
134}
135
136impl InnerMultipart {
137    fn read_headers(payload: &mut PayloadBuffer) -> Result<Option<HeaderMap>, MultipartError> {
138        match payload.read_until(b"\r\n\r\n")? {
139            None => {
140                if payload.eof {
141                    Err(MultipartError::Incomplete)
142                } else {
143                    Ok(None)
144                }
145            }
146            Some(bytes) => {
147                let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
148                match httparse::parse_headers(&bytes, &mut hdrs) {
149                    Ok(httparse::Status::Complete((_, hdrs))) => {
150                        // convert headers
151                        let mut headers = HeaderMap::with_capacity(hdrs.len());
152                        for h in hdrs {
153                            if let Ok(name) = HeaderName::try_from(h.name) {
154                                if let Ok(value) = HeaderValue::try_from(h.value) {
155                                    headers.append(name, value);
156                                } else {
157                                    return Err(DecodeError::Header.into());
158                                }
159                            } else {
160                                return Err(DecodeError::Header.into());
161                            }
162                        }
163                        Ok(Some(headers))
164                    }
165                    Ok(httparse::Status::Partial) => Err(DecodeError::Header.into()),
166                    Err(err) => Err(DecodeError::from(err).into()),
167                }
168            }
169        }
170    }
171
172    fn read_boundary(
173        payload: &mut PayloadBuffer,
174        boundary: &str,
175    ) -> Result<Option<bool>, MultipartError> {
176        // TODO: need to read epilogue
177        match payload.readline_or_eof()? {
178            None => {
179                if payload.eof {
180                    Ok(Some(true))
181                } else {
182                    Ok(None)
183                }
184            }
185            Some(chunk) => {
186                if chunk.len() < boundary.len() + 4
187                    || &chunk[..2] != b"--"
188                    || &chunk[2..boundary.len() + 2] != boundary.as_bytes()
189                {
190                    Err(MultipartError::Boundary)
191                } else if &chunk[boundary.len() + 2..] == b"\r\n" {
192                    Ok(Some(false))
193                } else if &chunk[boundary.len() + 2..boundary.len() + 4] == b"--"
194                    && (chunk.len() == boundary.len() + 4
195                        || &chunk[boundary.len() + 4..] == b"\r\n")
196                {
197                    Ok(Some(true))
198                } else {
199                    Err(MultipartError::Boundary)
200                }
201            }
202        }
203    }
204
205    fn skip_until_boundary(
206        payload: &mut PayloadBuffer,
207        boundary: &str,
208    ) -> Result<Option<bool>, MultipartError> {
209        let mut eof = false;
210        loop {
211            match payload.readline()? {
212                Some(chunk) => {
213                    if chunk.is_empty() {
214                        return Err(MultipartError::Boundary);
215                    }
216                    if chunk.len() < boundary.len() {
217                        continue;
218                    }
219                    if &chunk[..2] == b"--" && &chunk[2..chunk.len() - 2] == boundary.as_bytes()
220                    {
221                        break;
222                    } else {
223                        if chunk.len() < boundary.len() + 2 {
224                            continue;
225                        }
226                        let b: &[u8] = boundary.as_ref();
227                        if &chunk[..boundary.len()] == b
228                            && &chunk[boundary.len()..boundary.len() + 2] == b"--"
229                        {
230                            eof = true;
231                            break;
232                        }
233                    }
234                }
235                None => {
236                    return if payload.eof {
237                        Err(MultipartError::Incomplete)
238                    } else {
239                        Ok(None)
240                    };
241                }
242            }
243        }
244        Ok(Some(eof))
245    }
246
    /// Drives the parser until the next field is available.
    ///
    /// Steps, in order:
    /// 1. Finish the field that is still in flight (its remaining data is
    ///    read and discarded).
    /// 2. Consume the boundary line for the current state.
    /// 3. Read the headers of the next field.
    /// 4. Derive content disposition, form field name and content type from
    ///    those headers and build the `Field`.
    ///
    /// `safety` is the guard of the owning `Multipart`; `cx` is only used to
    /// create the guard clone stored in the returned `Field`.
    ///
    /// Returns `Ready(None)` at the end of the multipart body, `Pending`
    /// when more payload data is needed or the payload cannot be borrowed,
    /// and `Ready(Some(Err(_)))` for malformed input.
    fn poll(
        &mut self,
        safety: &Safety,
        cx: &mut Context,
    ) -> Poll<Option<Result<Field, MultipartError>>> {
        if self.state == InnerState::Eof {
            Poll::Ready(None)
        } else {
            // release field
            loop {
                // The field that is in flight has to be consumed completely
                // before switching to the next one.
                // NOTE(review): nothing in this loop changes when
                // `safety.current()` is false; the only caller visible here
                // (`Multipart::poll_next`) checks it before calling.
                if safety.current() {
                    let stop = match self.item {
                        InnerMultipartItem::Field(ref mut field) => {
                            match field.borrow_mut().poll(safety) {
                                Poll::Pending => return Poll::Pending,
                                // Leftover chunk of the previous field: drop it
                                // and keep draining.
                                Poll::Ready(Some(Ok(_))) => continue,
                                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                                Poll::Ready(None) => true,
                            }
                        }
                        InnerMultipartItem::None => false,
                    };
                    if stop {
                        self.item = InnerMultipartItem::None;
                    }
                    if let InnerMultipartItem::None = self.item {
                        break;
                    }
                }
            }

            let headers = if let Some(mut payload) = self.payload.get_mut(safety) {
                match self.state {
                    // read until first boundary
                    InnerState::FirstBoundary => {
                        match InnerMultipart::skip_until_boundary(&mut payload, &self.boundary)?
                        {
                            Some(eof) => {
                                if eof {
                                    self.state = InnerState::Eof;
                                    return Poll::Ready(None);
                                } else {
                                    self.state = InnerState::Headers;
                                }
                            }
                            None => return Poll::Pending,
                        }
                    }
                    // read boundary
                    InnerState::Boundary => {
                        match InnerMultipart::read_boundary(&mut payload, &self.boundary)? {
                            None => return Poll::Pending,
                            Some(eof) => {
                                if eof {
                                    self.state = InnerState::Eof;
                                    return Poll::Ready(None);
                                } else {
                                    self.state = InnerState::Headers;
                                }
                            }
                        }
                    }
                    _ => (),
                }

                // read field headers for next field
                if self.state == InnerState::Headers {
                    if let Some(headers) = InnerMultipart::read_headers(&mut payload)? {
                        self.state = InnerState::Boundary;
                        headers
                    } else {
                        return Poll::Pending;
                    }
                } else {
                    // `Eof` returned early above; the other two states either
                    // returned or moved on to `Headers`.
                    unreachable!()
                }
            } else {
                log::debug!("NotReady: field is in flight");
                return Poll::Pending;
            };

            // Kept only when the header is present, parses, and is `form-data`.
            let field_content_disposition = if let Some(hv) =
                headers.get(&header::CONTENT_DISPOSITION)
                && let Ok(cd) = ContentDisposition::parse_header(
                    &ntex_files::header::Raw::from(hv.as_bytes()),
                )
                && cd.disposition == DispositionType::FormData
            {
                Some(cd)
            } else {
                None
            };

            // For `multipart/form-data` requests every field must carry a
            // `form-data` content disposition with a name; other multipart
            // subtypes have no such requirement.
            let form_field_name = if self.content_type.subtype() == mime::FORM_DATA {
                let Some(cd) = &field_content_disposition else {
                    return Poll::Ready(Some(Err(MultipartError::ContentDispositionMissing)));
                };

                let Some(field_name) = cd.get_name() else {
                    return Poll::Ready(Some(Err(
                        MultipartError::ContentDispositionNameMissing,
                    )));
                };

                Some(field_name.to_owned())
            } else {
                None
            };

            // A missing or unparsable field `Content-Type` is not an error.
            let field_content_type: Option<Mime> = if let Some(content_type) =
                headers.get(&header::CONTENT_TYPE)
                && let Ok(content_type) = content_type.to_str()
                && let Ok(ct) = content_type.parse::<Mime>()
            {
                Some(ct)
            } else {
                None
            };

            // The state was already set to `Boundary` when the headers were
            // read above; this assignment repeats it.
            self.state = InnerState::Boundary;

            // nested multipart stream is not supported
            if let Some(mime) = &field_content_type
                && mime.type_() == mime::MULTIPART
            {
                return Poll::Ready(Some(Err(MultipartError::Nested)));
            }

            // The parser keeps one handle to the field so it can drain it
            // later; the other handle goes into the returned `Field`.
            let field = Rc::new(RefCell::new(InnerField::new(
                self.payload.clone(),
                self.boundary.clone(),
                &headers,
            )?));
            self.item = InnerMultipartItem::Field(Rc::clone(&field));

            Poll::Ready(Some(Ok(Field::new(
                safety.clone(cx),
                headers,
                field_content_type,
                field_content_disposition,
                form_field_name,
                field,
            ))))
        }
    }
394}
395
396impl Drop for InnerMultipart {
397    fn drop(&mut self) {
398        // InnerMultipartItem::Field has to be dropped first because of Safety.
399        self.item = InnerMultipartItem::None;
400    }
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406    use crate::Field;
407    use futures::{StreamExt as _, stream};
408    use futures_test::stream::StreamTestExt as _;
409    use ntex::util::BytesMut;
410    use ntex::{channel::mpsc, util::Bytes};
411
412    #[ntex::test]
413    async fn test_boundary() {
414        let headers = HeaderMap::new();
415        match Multipart::boundary(&headers) {
416            Err(MultipartError::NoContentType) => (),
417            _ => unreachable!("should not happen"),
418        }
419
420        let mut headers = HeaderMap::new();
421        headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("test"));
422
423        match Multipart::boundary(&headers) {
424            Err(MultipartError::ParseContentType) => (),
425            _ => unreachable!("should not happen"),
426        }
427
428        let mut headers = HeaderMap::new();
429        headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("multipart/mixed"));
430        match Multipart::boundary(&headers) {
431            Err(MultipartError::Boundary) => (),
432            _ => unreachable!("should not happen"),
433        }
434
435        let mut headers = HeaderMap::new();
436        headers.insert(
437            header::CONTENT_TYPE,
438            HeaderValue::from_static(
439                "multipart/mixed; boundary=\"5c02368e880e436dab70ed54e1c58209\"",
440            ),
441        );
442
443        assert_eq!(
444            Multipart::boundary(&headers).unwrap().1,
445            "5c02368e880e436dab70ed54e1c58209"
446        );
447    }
448
449    fn create_stream() -> (
450        mpsc::Sender<Result<Bytes, PayloadError>>,
451        impl Stream<Item = Result<Bytes, PayloadError>>,
452    ) {
453        let (tx, rx) = mpsc::channel();
454
455        (tx, rx.map(|res| res.map_err(|_| panic!())))
456    }
457
458    fn create_simple_request_with_header() -> (Bytes, HeaderMap) {
459        let bytes = Bytes::from(
460            "testasdadsad\r\n\
461             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
462             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
463             Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
464             test\r\n\
465             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
466             Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
467             data\r\n\
468             --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
469        );
470        let mut headers = HeaderMap::new();
471        headers.insert(
472            header::CONTENT_TYPE,
473            HeaderValue::from_static(
474                "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
475            ),
476        );
477        (bytes, headers)
478    }
479
480    #[ntex::test]
481    async fn test_multipart_no_end_crlf() {
482        let (sender, payload) = create_stream();
483        let (mut bytes, headers) = create_simple_request_with_header();
484        let bytes_stripped = bytes.split_to(bytes.len()); // strip crlf
485
486        sender.send(Ok(bytes_stripped)).unwrap();
487        drop(sender); // eof
488
489        let mut multipart = Multipart::new(&headers, payload);
490
491        match multipart.next().await.unwrap() {
492            Ok(_) => (),
493            _ => unreachable!(),
494        }
495
496        match multipart.next().await.unwrap() {
497            Ok(_) => (),
498            _ => unreachable!(),
499        }
500
501        match multipart.next().await {
502            None => (),
503            _ => unreachable!(),
504        }
505    }
506
507    #[ntex::test]
508    async fn test_multipart() {
509        let (sender, payload) = create_stream();
510        let (bytes, headers) = create_simple_request_with_header();
511
512        sender.send(Ok(bytes)).unwrap();
513
514        let mut multipart = Multipart::new(&headers, payload);
515        match multipart.next().await {
516            Some(Ok(mut field)) => {
517                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
518                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
519
520                match field.next().await.unwrap() {
521                    Ok(chunk) => assert_eq!(chunk, "test"),
522                    _ => unreachable!(),
523                }
524                match field.next().await {
525                    None => (),
526                    _ => unreachable!(),
527                }
528            }
529            _ => unreachable!(),
530        }
531
532        match multipart.next().await.unwrap() {
533            Ok(mut field) => {
534                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
535                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
536
537                match field.next().await {
538                    Some(Ok(chunk)) => assert_eq!(chunk, "data"),
539                    _ => unreachable!(),
540                }
541                match field.next().await {
542                    None => (),
543                    _ => unreachable!(),
544                }
545            }
546            _ => unreachable!(),
547        }
548
549        match multipart.next().await {
550            None => (),
551            _ => unreachable!(),
552        }
553    }
554
555    // Loops, collecting all bytes until end-of-field
556    async fn get_whole_field(field: &mut Field) -> BytesMut {
557        let mut b = BytesMut::new();
558        loop {
559            match field.next().await {
560                Some(Ok(chunk)) => b.extend_from_slice(&chunk),
561                None => return b,
562                _ => unreachable!(),
563            }
564        }
565    }
566
567    #[ntex::test]
568    async fn test_stream() {
569        let (bytes, headers) = create_simple_request_with_header();
570        let payload = stream::iter(bytes)
571            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
572            .interleave_pending();
573
574        let mut multipart = Multipart::new(&headers, payload);
575        match multipart.next().await.unwrap() {
576            Ok(mut field) => {
577                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
578                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
579
580                assert_eq!(get_whole_field(&mut field).await, "test");
581            }
582            _ => unreachable!(),
583        }
584
585        match multipart.next().await {
586            Some(Ok(mut field)) => {
587                assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
588                assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);
589
590                assert_eq!(get_whole_field(&mut field).await, "data");
591            }
592            _ => unreachable!(),
593        }
594
595        match multipart.next().await {
596            None => (),
597            _ => unreachable!(),
598        }
599    }
600}