actix_form_data/
middleware.rs1use crate::{
21 types::{Form, Value},
22 upload::handle_multipart,
23};
24use actix_web::{
25 dev::{Payload, Service, ServiceRequest, Transform},
26 http::StatusCode,
27 FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
28};
29use futures_util::future::LocalBoxFuture;
30use std::{
31 future::{ready, Ready},
32 task::{Context, Poll},
33};
34use tokio::sync::oneshot::{channel, Receiver};
35
36#[derive(Debug, thiserror::Error)]
37pub enum FromRequestError {
38 #[error("Uploaded guard used without Multipart middleware")]
39 MissingMiddleware,
40 #[error("Impossible Error! Middleware exists, didn't fail, and didn't send value")]
41 TxDropped,
42}
43
44impl ResponseError for FromRequestError {
45 fn status_code(&self) -> StatusCode {
46 match self {
47 Self::MissingMiddleware | Self::TxDropped => StatusCode::INTERNAL_SERVER_ERROR,
48 }
49 }
50
51 fn error_response(&self) -> HttpResponse {
52 match self {
53 Self::MissingMiddleware | Self::TxDropped => {
54 HttpResponse::InternalServerError().finish()
55 }
56 }
57 }
58}
59
60struct Uploaded<T> {
61 rx: Receiver<Value<T>>,
62}
63
64pub struct MultipartMiddleware<S, T, E> {
65 form: Form<T, E>,
66 service: S,
67}
68
69impl<T> FromRequest for Value<T>
70where
71 T: 'static,
72{
73 type Error = FromRequestError;
74 type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
75
76 fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
77 let opt = req.extensions_mut().remove::<Uploaded<T>>();
78 Box::pin(async move {
79 let fut = opt.ok_or(FromRequestError::MissingMiddleware)?;
80
81 fut.rx.await.map_err(|_| FromRequestError::TxDropped)
82 })
83 }
84}
85
86impl<S, T, E> Transform<S, ServiceRequest> for Form<T, E>
87where
88 S: Service<ServiceRequest, Error = actix_web::Error>,
89 S::Future: 'static,
90 T: 'static,
91 E: ResponseError + 'static,
92{
93 type Response = S::Response;
94 type Error = S::Error;
95 type InitError = ();
96 type Transform = MultipartMiddleware<S, T, E>;
97 type Future = Ready<Result<Self::Transform, Self::InitError>>;
98
99 fn new_transform(&self, service: S) -> Self::Future {
100 ready(Ok(MultipartMiddleware {
101 form: self.clone(),
102 service,
103 }))
104 }
105}
106
107impl<S, T, E> Service<ServiceRequest> for MultipartMiddleware<S, T, E>
108where
109 S: Service<ServiceRequest, Error = actix_web::Error>,
110 S::Future: 'static,
111 T: 'static,
112 E: ResponseError + 'static,
113{
114 type Response = S::Response;
115 type Error = S::Error;
116 type Future = LocalBoxFuture<'static, Result<S::Response, S::Error>>;
117
118 fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119 self.service.poll_ready(cx)
120 }
121
122 fn call(&self, mut req: ServiceRequest) -> Self::Future {
123 let (tx, rx) = channel();
124 req.extensions_mut().insert(Uploaded { rx });
125 let payload = req.take_payload();
126 let multipart = actix_multipart::Multipart::new(req.headers(), payload);
127 let form = self.form.clone();
128 let fut = self.service.call(req);
129
130 Box::pin(async move {
131 let uploaded = match handle_multipart(multipart, form.clone()).await {
132 Ok(Ok(uploaded)) => uploaded,
133 Ok(Err(e)) => return Err(e.into()),
134 Err(e) => {
135 if let Some(f) = form.transform_error.clone() {
136 return Err((f)(e));
137 } else {
138 return Err(e.into());
139 }
140 }
141 };
142 let _ = tx.send(uploaded);
143 fut.await
144 })
145 }
146}