http_multipart/
field.rs

1use core::{cmp, pin::Pin};
2
3use bytes::{Bytes, BytesMut};
4use futures_core::stream::Stream;
5use http::header::HeaderMap;
6use memchr::memmem;
7
8use super::{
9    content_disposition::ContentDisposition,
10    error::{MultipartError, PayloadError},
11    Multipart,
12};
13
/// A single field of a multipart body.
///
/// The field holds a pinned mutable borrow of its parent [`Multipart`] for the lifetime `'a`
/// and reads its body through the parent's buffer and stream.
pub struct Field<'a, S> {
    // decoding state of the field body. see `FieldDecoder` for the possible states.
    decoder: FieldDecoder,
    // used to look up the field name and file name inside the parent's header map.
    cp: ContentDisposition,
    // parent multipart. its buffer, headers, boundary and pending flag are accessed by this field.
    multipart: Pin<&'a mut Multipart<S>>,
}
19
20impl<S> Drop for Field<'_, S> {
21    fn drop(&mut self) {
22        self.multipart.as_mut().project().headers.clear();
23    }
24}
25
26impl<'a, S> Field<'a, S> {
27    pub(super) fn new(length: Option<u64>, cp: ContentDisposition, multipart: Pin<&'a mut Multipart<S>>) -> Self {
28        let typ = match length {
29            Some(len) => FieldDecoder::Fixed(len),
30            None => FieldDecoder::StreamBegin,
31        };
32        Self {
33            decoder: typ,
34            cp,
35            multipart,
36        }
37    }
38}
39
/// Decoding state of a field body.
#[derive(Default)]
pub(super) enum FieldDecoder {
    /// Body with a known length. The value is the count of bytes that are still to be yielded.
    Fixed(u64),
    /// Body without a known length where no boundary candidate is pending.
    #[default]
    StreamBegin,
    /// A boundary candidate was seen but more bytes are needed to confirm or reject it.
    StreamPossibleEnd,
    /// The boundary was found. The field body is finished.
    StreamEnd,
}
48
49impl<S, T, E> Field<'_, S>
50where
51    S: Stream<Item = Result<T, E>>,
52    T: AsRef<[u8]> + 'static,
53    E: Into<PayloadError>,
54{
55    /// The field name found in the [http::header::CONTENT_DISPOSITION] header.
56    pub fn name(&self) -> Option<&str> {
57        self.cp
58            .name_from_headers(self.headers())
59            .and_then(|s| std::str::from_utf8(s).ok())
60    }
61
62    /// The file name found in the [http::header::CONTENT_DISPOSITION] header.
63    pub fn file_name(&self) -> Option<&str> {
64        self.cp
65            .filename_from_headers(self.headers())
66            .and_then(|s| std::str::from_utf8(s).ok())
67    }
68
69    pub fn headers(&self) -> &HeaderMap {
70        &self.multipart.headers
71    }
72
73    pub async fn try_next(&mut self) -> Result<Option<Bytes>, MultipartError> {
74        loop {
75            let multipart = self.multipart.as_mut().project();
76            let buf = multipart.buf;
77
78            // check multipart buffer first and drain it if possible.
79            if !buf.is_empty() {
80                match self.decoder {
81                    FieldDecoder::Fixed(0) | FieldDecoder::StreamEnd => {
82                        *multipart.pending_field = false;
83                        return Ok(None);
84                    }
85                    FieldDecoder::Fixed(ref mut len) => {
86                        let at = cmp::min(*len, buf.len() as u64);
87                        *len -= at;
88                        let chunk = buf.split_to(at as usize).freeze();
89                        return Ok(Some(chunk));
90                    }
91                    FieldDecoder::StreamBegin | FieldDecoder::StreamPossibleEnd => {
92                        if let Some(at) = self.decoder.try_find_split_idx(buf, multipart.boundary)? {
93                            return Ok(Some(buf.split_to(at).freeze()));
94                        }
95                    }
96                }
97            }
98
99            // multipart buffer is empty. read more from stream.
100            let item = self.multipart.as_mut().try_read_stream().await?;
101
102            let multipart = self.multipart.as_mut().project();
103            let buf = multipart.buf;
104
105            // try to deal with the read bytes in place before extend to multipart buffer.
106            match self.decoder {
107                FieldDecoder::Fixed(0) => {
108                    buf.extend_from_slice(item.as_ref());
109                    *multipart.pending_field = false;
110                    return Ok(None);
111                }
112                FieldDecoder::Fixed(ref mut len) => {
113                    let chunk = item.as_ref();
114                    let at = cmp::min(*len, chunk.len() as u64);
115                    *len -= at;
116                    let bytes = split_bytes(item, at as usize, buf);
117                    return Ok(Some(bytes));
118                }
119                FieldDecoder::StreamBegin => match self.decoder.try_find_split_idx(&item, multipart.boundary)? {
120                    Some(at) => {
121                        let bytes = split_bytes(item, at, buf);
122                        return Ok(Some(bytes));
123                    }
124                    None => buf.extend_from_slice(item.as_ref()),
125                },
126                // possible boundary where partial boundary is already in the the buffer. so extend to it and check again.
127                FieldDecoder::StreamPossibleEnd => buf.extend_from_slice(item.as_ref()),
128                FieldDecoder::StreamEnd => {
129                    *multipart.pending_field = false;
130                    return Ok(None);
131                }
132            }
133        }
134    }
135}
136
137impl FieldDecoder {
138    pub(super) fn try_find_split_idx<T>(&mut self, item: &T, boundary: &[u8]) -> Result<Option<usize>, MultipartError>
139    where
140        T: AsRef<[u8]>,
141    {
142        let item = item.as_ref();
143        match memmem::find(item, super::DOUBLE_HYPHEN) {
144            Some(idx) => {
145                let start = idx + super::DOUBLE_HYPHEN.len();
146                let length = cmp::min(item.len() - start, boundary.len());
147                let end = start + length;
148
149                let slice = &item[start..end];
150
151                // not boundary so split till end offset.
152                if !boundary.starts_with(slice) {
153                    return Ok(Some(end));
154                }
155
156                // possible boundary but no full view yet.
157                if boundary.len() > slice.len() {
158                    *self = FieldDecoder::StreamPossibleEnd;
159                    return Ok(None);
160                }
161
162                *self = FieldDecoder::StreamEnd;
163
164                let idx = idx.checked_sub(2).ok_or(MultipartError::Boundary)?;
165                Ok(Some(idx))
166            }
167            None => Ok(Some(item.len())),
168        }
169    }
170}
171
172fn split_bytes<T>(item: T, at: usize, buf: &mut BytesMut) -> Bytes
173where
174    T: AsRef<[u8]> + 'static,
175{
176    match try_downcast_to_bytes(item) {
177        Ok(mut item) => {
178            if item.len() == at {
179                return item;
180            }
181            let bytes = item.split_to(at);
182            buf.extend_from_slice(item.as_ref());
183            bytes
184        }
185        Err(item) => {
186            let chunk = item.as_ref();
187            let bytes = Bytes::copy_from_slice(&chunk[..at]);
188            buf.extend_from_slice(&chunk[at..]);
189            bytes
190        }
191    }
192}
193
194fn try_downcast_to_bytes<T: 'static>(item: T) -> Result<Bytes, T> {
195    use std::any::Any;
196
197    let item = &mut Some(item);
198    match (item as &mut dyn Any).downcast_mut::<Option<Bytes>>() {
199        Some(bytes) => Ok(bytes.take().unwrap()),
200        None => Err(item.take().unwrap()),
201    }
202}
203
#[cfg(test)]
mod test {
    use super::*;

    /// Only a `Bytes` input takes the `Ok` path. Other byte containers come back through `Err`.
    #[test]
    fn downcast_bytes() {
        let from_bytes = try_downcast_to_bytes(Bytes::new());
        assert!(from_bytes.is_ok());

        let from_vec = try_downcast_to_bytes(Vec::<u8>::new());
        assert!(from_vec.is_err());
    }
}