1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use anyhow::{anyhow, Error, Result};
use bytes::Bytes;
use futures_util::stream::Stream;
use http::header::{CONTENT_DISPOSITION, CONTENT_TYPE};
use crate::utils::{parse_content_disposition, parse_content_type, parse_part_headers};
use crate::Field;
use crate::State;
pub struct FormData<T> {
state: Arc<Mutex<State<T>>>,
}
impl<T> FormData<T> {
pub fn new<B: AsRef<[u8]>>(b: B, t: T) -> Self {
Self {
state: Arc::new(Mutex::new(State::new(b, t))),
}
}
pub fn state(&self) -> Arc<Mutex<State<T>>> {
self.state.clone()
}
}
impl<T, O, E> Stream for FormData<T>
where
T: Stream<Item = Result<O, E>> + Unpin + Send + 'static,
O: Into<Bytes>,
E: Into<Error>,
{
type Item = Result<Field<T>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let state = self.state();
let mut state = state.try_lock().map_err(|e| anyhow!(e.to_string()))?;
if state.waker().is_some() {
return Poll::Pending;
}
match Pin::new(&mut *state).poll_next(cx)? {
Poll::Ready(res) => match res {
Some(0) | None => Poll::Ready(None),
Some(len) => {
let headers_buffer = state.buffer_mut().split_to(len);
let mut headers = parse_part_headers(&headers_buffer)?;
log::debug!("parse headers {:#?}", &headers_buffer);
let names = headers.remove(CONTENT_DISPOSITION).map_or_else(
|| Err(anyhow!("invalid content disposition")),
|v| parse_content_disposition(&v.as_bytes()),
)?;
let mut field = Field::<T>::empty();
field.name = names.0;
field.filename = names.1;
field.content_type = parse_content_type(headers.remove(CONTENT_TYPE).as_ref());
field.index.replace(state.incr_index());
field.state_mut().replace(self.state.clone());
if headers.len() > 0 {
field.headers_mut().replace(headers);
}
state.waker_mut().replace(cx.waker().clone());
Poll::Ready(Some(Ok(field)))
}
},
Poll::Pending => Poll::Pending,
}
}
}