1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use crate::{
error::Error,
types::{Form, Value},
upload::handle_multipart,
};
use actix_web::{
dev::{Payload, Service, ServiceRequest, Transform},
FromRequest, HttpMessage, HttpRequest,
};
use futures::future::{ok, Ready};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot::{channel, Receiver};
struct Uploaded {
rx: Receiver<Value>,
}
pub struct MultipartMiddleware<S> {
form: Form,
service: S,
}
impl FromRequest for Value {
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
type Config = ();
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let opt = req.extensions_mut().remove::<Uploaded>();
Box::pin(async move {
let fut = opt.ok_or(Error::MissingMiddleware)?;
fut.rx.await.map_err(|_| Error::TxDropped)
})
}
}
impl<S> Transform<S> for Form
where
S: Service<Request = ServiceRequest, Error = actix_web::Error>,
S::Future: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type InitError = ();
type Transform = MultipartMiddleware<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(MultipartMiddleware {
form: self.clone(),
service,
})
}
}
impl<S> Service for MultipartMiddleware<S>
where
S: Service<Request = ServiceRequest, Error = actix_web::Error>,
S::Future: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>>>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, mut req: S::Request) -> Self::Future {
let (tx, rx) = channel();
req.extensions_mut().insert(Uploaded { rx });
let payload = req.take_payload();
let multipart = actix_multipart::Multipart::new(req.headers(), payload);
let form = self.form.clone();
let fut = self.service.call(req);
Box::pin(async move {
let uploaded = match handle_multipart(multipart, form.clone()).await {
Ok(uploaded) => uploaded,
Err(e) => {
if let Some(f) = form.transform_error.clone() {
return Err((f)(e));
} else {
return Err(e.into());
}
}
};
let _ = tx.send(uploaded);
fut.await
})
}
}