Skip to main content

actix_multipart/
field.rs

1use std::{
2    cell::RefCell,
3    cmp, fmt,
4    future::poll_fn,
5    mem,
6    pin::Pin,
7    rc::Rc,
8    task::{ready, Context, Poll},
9};
10
11use actix_web::{
12    error::PayloadError,
13    http::header::{self, ContentDisposition, HeaderMap},
14    web::{Bytes, BytesMut},
15};
16use derive_more::{Display, Error};
17use futures_core::Stream;
18use mime::Mime;
19
20use crate::{
21    error::Error,
22    payload::{PayloadBuffer, PayloadRef},
23    safety::Safety,
24};
25
/// Error type returned from [`Field::bytes()`] when field data is larger than limit.
///
/// This is the *outer* error of the nested result returned by `bytes()`; errors produced by the
/// data stream itself are reported through the inner `Result` instead.
#[derive(Debug, Display, Error)]
#[display("size limit exceeded while collecting field data")]
#[non_exhaustive]
pub struct LimitExceeded;
31
/// A single field in a multipart stream.
///
/// Holds the field's parsed headers and yields the field body as a `Stream` of byte chunks (see
/// the `Stream` implementation in this file).
pub struct Field {
    /// Field's Content-Type.
    content_type: Option<Mime>,

    /// Field's Content-Disposition.
    content_disposition: Option<ContentDisposition>,

    /// Form field name.
    ///
    /// A non-optional storage for form field names to avoid unwraps in `form` module. Will be an
    /// empty string in non-form contexts.
    ///
    // INVARIANT: always non-empty when request content-type is multipart/form-data.
    pub(crate) form_field_name: String,

    /// Field's header map.
    headers: HeaderMap,

    /// Token handed to the payload handle on every poll; it decides whether this field may access
    /// the shared payload buffer. When access is denied and the token is not "clean", polling
    /// yields `Error::NotConsumed`.
    // NOTE(review): the exact rules live in `crate::safety` — confirm there before relying on them.
    safety: Safety,

    /// Body-reading state (payload handle, boundary, end-of-field flag, declared length).
    inner: Rc<RefCell<InnerField>>,
}
54
55impl Field {
56    pub(crate) fn new(
57        content_type: Option<Mime>,
58        content_disposition: Option<ContentDisposition>,
59        form_field_name: Option<String>,
60        headers: HeaderMap,
61        safety: Safety,
62        inner: Rc<RefCell<InnerField>>,
63    ) -> Self {
64        Field {
65            content_type,
66            content_disposition,
67            form_field_name: form_field_name.unwrap_or_default(),
68            headers,
69            inner,
70            safety,
71        }
72    }
73
74    /// Returns a reference to the field's header map.
75    pub fn headers(&self) -> &HeaderMap {
76        &self.headers
77    }
78
79    /// Returns a reference to the field's content (mime) type, if it is supplied by the client.
80    ///
81    /// According to [RFC 7578](https://www.rfc-editor.org/rfc/rfc7578#section-4.4), if it is not
82    /// present, it should default to "text/plain". Note it is the responsibility of the client to
83    /// provide the appropriate content type, there is no attempt to validate this by the server.
84    pub fn content_type(&self) -> Option<&Mime> {
85        self.content_type.as_ref()
86    }
87
88    /// Returns this field's parsed Content-Disposition header, if set.
89    ///
90    /// # Validation
91    ///
92    /// Per [RFC 7578 §4.2], the parts of a multipart/form-data payload MUST contain a
93    /// Content-Disposition header field where the disposition type is `form-data` and MUST also
94    /// contain an additional parameter of `name` with its value being the original field name from
95    /// the form. This requirement is enforced during extraction for multipart/form-data requests,
96    /// but not other kinds of multipart requests (such as multipart/related).
97    ///
98    /// As such, it is safe to `.unwrap()` calls `.content_disposition()` if you've verified.
99    ///
100    /// The [`name()`](Self::name) method is also provided as a convenience for obtaining the
101    /// aforementioned name parameter.
102    ///
103    /// [RFC 7578 §4.2]: https://datatracker.ietf.org/doc/html/rfc7578#section-4.2
104    pub fn content_disposition(&self) -> Option<&ContentDisposition> {
105        self.content_disposition.as_ref()
106    }
107
108    /// Returns the field's name, if set.
109    ///
110    /// See [`content_disposition()`](Self::content_disposition) regarding guarantees on presence of
111    /// the "name" field.
112    pub fn name(&self) -> Option<&str> {
113        self.content_disposition()?.get_name()
114    }
115
116    /// Collects the raw field data, up to `limit` bytes.
117    ///
118    /// # Errors
119    ///
120    /// Any errors produced by the data stream are returned as `Ok(Err(Error))` immediately.
121    ///
122    /// If the buffered data size would exceed `limit`, an `Err(LimitExceeded)` is returned. Note
123    /// that, in this case, the full data stream is exhausted before returning the error so that
124    /// subsequent fields can still be read. To better defend against malicious/infinite requests,
125    /// it is advisable to also put a timeout on this call.
126    pub async fn bytes(&mut self, limit: usize) -> Result<Result<Bytes, Error>, LimitExceeded> {
127        /// Sensible default (2kB) for initial, bounded allocation when collecting body bytes.
128        const INITIAL_ALLOC_BYTES: usize = 2 * 1024;
129
130        let mut exceeded_limit = false;
131        let mut buf = BytesMut::with_capacity(INITIAL_ALLOC_BYTES);
132
133        let mut field = Pin::new(self);
134
135        match poll_fn(|cx| loop {
136            match ready!(field.as_mut().poll_next(cx)) {
137                // if already over limit, discard chunk to advance multipart request
138                Some(Ok(_chunk)) if exceeded_limit => {}
139
140                // if limit is exceeded set flag to true and continue
141                Some(Ok(chunk)) if buf.len() + chunk.len() > limit => {
142                    exceeded_limit = true;
143                    // eagerly de-allocate field data buffer
144                    let _ = mem::take(&mut buf);
145                }
146
147                Some(Ok(chunk)) => buf.extend_from_slice(&chunk),
148
149                None => return Poll::Ready(Ok(())),
150                Some(Err(err)) => return Poll::Ready(Err(err)),
151            }
152        })
153        .await
154        {
155            // propagate error returned from body poll
156            Err(err) => Ok(Err(err)),
157
158            // limit was exceeded while reading body
159            Ok(()) if exceeded_limit => Err(LimitExceeded),
160
161            // otherwise return body buffer
162            Ok(()) => Ok(Ok(buf.freeze())),
163        }
164    }
165}
166
167impl Stream for Field {
168    type Item = Result<Bytes, Error>;
169
170    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
171        let this = self.get_mut();
172        let mut inner = this.inner.borrow_mut();
173
174        if let Some(mut buffer) = inner
175            .payload
176            .as_ref()
177            .expect("Field should not be polled after completion")
178            .get_mut(&this.safety)
179        {
180            // check safety and poll read payload to buffer.
181            buffer.poll_stream(cx)?;
182        } else if !this.safety.is_clean() {
183            // safety violation
184            return Poll::Ready(Some(Err(Error::NotConsumed)));
185        } else {
186            return Poll::Pending;
187        }
188
189        inner.poll(&this.safety)
190    }
191}
192
193impl fmt::Debug for Field {
194    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195        if let Some(ct) = &self.content_type {
196            writeln!(f, "\nField: {}", ct)?;
197        } else {
198            writeln!(f, "\nField:")?;
199        }
200        writeln!(f, "  boundary: {}", self.inner.borrow().boundary)?;
201        writeln!(f, "  headers:")?;
202        for (key, val) in self.headers.iter() {
203            writeln!(f, "    {:?}: {:?}", key, val)?;
204        }
205        Ok(())
206    }
207}
208
/// Body-reading state of a single field.
pub(crate) struct InnerField {
    /// Payload is initialized as Some and is `take`n when the field stream finishes.
    payload: Option<PayloadRef>,

    /// Field boundary (without "--" prefix).
    boundary: String,

    /// True once the field's body has been read to its end (declared length consumed or boundary
    /// reached); after that only the line following the body is left to consume.
    eof: bool,

    /// Field data's stated size according to its Content-Length header.
    ///
    /// Counts down as data is read; `None` when the field has no Content-Length header.
    length: Option<u64>,
}
222
223impl InnerField {
224    pub(crate) fn new_in_rc(
225        payload: PayloadRef,
226        boundary: String,
227        headers: &HeaderMap,
228    ) -> Result<Rc<RefCell<InnerField>>, PayloadError> {
229        Self::new(payload, boundary, headers).map(|this| Rc::new(RefCell::new(this)))
230    }
231
232    pub(crate) fn new(
233        payload: PayloadRef,
234        boundary: String,
235        headers: &HeaderMap,
236    ) -> Result<InnerField, PayloadError> {
237        let len = if let Some(len) = headers.get(&header::CONTENT_LENGTH) {
238            match len.to_str().ok().and_then(|len| len.parse::<u64>().ok()) {
239                Some(len) => Some(len),
240                None => return Err(PayloadError::Incomplete(None)),
241            }
242        } else {
243            None
244        };
245
246        Ok(InnerField {
247            boundary,
248            payload: Some(payload),
249            eof: false,
250            length: len,
251        })
252    }
253
254    /// Reads body part content chunk of the specified size.
255    ///
256    /// The body part must has `Content-Length` header with proper value.
257    pub(crate) fn read_len(
258        payload: &mut PayloadBuffer,
259        size: &mut u64,
260    ) -> Poll<Option<Result<Bytes, Error>>> {
261        if *size == 0 {
262            Poll::Ready(None)
263        } else {
264            match payload.read_max(*size)? {
265                Some(mut chunk) => {
266                    let len = cmp::min(chunk.len() as u64, *size);
267                    *size -= len;
268                    let ch = chunk.split_to(len as usize);
269                    if !chunk.is_empty() {
270                        payload.unprocessed(chunk);
271                    }
272                    Poll::Ready(Some(Ok(ch)))
273                }
274                None => {
275                    if payload.eof && (*size != 0) {
276                        Poll::Ready(Some(Err(Error::Incomplete)))
277                    } else {
278                        Poll::Pending
279                    }
280                }
281            }
282        }
283    }
284
285    /// Reads content chunk of body part with unknown length.
286    ///
287    /// The `Content-Length` header for body part is not necessary.
288    pub(crate) fn read_stream(
289        payload: &mut PayloadBuffer,
290        boundary: &str,
291    ) -> Poll<Option<Result<Bytes, Error>>> {
292        let mut pos = 0;
293
294        let len = payload.buf.len();
295
296        if len == 0 {
297            return if payload.eof {
298                Poll::Ready(Some(Err(Error::Incomplete)))
299            } else {
300                Poll::Pending
301            };
302        }
303
304        // check boundary
305        if len > 4 && payload.buf[0] == b'\r' {
306            let b_len = if payload.buf.starts_with(b"\r\n") && &payload.buf[2..4] == b"--" {
307                Some(4)
308            } else if &payload.buf[1..3] == b"--" {
309                Some(3)
310            } else {
311                None
312            };
313
314            if let Some(b_len) = b_len {
315                let b_size = boundary.len() + b_len;
316                if len < b_size {
317                    return Poll::Pending;
318                } else if &payload.buf[b_len..b_size] == boundary.as_bytes() {
319                    // found boundary
320                    return Poll::Ready(None);
321                }
322            }
323        }
324
325        loop {
326            return if let Some(idx) = memchr::memmem::find(&payload.buf[pos..], b"\r") {
327                let cur = pos + idx;
328
329                // check if we have enough data for boundary detection
330                if cur + 4 > len {
331                    if cur > 0 {
332                        Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze())))
333                    } else {
334                        Poll::Pending
335                    }
336                } else {
337                    // check boundary
338                    if (&payload.buf[cur..cur + 2] == b"\r\n"
339                        && &payload.buf[cur + 2..cur + 4] == b"--")
340                        || (&payload.buf[cur..=cur] == b"\r"
341                            && &payload.buf[cur + 1..cur + 3] == b"--")
342                    {
343                        if cur != 0 {
344                            // return buffer
345                            Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze())))
346                        } else {
347                            pos = cur + 1;
348                            continue;
349                        }
350                    } else {
351                        // not boundary
352                        pos = cur + 1;
353                        continue;
354                    }
355                }
356            } else {
357                Poll::Ready(Some(Ok(payload.buf.split().freeze())))
358            };
359        }
360    }
361
362    pub(crate) fn poll(&mut self, safety: &Safety) -> Poll<Option<Result<Bytes, Error>>> {
363        if self.payload.is_none() {
364            return Poll::Ready(None);
365        }
366
367        let Some(mut payload) = self
368            .payload
369            .as_ref()
370            .expect("Field should not be polled after completion")
371            .get_mut(safety)
372        else {
373            return Poll::Pending;
374        };
375
376        if !self.eof {
377            let res = if let Some(ref mut len) = self.length {
378                Self::read_len(&mut payload, len)
379            } else {
380                Self::read_stream(&mut payload, &self.boundary)
381            };
382
383            match ready!(res) {
384                Some(Ok(bytes)) => return Poll::Ready(Some(Ok(bytes))),
385                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
386                None => self.eof = true,
387            }
388        }
389
390        let result = match payload.readline() {
391            Ok(None) => Poll::Pending,
392            Ok(Some(line)) => {
393                if line.as_ref() != b"\r\n" {
394                    log::warn!("multipart field did not read all the data or it is malformed");
395                }
396                Poll::Ready(None)
397            }
398            Err(err) => Poll::Ready(Some(Err(err))),
399        };
400
401        drop(payload);
402
403        if let Poll::Ready(None) = result {
404            // drop payload buffer and make future un-poll-able
405            let _ = self.payload.take();
406        }
407
408        result
409    }
410}
411
// Tests for `Field::bytes()` driven through a `Multipart` built over an in-memory body.
#[cfg(test)]
mod tests {
    use futures_util::{stream, StreamExt as _};

    use super::*;
    use crate::Multipart;

    // TODO: use test utility when multi-file support is introduced
    /// Builds a `multipart/mixed` body plus matching headers: some leading text followed by two
    /// parts whose bodies are "one+one+one" and "two+two+two".
    fn create_double_request_with_header() -> (Bytes, HeaderMap) {
        let bytes = Bytes::from(
            "testasdadsad\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
             Content-Type: text/plain; charset=utf-8\r\n\
             \r\n\
             one+one+one\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\
             Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\
             Content-Type: text/plain; charset=utf-8\r\n\
             \r\n\
             two+two+two\r\n\
             --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n",
        );
        let mut headers = HeaderMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            header::HeaderValue::from_static(
                "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"",
            ),
        );
        (bytes, headers)
    }

    /// With an effectively unlimited size, both fields are collected in full and in order.
    #[actix_rt::test]
    async fn bytes_unlimited() {
        let (body, headers) = create_double_request_with_header();

        let mut multipart = Multipart::new(&headers, stream::iter([Ok(body)]));

        // Each `Field` is a temporary inside its statement, so it is gone before the next
        // `multipart.next()` call.
        let field = multipart
            .next()
            .await
            .expect("multipart should have two fields")
            .expect("multipart body should be well formatted")
            .bytes(usize::MAX)
            .await
            .expect("field data should not be size limited")
            .expect("reading field data should not error");
        assert_eq!(field, "one+one+one");

        let field = multipart
            .next()
            .await
            .expect("multipart should have two fields")
            .expect("multipart body should be well formatted")
            .bytes(usize::MAX)
            .await
            .expect("field data should not be size limited")
            .expect("reading field data should not error");
        assert_eq!(field, "two+two+two");
    }

    /// Exceeding the limit on the first field yields `LimitExceeded` but still drains that field,
    /// so the second field can be read normally afterwards.
    #[actix_rt::test]
    async fn bytes_limited() {
        let (body, headers) = create_double_request_with_header();

        let mut multipart = Multipart::new(&headers, stream::iter([Ok(body)]));

        multipart
            .next()
            .await
            .expect("multipart should have two fields")
            .expect("multipart body should be well formatted")
            .bytes(8) // smaller than data size
            .await
            .expect_err("field data should be size limited");

        // next field still readable
        let field = multipart
            .next()
            .await
            .expect("multipart should have two fields")
            .expect("multipart body should be well formatted")
            .bytes(usize::MAX)
            .await
            .expect("field data should not be size limited")
            .expect("reading field data should not error");
        assert_eq!(field, "two+two+two");
    }
}