1use core::{cmp, pin::Pin};
2
3use bytes::{Bytes, BytesMut};
4use futures_core::stream::Stream;
5use http::header::HeaderMap;
6use memchr::memmem;
7
8use super::{
9 content_disposition::ContentDisposition,
10 error::{MultipartError, PayloadError},
11 Multipart,
12};
13
14pub struct Field<'a, S> {
15 decoder: FieldDecoder,
16 cp: ContentDisposition,
17 multipart: Pin<&'a mut Multipart<S>>,
18}
19
20impl<S> Drop for Field<'_, S> {
21 fn drop(&mut self) {
22 self.multipart.as_mut().project().headers.clear();
23 }
24}
25
26impl<'a, S> Field<'a, S> {
27 pub(super) fn new(length: Option<u64>, cp: ContentDisposition, multipart: Pin<&'a mut Multipart<S>>) -> Self {
28 let typ = match length {
29 Some(len) => FieldDecoder::Fixed(len),
30 None => FieldDecoder::StreamBegin,
31 };
32 Self {
33 decoder: typ,
34 cp,
35 multipart,
36 }
37 }
38}
39
40#[derive(Default)]
41pub(super) enum FieldDecoder {
42 Fixed(u64),
43 #[default]
44 StreamBegin,
45 StreamPossibleEnd,
46 StreamEnd,
47}
48
49impl<S, T, E> Field<'_, S>
50where
51 S: Stream<Item = Result<T, E>>,
52 T: AsRef<[u8]> + 'static,
53 E: Into<PayloadError>,
54{
55 pub fn name(&self) -> Option<&str> {
57 self.cp
58 .name_from_headers(self.headers())
59 .and_then(|s| std::str::from_utf8(s).ok())
60 }
61
62 pub fn file_name(&self) -> Option<&str> {
64 self.cp
65 .filename_from_headers(self.headers())
66 .and_then(|s| std::str::from_utf8(s).ok())
67 }
68
69 pub fn headers(&self) -> &HeaderMap {
70 &self.multipart.headers
71 }
72
73 pub async fn try_next(&mut self) -> Result<Option<Bytes>, MultipartError> {
74 loop {
75 let multipart = self.multipart.as_mut().project();
76 let buf = multipart.buf;
77
78 if !buf.is_empty() {
80 match self.decoder {
81 FieldDecoder::Fixed(0) | FieldDecoder::StreamEnd => {
82 *multipart.pending_field = false;
83 return Ok(None);
84 }
85 FieldDecoder::Fixed(ref mut len) => {
86 let at = cmp::min(*len, buf.len() as u64);
87 *len -= at;
88 let chunk = buf.split_to(at as usize).freeze();
89 return Ok(Some(chunk));
90 }
91 FieldDecoder::StreamBegin | FieldDecoder::StreamPossibleEnd => {
92 if let Some(at) = self.decoder.try_find_split_idx(buf, multipart.boundary)? {
93 return Ok(Some(buf.split_to(at).freeze()));
94 }
95 }
96 }
97 }
98
99 let item = self.multipart.as_mut().try_read_stream().await?;
101
102 let multipart = self.multipart.as_mut().project();
103 let buf = multipart.buf;
104
105 match self.decoder {
107 FieldDecoder::Fixed(0) => {
108 buf.extend_from_slice(item.as_ref());
109 *multipart.pending_field = false;
110 return Ok(None);
111 }
112 FieldDecoder::Fixed(ref mut len) => {
113 let chunk = item.as_ref();
114 let at = cmp::min(*len, chunk.len() as u64);
115 *len -= at;
116 let bytes = split_bytes(item, at as usize, buf);
117 return Ok(Some(bytes));
118 }
119 FieldDecoder::StreamBegin => match self.decoder.try_find_split_idx(&item, multipart.boundary)? {
120 Some(at) => {
121 let bytes = split_bytes(item, at, buf);
122 return Ok(Some(bytes));
123 }
124 None => buf.extend_from_slice(item.as_ref()),
125 },
126 FieldDecoder::StreamPossibleEnd => buf.extend_from_slice(item.as_ref()),
128 FieldDecoder::StreamEnd => {
129 *multipart.pending_field = false;
130 return Ok(None);
131 }
132 }
133 }
134 }
135}
136
137impl FieldDecoder {
138 pub(super) fn try_find_split_idx<T>(&mut self, item: &T, boundary: &[u8]) -> Result<Option<usize>, MultipartError>
139 where
140 T: AsRef<[u8]>,
141 {
142 let item = item.as_ref();
143 match memmem::find(item, super::DOUBLE_HYPHEN) {
144 Some(idx) => {
145 let start = idx + super::DOUBLE_HYPHEN.len();
146 let length = cmp::min(item.len() - start, boundary.len());
147 let end = start + length;
148
149 let slice = &item[start..end];
150
151 if !boundary.starts_with(slice) {
153 return Ok(Some(end));
154 }
155
156 if boundary.len() > slice.len() {
158 *self = FieldDecoder::StreamPossibleEnd;
159 return Ok(None);
160 }
161
162 *self = FieldDecoder::StreamEnd;
163
164 let idx = idx.checked_sub(2).ok_or(MultipartError::Boundary)?;
165 Ok(Some(idx))
166 }
167 None => Ok(Some(item.len())),
168 }
169 }
170}
171
172fn split_bytes<T>(item: T, at: usize, buf: &mut BytesMut) -> Bytes
173where
174 T: AsRef<[u8]> + 'static,
175{
176 match try_downcast_to_bytes(item) {
177 Ok(mut item) => {
178 if item.len() == at {
179 return item;
180 }
181 let bytes = item.split_to(at);
182 buf.extend_from_slice(item.as_ref());
183 bytes
184 }
185 Err(item) => {
186 let chunk = item.as_ref();
187 let bytes = Bytes::copy_from_slice(&chunk[..at]);
188 buf.extend_from_slice(&chunk[at..]);
189 bytes
190 }
191 }
192}
193
194fn try_downcast_to_bytes<T: 'static>(item: T) -> Result<Bytes, T> {
195 use std::any::Any;
196
197 let item = &mut Some(item);
198 match (item as &mut dyn Any).downcast_mut::<Option<Bytes>>() {
199 Some(bytes) => Ok(bytes.take().unwrap()),
200 None => Err(item.take().unwrap()),
201 }
202}
203
204#[cfg(test)]
205mod test {
206 use super::*;
207
208 #[test]
209 fn downcast_bytes() {
210 let bytes = Bytes::new();
211 assert!(try_downcast_to_bytes(bytes).is_ok());
212 let bytes = Vec::<u8>::new();
213 assert!(try_downcast_to_bytes(bytes).is_err());
214 }
215}