1use bytes::Bytes;
2use futures_util::TryStreamExt;
3use http_body_util::{BodyExt, BodyStream};
4use hyper::header::HeaderName;
5use hyper::{Request, Uri, Version, body::Incoming, header::HeaderValue};
6use mime::Mime;
7use multer::Multipart;
8use serde::de::DeserializeOwned;
9use std::collections::HashMap;
10use std::net::SocketAddr;
11use tokio::io::AsyncWriteExt;
12use url::form_urlencoded;
13use uuid::Uuid;
14
15use crate::types::{BoltError, FormData, FormFile};
16
17#[allow(dead_code)]
18pub struct RequestBody {
19 pub inner: Option<Request<Incoming>>,
20 pub raw_body: Option<Bytes>,
21 params: HashMap<String, String>,
22 form_data_result: Option<Result<FormData, Box<dyn std::error::Error + Send + Sync>>>,
23 temp_paths: Vec<String>,
24 socket: SocketAddr,
25 pub extended: bool,
26}
27
28#[allow(dead_code)]
29impl RequestBody {
30 pub fn new(req: Request<Incoming>, socket: SocketAddr) -> Self {
31 Self {
32 inner: Some(req),
33 params: HashMap::new(),
34 form_data_result: None,
35 temp_paths: Vec::new(),
36 socket,
37 extended: false,
38 raw_body: None,
39 }
40 }
41
42 pub fn params(&self) -> &HashMap<String, String> {
43 &self.params
44 }
45
46 pub fn remote_addr(&self) -> &SocketAddr {
47 &self.socket
48 }
49
50 pub fn param(&self, key: &str) -> String {
51 self.params.get(key).cloned().unwrap_or_default()
52 }
53
54 pub(crate) fn set_params(&mut self, params: HashMap<String, String>) {
55 self.params = params;
56 }
57
58 pub fn method(&self) -> &hyper::Method {
59 self.inner
60 .as_ref()
61 .expect("Cannot access method, request body was consumed.")
62 .method()
63 }
64
65 pub fn path(&self) -> &str {
66 self.inner
67 .as_ref()
68 .expect("Cannot access path, request body was consumed.")
69 .uri()
70 .path()
71 }
72
73 pub fn headers(&self) -> &hyper::HeaderMap {
74 self.inner
75 .as_ref()
76 .expect("Cannot access headers, request body was consumed.")
77 .headers()
78 }
79
80 pub fn set_headers(&mut self, key: &str, value: &str) {
81 let key = HeaderName::from_bytes(key.as_bytes()).expect("Invalid header name");
82 let value = HeaderValue::from_str(value).expect("Invalid header value");
83
84 self.inner
85 .as_mut()
86 .expect("Cannot set headers, request body was consumed.")
87 .headers_mut()
88 .insert(key, value);
89 }
90
91 pub fn get_headers(&mut self, key: &str) -> Option<&HeaderValue> {
92 self.inner
93 .as_ref()
94 .expect("Cannot access headers, request body was consumed.")
95 .headers()
96 .get(key)
97 }
98
99 pub fn uri(&self) -> &Uri {
100 self.inner
101 .as_ref()
102 .expect("Cannot access uri, request body was consumed.")
103 .uri()
104 }
105
106 pub fn version(&self) -> Version {
107 self.inner
108 .as_ref()
109 .expect("Cannot access version, request body was consumed.")
110 .version()
111 }
112
113 pub fn query(&self) -> HashMap<String, String> {
114 self.inner
115 .as_ref()
116 .expect("Cannot access uri, request body was consumed.")
117 .uri()
118 .query()
119 .map(|q| {
120 form_urlencoded::parse(q.as_bytes())
121 .into_owned()
122 .collect::<HashMap<String, String>>()
123 })
124 .unwrap_or_default()
125 }
126
127 pub fn query_param(&self, key: &str) -> Option<String> {
128 let query_params = self.query();
129 query_params.get(key).cloned()
130 }
131
132 pub async fn bytes(&mut self) -> Result<Bytes, hyper::Error> {
133 if let Some(raw) = &self.raw_body {
134 return Ok(raw.clone());
135 }
136
137 let req: Request<Incoming> = self
138 .inner
139 .take()
140 .expect("Request body has already been consumed.");
141
142 let (_, body) = req.into_parts();
143
144 let collected = body.collect().await?;
145 Ok(collected.to_bytes())
146 }
147
148 pub async fn text(&mut self) -> Result<String, BoltError> {
149 let bytes = self.bytes().await?;
150 let text = String::from_utf8(bytes.to_vec())?;
151 Ok(text)
152 }
153
154 pub async fn json<T: DeserializeOwned>(&mut self) -> Result<T, BoltError> {
155 let bytes = self.bytes().await?;
156 Ok(serde_json::from_slice(&bytes)?)
157 }
158
159 pub async fn urlencoded(&mut self) -> Result<serde_json::Value, BoltError> {
160 let bytes = self.bytes().await?;
161
162 if self.extended {
163 let structured: serde_json::Value = serde_urlencoded::from_bytes(&bytes)?;
164 Ok(structured)
165 } else {
166 let s = String::from_utf8(bytes.to_vec())?;
167 let mut map = HashMap::new();
168 for (k, v) in form_urlencoded::parse(s.as_bytes()) {
169 map.insert(k.into_owned(), v.into_owned());
170 }
171 Ok(serde_json::json!(map))
172 }
173 }
174
175 pub fn get_cookie(&self, name: &str) -> Option<String> {
176 self.inner
177 .as_ref()
178 .expect("Request body has already been consumed")
179 .headers()
180 .get(hyper::header::COOKIE)?
181 .to_str()
182 .ok()
183 .and_then(|cookie_header| {
184 cookie_header.split(';').map(|s| s.trim()).find_map(|pair| {
185 let mut parts = pair.splitn(2, '=');
186 let key = parts.next()?;
187 let value = parts.next()?;
188 if key == name {
189 Some(value.to_string())
190 } else {
191 None
192 }
193 })
194 })
195 }
196
197 pub async fn form_data(&mut self) -> Result<FormData, BoltError> {
198 if let Some(Ok(fd)) = &self.form_data_result {
199 return Ok(fd.clone());
200 }
201 if let Some(Err(e)) = &self.form_data_result {
202 return Err(Box::new(std::io::Error::new(
203 std::io::ErrorKind::Other,
204 e.to_string(),
205 )));
206 }
207
208 let header_opt = {
209 let req_ref = self
210 .inner
211 .as_ref()
212 .expect("Request body was consumed before form_data call.");
213 req_ref.headers().get(hyper::header::CONTENT_TYPE).cloned()
214 };
215
216 let content_type = match header_opt {
217 Some(header_value) => header_value.to_str()?.parse::<Mime>()?,
218 None => {
219 let err: BoltError = "Missing Content-Type header".into();
220 self.form_data_result = Some(Err(err));
221 return Err("Missing Content-Type header".into());
222 }
223 };
224
225 if content_type.type_() != mime::MULTIPART || content_type.subtype() != mime::FORM_DATA {
226 let err: BoltError = "Content-Type is not multipart/form-data".into();
227 self.form_data_result = Some(Err(err));
228 return Err("Content-Type is not multipart/form-data".into());
229 }
230
231 let boundary = content_type
232 .get_param(mime::BOUNDARY)
233 .ok_or("Missing boundary parameter in Content-Type")?
234 .to_string();
235
236 let (_, body) = self
237 .inner
238 .take()
239 .expect("Request already consumed")
240 .into_parts();
241
242 let stream =
243 BodyStream::new(body).try_filter_map(|frame| async move { Ok(frame.into_data().ok()) });
244
245 let mut multipart = Multipart::new(stream, boundary);
246
247 let mut form_data = FormData {
248 files: Vec::new(),
249 fields: HashMap::new(),
250 };
251
252 while let Ok(Some(mut field)) = multipart.next_field().await {
253 let name = field.name().unwrap_or_default().to_string();
254
255 if let Some(file_name) = field.file_name() {
256 let filename = file_name.to_string();
257 let unique_id = Uuid::new_v4();
258 let temp_path =
259 std::env::temp_dir().join(format!("bolt_upload_{}_{}", unique_id, filename));
260
261 let mut dest = tokio::fs::File::create(&temp_path).await?;
262
263 while let Some(chunk) = field.chunk().await? {
264 dest.write_all(&chunk).await?;
265 }
266
267 self.temp_paths.push(temp_path.display().to_string());
268
269 form_data.files.push(FormFile {
270 field_name: name,
271 file_name: filename,
272 content_type: field
273 .content_type()
274 .map(|m| m.essence_str().to_string())
275 .unwrap_or_default(),
276 temp_path: temp_path.display().to_string(),
277 });
278 } else {
279 form_data.fields.insert(name, field.text().await?);
280 }
281 }
282
283 self.form_data_result = Some(Ok(form_data.clone()));
284 Ok(form_data)
285 }
286
287 pub async fn files(&mut self) -> Result<Vec<FormFile>, BoltError> {
288 let form_data = self.form_data().await?;
289 Ok(form_data.files)
290 }
291
292 pub async fn file(&mut self, name: &str) -> Result<Option<FormFile>, BoltError> {
293 let files = self.files().await?;
294
295 let file = files.iter().find(|f| f.field_name == name);
296 Ok(file.cloned())
297 }
298
299 pub async fn cleanup(&mut self) {
300 for path in self.temp_paths.drain(..) {
301 let _ = tokio::fs::remove_file(&path).await;
302 }
303 }
304}
305
306impl Drop for RequestBody {
307 fn drop(&mut self) {
308 if self.temp_paths.is_empty() {
309 return;
310 }
311
312 let paths = std::mem::take(&mut self.temp_paths);
313
314 if tokio::runtime::Handle::try_current().is_ok() {
315 tokio::spawn(async move {
316 for path in paths {
317 let _ = tokio::fs::remove_file(&path).await;
318 }
319 });
320 } else {
321 for path in paths {
322 let _ = std::fs::remove_file(&path);
323 }
324 }
325 }
326}