Skip to main content

bolt_web/
request.rs

1use bytes::Bytes;
2use futures_util::TryStreamExt;
3use http_body_util::{BodyExt, BodyStream};
4use hyper::header::HeaderName;
5use hyper::{Request, Uri, Version, body::Incoming, header::HeaderValue};
6use mime::Mime;
7use multer::Multipart;
8use serde::de::DeserializeOwned;
9use std::collections::HashMap;
10use std::net::SocketAddr;
11use tokio::io::AsyncWriteExt;
12use url::form_urlencoded;
13use uuid::Uuid;
14
15use crate::types::{BoltError, FormData, FormFile};
16
17#[allow(dead_code)]
18pub struct RequestBody {
19    pub inner: Option<Request<Incoming>>,
20    pub raw_body: Option<Bytes>,
21    params: HashMap<String, String>,
22    form_data_result: Option<Result<FormData, Box<dyn std::error::Error + Send + Sync>>>,
23    temp_paths: Vec<String>,
24    socket: SocketAddr,
25    pub extended: bool,
26}
27
28#[allow(dead_code)]
29impl RequestBody {
30    pub fn new(req: Request<Incoming>, socket: SocketAddr) -> Self {
31        Self {
32            inner: Some(req),
33            params: HashMap::new(),
34            form_data_result: None,
35            temp_paths: Vec::new(),
36            socket,
37            extended: false,
38            raw_body: None,
39        }
40    }
41
42    pub fn params(&self) -> &HashMap<String, String> {
43        &self.params
44    }
45
46    pub fn remote_addr(&self) -> &SocketAddr {
47        &self.socket
48    }
49
50    pub fn param(&self, key: &str) -> String {
51        self.params.get(key).cloned().unwrap_or_default()
52    }
53
54    pub(crate) fn set_params(&mut self, params: HashMap<String, String>) {
55        self.params = params;
56    }
57
58    pub fn method(&self) -> &hyper::Method {
59        self.inner
60            .as_ref()
61            .expect("Cannot access method, request body was consumed.")
62            .method()
63    }
64
65    pub fn path(&self) -> &str {
66        self.inner
67            .as_ref()
68            .expect("Cannot access path, request body was consumed.")
69            .uri()
70            .path()
71    }
72
73    pub fn headers(&self) -> &hyper::HeaderMap {
74        self.inner
75            .as_ref()
76            .expect("Cannot access headers, request body was consumed.")
77            .headers()
78    }
79
80    pub fn set_headers(&mut self, key: &str, value: &str) {
81        let key = HeaderName::from_bytes(key.as_bytes()).expect("Invalid header name");
82        let value = HeaderValue::from_str(value).expect("Invalid header value");
83
84        self.inner
85            .as_mut()
86            .expect("Cannot set headers, request body was consumed.")
87            .headers_mut()
88            .insert(key, value);
89    }
90
91    pub fn get_headers(&mut self, key: &str) -> Option<&HeaderValue> {
92        self.inner
93            .as_ref()
94            .expect("Cannot access headers, request body was consumed.")
95            .headers()
96            .get(key)
97    }
98
99    pub fn uri(&self) -> &Uri {
100        self.inner
101            .as_ref()
102            .expect("Cannot access uri, request body was consumed.")
103            .uri()
104    }
105
106    pub fn version(&self) -> Version {
107        self.inner
108            .as_ref()
109            .expect("Cannot access version, request body was consumed.")
110            .version()
111    }
112
113    pub fn query(&self) -> HashMap<String, String> {
114        self.inner
115            .as_ref()
116            .expect("Cannot access uri, request body was consumed.")
117            .uri()
118            .query()
119            .map(|q| {
120                form_urlencoded::parse(q.as_bytes())
121                    .into_owned()
122                    .collect::<HashMap<String, String>>()
123            })
124            .unwrap_or_default()
125    }
126
127    pub fn query_param(&self, key: &str) -> Option<String> {
128        let query_params = self.query();
129        query_params.get(key).cloned()
130    }
131
132    pub async fn bytes(&mut self) -> Result<Bytes, hyper::Error> {
133        if let Some(raw) = &self.raw_body {
134            return Ok(raw.clone());
135        }
136
137        let req: Request<Incoming> = self
138            .inner
139            .take()
140            .expect("Request body has already been consumed.");
141
142        let (_, body) = req.into_parts();
143
144        let collected = body.collect().await?;
145        Ok(collected.to_bytes())
146    }
147
148    pub async fn text(&mut self) -> Result<String, BoltError> {
149        let bytes = self.bytes().await?;
150        let text = String::from_utf8(bytes.to_vec())?;
151        Ok(text)
152    }
153
154    pub async fn json<T: DeserializeOwned>(&mut self) -> Result<T, BoltError> {
155        let bytes = self.bytes().await?;
156        Ok(serde_json::from_slice(&bytes)?)
157    }
158
159    pub async fn urlencoded(&mut self) -> Result<serde_json::Value, BoltError> {
160        let bytes = self.bytes().await?;
161
162        if self.extended {
163            let structured: serde_json::Value = serde_urlencoded::from_bytes(&bytes)?;
164            Ok(structured)
165        } else {
166            let s = String::from_utf8(bytes.to_vec())?;
167            let mut map = HashMap::new();
168            for (k, v) in form_urlencoded::parse(s.as_bytes()) {
169                map.insert(k.into_owned(), v.into_owned());
170            }
171            Ok(serde_json::json!(map))
172        }
173    }
174
175    pub fn get_cookie(&self, name: &str) -> Option<String> {
176        self.inner
177            .as_ref()
178            .expect("Request body has already been consumed")
179            .headers()
180            .get(hyper::header::COOKIE)?
181            .to_str()
182            .ok()
183            .and_then(|cookie_header| {
184                cookie_header.split(';').map(|s| s.trim()).find_map(|pair| {
185                    let mut parts = pair.splitn(2, '=');
186                    let key = parts.next()?;
187                    let value = parts.next()?;
188                    if key == name {
189                        Some(value.to_string())
190                    } else {
191                        None
192                    }
193                })
194            })
195    }
196
197    pub async fn form_data(&mut self) -> Result<FormData, BoltError> {
198        if let Some(Ok(fd)) = &self.form_data_result {
199            return Ok(fd.clone());
200        }
201        if let Some(Err(e)) = &self.form_data_result {
202            return Err(Box::new(std::io::Error::new(
203                std::io::ErrorKind::Other,
204                e.to_string(),
205            )));
206        }
207
208        let header_opt = {
209            let req_ref = self
210                .inner
211                .as_ref()
212                .expect("Request body was consumed before form_data call.");
213            req_ref.headers().get(hyper::header::CONTENT_TYPE).cloned()
214        };
215
216        let content_type = match header_opt {
217            Some(header_value) => header_value.to_str()?.parse::<Mime>()?,
218            None => {
219                let err: BoltError = "Missing Content-Type header".into();
220                self.form_data_result = Some(Err(err));
221                return Err("Missing Content-Type header".into());
222            }
223        };
224
225        if content_type.type_() != mime::MULTIPART || content_type.subtype() != mime::FORM_DATA {
226            let err: BoltError = "Content-Type is not multipart/form-data".into();
227            self.form_data_result = Some(Err(err));
228            return Err("Content-Type is not multipart/form-data".into());
229        }
230
231        let boundary = content_type
232            .get_param(mime::BOUNDARY)
233            .ok_or("Missing boundary parameter in Content-Type")?
234            .to_string();
235
236        let (_, body) = self
237            .inner
238            .take()
239            .expect("Request already consumed")
240            .into_parts();
241
242        let stream =
243            BodyStream::new(body).try_filter_map(|frame| async move { Ok(frame.into_data().ok()) });
244
245        let mut multipart = Multipart::new(stream, boundary);
246
247        let mut form_data = FormData {
248            files: Vec::new(),
249            fields: HashMap::new(),
250        };
251
252        while let Ok(Some(mut field)) = multipart.next_field().await {
253            let name = field.name().unwrap_or_default().to_string();
254
255            if let Some(file_name) = field.file_name() {
256                let filename = file_name.to_string();
257                let unique_id = Uuid::new_v4();
258                let temp_path =
259                    std::env::temp_dir().join(format!("bolt_upload_{}_{}", unique_id, filename));
260
261                let mut dest = tokio::fs::File::create(&temp_path).await?;
262
263                while let Some(chunk) = field.chunk().await? {
264                    dest.write_all(&chunk).await?;
265                }
266
267                self.temp_paths.push(temp_path.display().to_string());
268
269                form_data.files.push(FormFile {
270                    field_name: name,
271                    file_name: filename,
272                    content_type: field
273                        .content_type()
274                        .map(|m| m.essence_str().to_string())
275                        .unwrap_or_default(),
276                    temp_path: temp_path.display().to_string(),
277                });
278            } else {
279                form_data.fields.insert(name, field.text().await?);
280            }
281        }
282
283        self.form_data_result = Some(Ok(form_data.clone()));
284        Ok(form_data)
285    }
286
287    pub async fn files(&mut self) -> Result<Vec<FormFile>, BoltError> {
288        let form_data = self.form_data().await?;
289        Ok(form_data.files)
290    }
291
292    pub async fn file(&mut self, name: &str) -> Result<Option<FormFile>, BoltError> {
293        let files = self.files().await?;
294
295        let file = files.iter().find(|f| f.field_name == name);
296        Ok(file.cloned())
297    }
298
299    pub async fn cleanup(&mut self) {
300        for path in self.temp_paths.drain(..) {
301            let _ = tokio::fs::remove_file(&path).await;
302        }
303    }
304}
305
306impl Drop for RequestBody {
307    fn drop(&mut self) {
308        if self.temp_paths.is_empty() {
309            return;
310        }
311
312        let paths = std::mem::take(&mut self.temp_paths);
313
314        if tokio::runtime::Handle::try_current().is_ok() {
315            tokio::spawn(async move {
316                for path in paths {
317                    let _ = tokio::fs::remove_file(&path).await;
318                }
319            });
320        } else {
321            for path in paths {
322                let _ = std::fs::remove_file(&path);
323            }
324        }
325    }
326}