1use crate::Field;
4use crate::error::MultipartError;
5use crate::field::InnerField;
6use crate::payload::{PayloadBuffer, PayloadRef};
7use crate::safety::Safety;
8use futures::stream::Stream;
9use mime::Mime;
10use ntex::http::error::{DecodeError, PayloadError};
11use ntex::http::header::{self, HeaderMap, HeaderName, HeaderValue};
12use ntex::util::Bytes;
13use ntex_files::header::DispositionType;
14use ntex_files::header::{ContentDisposition, Header};
15use std::cell::RefCell;
16use std::task::{Context, Poll};
17use std::{convert::TryFrom, pin::Pin, rc::Rc};
18
/// Size of the scratch header array handed to `httparse` when the header
/// section of a single part is parsed; a part with more headers than this
/// cannot be parsed into that array.
const MAX_HEADERS: usize = 32;
20
21pub struct Multipart {
28 safety: Safety,
29 error: Option<MultipartError>,
30 inner: Option<Rc<RefCell<InnerMultipart>>>,
31}
32
33enum InnerMultipartItem {
34 None,
35 Field(Rc<RefCell<InnerField>>),
36}
37
/// Position of the parser inside the multipart body.
#[derive(PartialEq, Debug)]
enum InnerState {
    /// The body has ended; the stream yields nothing further.
    Eof,
    /// Initial state: lines are skipped until the first boundary line.
    FirstBoundary,
    /// A boundary line is expected next.
    Boundary,
    /// The header section of a part is expected next.
    Headers,
}
49
50struct InnerMultipart {
51 payload: PayloadRef,
52 content_type: Mime,
53 boundary: String,
54 state: InnerState,
55 item: InnerMultipartItem,
56}
57
58impl Multipart {
59 pub fn new<S>(headers: &HeaderMap, stream: S) -> Multipart
61 where
62 S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
63 {
64 match Self::boundary(headers) {
65 Ok((ct, boundary)) => Multipart {
66 error: None,
67 safety: Safety::new(),
68 inner: Some(Rc::new(RefCell::new(InnerMultipart {
69 boundary,
70 content_type: ct,
71 payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))),
72 state: InnerState::FirstBoundary,
73 item: InnerMultipartItem::None,
74 }))),
75 },
76 Err(err) => Multipart { error: Some(err), safety: Safety::new(), inner: None },
77 }
78 }
79
80 pub(crate) fn boundary(headers: &HeaderMap) -> Result<(Mime, String), MultipartError> {
82 if let Some(content_type) = headers.get(&header::CONTENT_TYPE) {
83 if let Ok(content_type) = content_type.to_str() {
84 if let Ok(ct) = content_type.parse::<Mime>() {
85 if ct.type_() == mime::MULTIPART {
86 if let Some(boundary) = ct.get_param(mime::BOUNDARY) {
87 Ok((ct.clone(), boundary.as_str().to_owned()))
88 } else {
89 Err(MultipartError::Boundary)
90 }
91 } else {
92 Err(MultipartError::IncompatibleContentType)
93 }
94 } else {
95 Err(MultipartError::ParseContentType)
96 }
97 } else {
98 Err(MultipartError::ParseContentType)
99 }
100 } else {
101 Err(MultipartError::NoContentType)
102 }
103 }
104
105 pub(crate) fn content_type(&mut self) -> Result<Mime, MultipartError> {
107 if let Some(err) = self.error.take() {
108 Err(err)
109 } else {
110 Ok(self.inner.as_ref().unwrap().borrow().content_type.clone())
111 }
112 }
113}
114
115impl Stream for Multipart {
116 type Item = Result<Field, MultipartError>;
117
118 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
119 if let Some(err) = self.error.take() {
120 Poll::Ready(Some(Err(err)))
121 } else if self.safety.current() {
122 let this = self.get_mut();
123 let mut inner = this.inner.as_mut().unwrap().borrow_mut();
124 if let Some(mut payload) = inner.payload.get_mut(&this.safety) {
125 payload.poll_stream(cx)?;
126 }
127 inner.poll(&this.safety, cx)
128 } else if !self.safety.is_clean() {
129 Poll::Ready(Some(Err(MultipartError::NotConsumed)))
130 } else {
131 Poll::Pending
132 }
133 }
134}
135
136impl InnerMultipart {
137 fn read_headers(payload: &mut PayloadBuffer) -> Result<Option<HeaderMap>, MultipartError> {
138 match payload.read_until(b"\r\n\r\n")? {
139 None => {
140 if payload.eof {
141 Err(MultipartError::Incomplete)
142 } else {
143 Ok(None)
144 }
145 }
146 Some(bytes) => {
147 let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
148 match httparse::parse_headers(&bytes, &mut hdrs) {
149 Ok(httparse::Status::Complete((_, hdrs))) => {
150 let mut headers = HeaderMap::with_capacity(hdrs.len());
152 for h in hdrs {
153 if let Ok(name) = HeaderName::try_from(h.name) {
154 if let Ok(value) = HeaderValue::try_from(h.value) {
155 headers.append(name, value);
156 } else {
157 return Err(DecodeError::Header.into());
158 }
159 } else {
160 return Err(DecodeError::Header.into());
161 }
162 }
163 Ok(Some(headers))
164 }
165 Ok(httparse::Status::Partial) => Err(DecodeError::Header.into()),
166 Err(err) => Err(DecodeError::from(err).into()),
167 }
168 }
169 }
170 }
171
172 fn read_boundary(
173 payload: &mut PayloadBuffer,
174 boundary: &str,
175 ) -> Result<Option<bool>, MultipartError> {
176 match payload.readline_or_eof()? {
178 None => {
179 if payload.eof {
180 Ok(Some(true))
181 } else {
182 Ok(None)
183 }
184 }
185 Some(chunk) => {
186 if chunk.len() < boundary.len() + 4
187 || &chunk[..2] != b"--"
188 || &chunk[2..boundary.len() + 2] != boundary.as_bytes()
189 {
190 Err(MultipartError::Boundary)
191 } else if &chunk[boundary.len() + 2..] == b"\r\n" {
192 Ok(Some(false))
193 } else if &chunk[boundary.len() + 2..boundary.len() + 4] == b"--"
194 && (chunk.len() == boundary.len() + 4
195 || &chunk[boundary.len() + 4..] == b"\r\n")
196 {
197 Ok(Some(true))
198 } else {
199 Err(MultipartError::Boundary)
200 }
201 }
202 }
203 }
204
205 fn skip_until_boundary(
206 payload: &mut PayloadBuffer,
207 boundary: &str,
208 ) -> Result<Option<bool>, MultipartError> {
209 let mut eof = false;
210 loop {
211 match payload.readline()? {
212 Some(chunk) => {
213 if chunk.is_empty() {
214 return Err(MultipartError::Boundary);
215 }
216 if chunk.len() < boundary.len() {
217 continue;
218 }
219 if &chunk[..2] == b"--" && &chunk[2..chunk.len() - 2] == boundary.as_bytes()
220 {
221 break;
222 } else {
223 if chunk.len() < boundary.len() + 2 {
224 continue;
225 }
226 let b: &[u8] = boundary.as_ref();
227 if &chunk[..boundary.len()] == b
228 && &chunk[boundary.len()..boundary.len() + 2] == b"--"
229 {
230 eof = true;
231 break;
232 }
233 }
234 }
235 None => {
236 return if payload.eof {
237 Err(MultipartError::Incomplete)
238 } else {
239 Ok(None)
240 };
241 }
242 }
243 }
244 Ok(Some(eof))
245 }
246
247 fn poll(
248 &mut self,
249 safety: &Safety,
250 cx: &mut Context,
251 ) -> Poll<Option<Result<Field, MultipartError>>> {
252 if self.state == InnerState::Eof {
253 Poll::Ready(None)
254 } else {
255 loop {
257 if safety.current() {
260 let stop = match self.item {
261 InnerMultipartItem::Field(ref mut field) => {
262 match field.borrow_mut().poll(safety) {
263 Poll::Pending => return Poll::Pending,
264 Poll::Ready(Some(Ok(_))) => continue,
265 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
266 Poll::Ready(None) => true,
267 }
268 }
269 InnerMultipartItem::None => false,
270 };
271 if stop {
272 self.item = InnerMultipartItem::None;
273 }
274 if let InnerMultipartItem::None = self.item {
275 break;
276 }
277 }
278 }
279
280 let headers = if let Some(mut payload) = self.payload.get_mut(safety) {
281 match self.state {
282 InnerState::FirstBoundary => {
284 match InnerMultipart::skip_until_boundary(&mut payload, &self.boundary)?
285 {
286 Some(eof) => {
287 if eof {
288 self.state = InnerState::Eof;
289 return Poll::Ready(None);
290 } else {
291 self.state = InnerState::Headers;
292 }
293 }
294 None => return Poll::Pending,
295 }
296 }
297 InnerState::Boundary => {
299 match InnerMultipart::read_boundary(&mut payload, &self.boundary)? {
300 None => return Poll::Pending,
301 Some(eof) => {
302 if eof {
303 self.state = InnerState::Eof;
304 return Poll::Ready(None);
305 } else {
306 self.state = InnerState::Headers;
307 }
308 }
309 }
310 }
311 _ => (),
312 }
313
314 if self.state == InnerState::Headers {
316 if let Some(headers) = InnerMultipart::read_headers(&mut payload)? {
317 self.state = InnerState::Boundary;
318 headers
319 } else {
320 return Poll::Pending;
321 }
322 } else {
323 unreachable!()
324 }
325 } else {
326 log::debug!("NotReady: field is in flight");
327 return Poll::Pending;
328 };
329
330 let field_content_disposition = if let Some(hv) =
331 headers.get(&header::CONTENT_DISPOSITION)
332 && let Ok(cd) = ContentDisposition::parse_header(
333 &ntex_files::header::Raw::from(hv.as_bytes()),
334 )
335 && cd.disposition == DispositionType::FormData
336 {
337 Some(cd)
338 } else {
339 None
340 };
341
342 let form_field_name = if self.content_type.subtype() == mime::FORM_DATA {
343 let Some(cd) = &field_content_disposition else {
344 return Poll::Ready(Some(Err(MultipartError::ContentDispositionMissing)));
345 };
346
347 let Some(field_name) = cd.get_name() else {
348 return Poll::Ready(Some(Err(
349 MultipartError::ContentDispositionNameMissing,
350 )));
351 };
352
353 Some(field_name.to_owned())
354 } else {
355 None
356 };
357
358 let field_content_type: Option<Mime> = if let Some(content_type) =
359 headers.get(&header::CONTENT_TYPE)
360 && let Ok(content_type) = content_type.to_str()
361 && let Ok(ct) = content_type.parse::<Mime>()
362 {
363 Some(ct)
364 } else {
365 None
366 };
367
368 self.state = InnerState::Boundary;
369
370 if let Some(mime) = &field_content_type
372 && mime.type_() == mime::MULTIPART
373 {
374 return Poll::Ready(Some(Err(MultipartError::Nested)));
375 }
376
377 let field = Rc::new(RefCell::new(InnerField::new(
378 self.payload.clone(),
379 self.boundary.clone(),
380 &headers,
381 )?));
382 self.item = InnerMultipartItem::Field(Rc::clone(&field));
383
384 Poll::Ready(Some(Ok(Field::new(
385 safety.clone(cx),
386 headers,
387 field_content_type,
388 field_content_disposition,
389 form_field_name,
390 field,
391 ))))
392 }
393 }
394}
395
396impl Drop for InnerMultipart {
397 fn drop(&mut self) {
398 self.item = InnerMultipartItem::None;
400 }
401}
402
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Field;
    use futures::{StreamExt as _, stream};
    use futures_test::stream::StreamTestExt as _;
    use ntex::util::BytesMut;
    use ntex::{channel::mpsc, util::Bytes};

    /// Builds a header map that contains only the given `Content-Type` value.
    fn content_type_headers(value: &'static str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(header::CONTENT_TYPE, HeaderValue::from_static(value));
        headers
    }

    /// `Multipart::boundary` reports each kind of unusable header and extracts
    /// a quoted boundary parameter.
    #[ntex::test]
    async fn test_boundary() {
        let missing = HeaderMap::new();
        if !matches!(Multipart::boundary(&missing), Err(MultipartError::NoContentType)) {
            unreachable!("should not happen")
        }

        let unparseable = content_type_headers("test");
        if !matches!(Multipart::boundary(&unparseable), Err(MultipartError::ParseContentType)) {
            unreachable!("should not happen")
        }

        let no_boundary = content_type_headers("multipart/mixed");
        if !matches!(Multipart::boundary(&no_boundary), Err(MultipartError::Boundary)) {
            unreachable!("should not happen")
        }

        let complete = content_type_headers(
            "multipart/mixed; boundary=\"5c02368e880e436dab70ed54e1c58209\"",
        );
        assert_eq!(
            Multipart::boundary(&complete).unwrap().1,
            "5c02368e880e436dab70ed54e1c58209"
        );
    }

    /// Returns a channel sender together with the receiving end adapted to
    /// the payload stream type expected by `Multipart::new`.
    fn create_stream() -> (
        mpsc::Sender<Result<Bytes, PayloadError>>,
        impl Stream<Item = Result<Bytes, PayloadError>>,
    ) {
        let (tx, rx) = mpsc::channel();

        (tx, rx.map(|res| res.map_err(|_| panic!())))
    }

    /// Request body with a preamble line and two `text/plain` parts ("test"
    /// and "data"), plus the matching request headers.
    fn create_simple_request_with_header() -> (Bytes, HeaderMap) {
        let bytes = Bytes::from(
            "testasdadsad\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
             Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
             test\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\
             data\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
        );
        let headers = content_type_headers(
            "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
        );
        (bytes, headers)
    }

    /// Both parts are yielded and the stream then ends, with the sender
    /// dropped so that the payload reaches EOF.
    #[ntex::test]
    async fn test_multipart_no_end_crlf() {
        let (sender, payload) = create_stream();
        let (mut bytes, headers) = create_simple_request_with_header();
        // NOTE(review): `split_to(bytes.len())` takes the whole buffer, so no
        // trailing CRLF is actually removed here - confirm the intent.
        let bytes_stripped = bytes.split_to(bytes.len());
        sender.send(Ok(bytes_stripped)).unwrap();
        // Dropping the sender ends the payload stream.
        drop(sender);

        let mut multipart = Multipart::new(&headers, payload);

        // Two parts; each yielded field is dropped right away, unread.
        for _ in 0..2 {
            if multipart.next().await.unwrap().is_err() {
                unreachable!()
            }
        }

        if multipart.next().await.is_some() {
            unreachable!()
        }
    }

    /// Each part is read chunk by chunk and yields exactly its content.
    #[ntex::test]
    async fn test_multipart() {
        let (sender, payload) = create_stream();
        let (bytes, headers) = create_simple_request_with_header();

        sender.send(Ok(bytes)).unwrap();

        let mut multipart = Multipart::new(&headers, payload);

        // Each field lives in its own block so that it is dropped before the
        // multipart stream is polled again.
        {
            let Some(Ok(mut field)) = multipart.next().await else {
                unreachable!()
            };
            assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
            assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);

            let Ok(chunk) = field.next().await.unwrap() else {
                unreachable!()
            };
            assert_eq!(chunk, "test");
            if field.next().await.is_some() {
                unreachable!()
            }
        }

        {
            let Ok(mut field) = multipart.next().await.unwrap() else {
                unreachable!()
            };
            assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
            assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);

            let Some(Ok(chunk)) = field.next().await else {
                unreachable!()
            };
            assert_eq!(chunk, "data");
            if field.next().await.is_some() {
                unreachable!()
            }
        }

        if multipart.next().await.is_some() {
            unreachable!()
        }
    }

    /// Collects every chunk of `field` into a single buffer.
    async fn get_whole_field(field: &mut Field) -> BytesMut {
        let mut collected = BytesMut::new();
        while let Some(item) = field.next().await {
            match item {
                Ok(chunk) => collected.extend_from_slice(&chunk),
                Err(_) => unreachable!(),
            }
        }
        collected
    }

    /// Same content as `test_multipart`, but delivered one byte at a time
    /// with a pending poll between the bytes.
    #[ntex::test]
    async fn test_stream() {
        let (bytes, headers) = create_simple_request_with_header();
        let payload = stream::iter(bytes)
            .map(|byte| Ok(Bytes::copy_from_slice(&[byte])))
            .interleave_pending();

        let mut multipart = Multipart::new(&headers, payload);

        {
            let Ok(mut field) = multipart.next().await.unwrap() else {
                unreachable!()
            };
            assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
            assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);

            assert_eq!(get_whole_field(&mut field).await, "test");
        }

        {
            let Some(Ok(mut field)) = multipart.next().await else {
                unreachable!()
            };
            assert_eq!(field.content_type().unwrap().type_(), mime::TEXT);
            assert_eq!(field.content_type().unwrap().subtype(), mime::PLAIN);

            assert_eq!(get_whole_field(&mut field).await, "data");
        }

        if multipart.next().await.is_some() {
            unreachable!()
        }
    }
}