asknothingx2_util/api/request/
body.rs

1use std::{fmt, pin::Pin};
2
3use bytes::Bytes;
4use reqwest::{multipart, Body, RequestBuilder};
5
6use super::error::StreamError;
7
8pub enum RequestBody {
9    Static(&'static str),
10    String(String),
11    Bytes(Bytes),
12    Vec(Vec<u8>),
13    Form(Vec<(String, String)>),
14    Multipart(reqwest::multipart::Form),
15    Json(serde_json::Value),
16    Custom(Body),
17    Empty,
18
19    #[cfg(feature = "stream")]
20    File(tokio::fs::File),
21    #[cfg(feature = "stream")]
22    BufferedFile {
23        file: tokio::fs::File,
24        buffer_size: usize,
25    },
26    #[cfg(feature = "stream")]
27    AsyncRead(Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>),
28    #[cfg(feature = "stream")]
29    TcpStream(tokio::net::TcpStream),
30    #[cfg(feature = "stream")]
31    ProcessOutput {
32        command: String,
33        child: tokio::process::Child,
34        stdout: Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>,
35    },
36
37    #[cfg(feature = "stream")]
38    Stream(Pin<Box<dyn futures_util::Stream<Item = Result<Bytes, StreamError>> + Send + Sync>>),
39    #[cfg(feature = "stream")]
40    IoStream(
41        Pin<Box<dyn futures_util::Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync>>,
42    ),
43    #[cfg(feature = "stream")]
44    BytesIterator(Vec<Bytes>),
45    #[cfg(feature = "stream")]
46    CodecReader {
47        reader: Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>,
48        codec_type: CodecType,
49    },
50}
51
52impl RequestBody {
53    pub const fn from_static(s: &'static str) -> Self {
54        Self::Static(s)
55    }
56
57    pub fn from_string(s: impl Into<String>) -> Self {
58        Self::String(s.into())
59    }
60
61    pub fn from_bytes(bytes: Bytes) -> Self {
62        Self::Bytes(bytes)
63    }
64
65    pub fn from_vec(vec: Vec<u8>) -> Self {
66        Self::Vec(vec)
67    }
68
69    pub fn from_json(value: serde_json::Value) -> Self {
70        Self::Json(value)
71    }
72
73    pub fn from_json_serializable<T: serde::Serialize>(
74        value: &T,
75    ) -> Result<Self, serde_json::Error> {
76        let json_value = serde_json::to_value(value)?;
77        Ok(Self::Json(json_value))
78    }
79
80    pub fn from_form(form: Vec<(String, String)>) -> Self {
81        Self::Form(form)
82    }
83
84    pub fn from_form_pairs<I, K, V>(pairs: I) -> Self
85    where
86        I: IntoIterator<Item = (K, V)>,
87        K: Into<String>,
88        V: Into<String>,
89    {
90        let form: Vec<(String, String)> = pairs
91            .into_iter()
92            .map(|(k, v)| (k.into(), v.into()))
93            .collect();
94        Self::Form(form)
95    }
96
97    pub fn from_multipart(form: multipart::Form) -> Self {
98        Self::Multipart(form)
99    }
100
101    pub const fn empty() -> Self {
102        Self::Empty
103    }
104
105    pub fn from_reqwest_body(body: Body) -> Self {
106        Self::Custom(body)
107    }
108
109    #[cfg(feature = "stream")]
110    pub fn from_file(file: tokio::fs::File) -> Self {
111        Self::File(file)
112    }
113
114    #[cfg(feature = "stream")]
115    pub fn from_file_buffered(file: tokio::fs::File, buffer_size: usize) -> Self {
116        Self::BufferedFile { file, buffer_size }
117    }
118
119    #[cfg(feature = "stream")]
120    pub fn from_async_read<R>(reader: R) -> Self
121    where
122        R: tokio::io::AsyncRead + Send + Sync + 'static,
123    {
124        Self::AsyncRead(Box::pin(reader))
125    }
126
127    #[cfg(feature = "stream")]
128    pub async fn from_file_path<P: AsRef<std::path::Path>>(path: P) -> Result<Self, StreamError> {
129        use tokio::fs::File;
130
131        use super::error::FileOperation;
132
133        let path_ref = path.as_ref();
134        let file = File::open(path_ref)
135            .await
136            .map_err(|e| StreamError::file_error(path_ref, FileOperation::Open, e))?;
137        Ok(Self::from_file(file))
138    }
139
140    #[cfg(feature = "stream")]
141    pub async fn from_file_path_buffered<P: AsRef<std::path::Path>>(
142        path: P,
143        buffer_size: usize,
144    ) -> Result<Self, StreamError> {
145        use tokio::fs::File;
146
147        use super::error::FileOperation;
148
149        let path_ref = path.as_ref();
150        let file = File::open(path_ref)
151            .await
152            .map_err(|e| StreamError::file_error(path_ref, FileOperation::Open, e))?;
153        Ok(Self::from_file_buffered(file, buffer_size))
154    }
155
156    #[cfg(feature = "stream")]
157    pub fn from_tcp_stream(tcp: tokio::net::TcpStream) -> Self {
158        Self::TcpStream(tcp)
159    }
160
161    #[cfg(feature = "stream")]
162    pub fn from_command_output(mut command: tokio::process::Command) -> Result<Self, StreamError> {
163        use std::{io, process::Stdio};
164
165        use super::error::ProcessOperation;
166
167        let command_str = format!("{command:?}");
168        let mut child =
169            command
170                .stdout(Stdio::piped())
171                .spawn()
172                .map_err(|e| StreamError::Process {
173                    command: command_str.clone(),
174                    operation: ProcessOperation::Spawn,
175                    source: e,
176                })?;
177
178        let stdout = child.stdout.take().ok_or_else(|| StreamError::Process {
179            command: command_str.clone(),
180            operation: ProcessOperation::ReadStdout,
181            source: io::Error::other("Failed to capture stdout"),
182        })?;
183
184        Ok(Self::ProcessOutput {
185            command: command_str,
186            child,
187            stdout: Box::pin(stdout),
188        })
189    }
190
191    #[cfg(feature = "stream")]
192    pub fn from_stream<S>(stream: S) -> Self
193    where
194        S: futures_util::Stream<Item = Result<Bytes, StreamError>> + Send + Sync + 'static,
195    {
196        Self::Stream(Box::pin(stream))
197    }
198
199    #[cfg(feature = "stream")]
200    pub fn from_io_stream<S>(stream: S) -> Self
201    where
202        S: futures_util::Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static,
203    {
204        Self::IoStream(Box::pin(stream))
205    }
206
207    #[cfg(feature = "stream")]
208    pub fn from_bytes_iter<I>(iter: I) -> Self
209    where
210        I: IntoIterator<Item = Bytes>,
211    {
212        Self::BytesIterator(iter.into_iter().collect())
213    }
214
215    #[cfg(feature = "stream")]
216    pub fn from_framed_read<R>(reader: R, codec_type: CodecType) -> Self
217    where
218        R: tokio::io::AsyncRead + Send + Sync + 'static,
219    {
220        Self::CodecReader {
221            reader: Box::pin(reader),
222            codec_type,
223        }
224    }
225
226    #[cfg(feature = "stream")]
227    pub fn from_bytes_framed<R>(reader: R) -> Self
228    where
229        R: tokio::io::AsyncRead + Send + Sync + 'static,
230    {
231        Self::from_framed_read(reader, CodecType::Bytes)
232    }
233
234    pub fn is_empty(&self) -> bool {
235        match self {
236            Self::Empty => true,
237            Self::Static(s) => s.is_empty(),
238            Self::String(s) => s.is_empty(),
239            Self::Bytes(b) => b.is_empty(),
240            Self::Vec(v) => v.is_empty(),
241            Self::Form(f) => f.is_empty(),
242            Self::Json(j) => j.is_null(),
243            #[cfg(feature = "stream")]
244            Self::BytesIterator(bytes) => bytes.is_empty(),
245            _ => false,
246        }
247    }
248
249    pub fn content_length(&self) -> Option<u64> {
250        match self {
251            Self::Empty => Some(0),
252            Self::Static(s) => Some(s.len() as u64),
253            Self::String(s) => Some(s.len() as u64),
254            Self::Bytes(b) => Some(b.len() as u64),
255            Self::Vec(v) => Some(v.len() as u64),
256            Self::Json(j) => serde_json::to_string(j).ok().map(|s| s.len() as u64),
257            Self::Form(f) => {
258                let encoded = serde_urlencoded::to_string(f).ok()?;
259                Some(encoded.len() as u64)
260            }
261            #[cfg(feature = "stream")]
262            Self::BytesIterator(bytes) => Some(bytes.iter().map(|b| b.len() as u64).sum()),
263            _ => None,
264        }
265    }
266
267    pub fn into_reqwest_body(self, client: RequestBuilder) -> RequestBuilder {
268        match self {
269            Self::Static(s) => client.body(s),
270            Self::String(s) => client.body(s),
271            Self::Bytes(b) => client.body(b),
272            Self::Vec(v) => client.body(v),
273            Self::Json(j) => {
274                let json_string = serde_json::to_string(&j).expect("Failed to serialize JSON");
275                client.body(json_string)
276            }
277            Self::Form(f) => {
278                let form_string =
279                    serde_urlencoded::to_string(&f).expect("Failed to serialize form data");
280                client.body(form_string)
281            }
282            Self::Multipart(m) => client.multipart(m),
283            Self::Custom(b) => client.body(b),
284            Self::Empty => client.body(""),
285
286            #[cfg(feature = "stream")]
287            Self::File(file) => {
288                use futures_util::TryStreamExt;
289                use tokio_util::io::ReaderStream;
290
291                let stream = ReaderStream::new(file);
292                let stream = stream.map_err(|e| StreamError::io_error("file_read", e));
293                client.body(Body::wrap_stream(stream))
294            }
295
296            #[cfg(feature = "stream")]
297            Self::BufferedFile { file, buffer_size } => {
298                use futures_util::TryStreamExt;
299                use tokio::io::BufReader;
300                use tokio_util::io::ReaderStream;
301
302                let buffered_reader = BufReader::with_capacity(buffer_size, file);
303                let stream = ReaderStream::new(buffered_reader);
304                let stream = stream.map_err(|e| StreamError::io_error("buffered_file_read", e));
305                client.body(Body::wrap_stream(stream))
306            }
307
308            #[cfg(feature = "stream")]
309            Self::AsyncRead(reader) => {
310                use futures_util::TryStreamExt;
311                use tokio_util::io::ReaderStream;
312
313                let stream = ReaderStream::new(reader);
314                let stream = stream.map_err(|e| StreamError::io_error("async_read", e));
315                client.body(Body::wrap_stream(stream))
316            }
317
318            #[cfg(feature = "stream")]
319            Self::TcpStream(tcp) => {
320                use crate::api::request::error::NetworkOperation;
321                use futures_util::TryStreamExt;
322                use tokio_util::io::ReaderStream;
323
324                let stream = ReaderStream::new(tcp);
325                let stream = stream.map_err(|e| {
326                    StreamError::network_error("tcp_stream", NetworkOperation::Read, e)
327                });
328                client.body(Body::wrap_stream(stream))
329            }
330
331            #[cfg(feature = "stream")]
332            Self::ProcessOutput { stdout, .. } => {
333                use futures_util::TryStreamExt;
334                use tokio_util::io::ReaderStream;
335
336                let stream = ReaderStream::new(stdout);
337                let stream = stream.map_err(|e| StreamError::io_error("command_output", e));
338                client.body(Body::wrap_stream(stream))
339            }
340
341            #[cfg(feature = "stream")]
342            Self::Stream(s) => client.body(Body::wrap_stream(s)),
343            #[cfg(feature = "stream")]
344            Self::IoStream(stream) => {
345                use futures_util::TryStreamExt;
346
347                let stream = stream.map_err(StreamError::from);
348                client.body(Body::wrap_stream(stream))
349            }
350
351            #[cfg(feature = "stream")]
352            Self::BytesIterator(bytes) => {
353                use futures_util::stream;
354
355                let iter_stream = stream::iter(bytes.into_iter().map(Ok::<Bytes, StreamError>));
356                client.body(Body::wrap_stream(iter_stream))
357            }
358
359            #[cfg(feature = "stream")]
360            Self::CodecReader { reader, codec_type } => {
361                use futures_util::{StreamExt, TryStreamExt};
362                use std::io;
363                use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec, LinesCodecError};
364
365                match codec_type {
366                    CodecType::Bytes => {
367                        let framed = FramedRead::new(reader, BytesCodec::new());
368                        let stream = framed.map_err(|e| StreamError::io_error("bytes_codec", e));
369                        client.body(Body::wrap_stream(stream))
370                    }
371                    CodecType::Lines => {
372                        let framed = FramedRead::new(reader, LinesCodec::new());
373                        let stream = framed.map(|result| match result {
374                            Ok(line) => Ok(Bytes::from(line + "\n")),
375                            Err(lines_error) => {
376                                let io_error = match lines_error {
377                                    LinesCodecError::MaxLineLengthExceeded => {
378                                        io::Error::new(io::ErrorKind::InvalidData, "Line too long")
379                                    }
380                                    LinesCodecError::Io(e) => e,
381                                };
382                                Err(StreamError::io_error("lines_codec", io_error))
383                            }
384                        });
385                        client.body(Body::wrap_stream(stream))
386                    }
387                    CodecType::Json => {
388                        let framed = FramedRead::new(reader, LinesCodec::new());
389                        let stream = framed.map(|result| {
390                            result.map(|line| Bytes::from(line + "\n")).map_err(|e| {
391                                StreamError::io_error("json_codec", io::Error::other(e.to_string()))
392                            })
393                        });
394                        client.body(Body::wrap_stream(stream))
395                    }
396                    CodecType::Custom(name) => {
397                        let framed = FramedRead::new(reader, BytesCodec::new());
398                        let stream = framed.map_err(move |e| {
399                            StreamError::io_error(format!("custom_codec_{name}"), e)
400                        });
401                        client.body(Body::wrap_stream(stream))
402                    }
403                }
404            }
405        }
406    }
407}
408
409impl fmt::Display for RequestBody {
410    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
411        match self {
412            Self::Empty { .. } => write!(f, "empty body"),
413
414            Self::Static(s) => write!(f, "static ({} bytes)", s.len()),
415            Self::String(s) => write!(f, "string ({} bytes)", s.len()),
416            Self::Bytes(b) => write!(f, "bytes ({} bytes)", b.len()),
417            Self::Vec(v) => write!(f, "binary ({} bytes)", v.len()),
418            Self::Json(_) => write!(f, "JSON"),
419            Self::Form(data) => write!(f, "form ({} fields)", data.len()),
420            Self::Multipart(_) => write!(f, "multipart"),
421            Self::Custom(_) => write!(f, "custom"),
422
423            #[cfg(feature = "stream")]
424            Self::File { .. } => write!(f, "file stream"),
425            #[cfg(feature = "stream")]
426            Self::BufferedFile { buffer_size, .. } => write!(f, "buffered file ({buffer_size}B)"),
427            #[cfg(feature = "stream")]
428            Self::AsyncRead(..) => write!(f, "async reader"),
429            #[cfg(feature = "stream")]
430            Self::TcpStream(_) => write!(f, "TCP stream"),
431            #[cfg(feature = "stream")]
432            Self::ProcessOutput { command, .. } => {
433                write!(f, "process output: {}", truncate_command(command))
434            }
435            #[cfg(feature = "stream")]
436            Self::Stream { .. } => write!(f, "stream"),
437            #[cfg(feature = "stream")]
438            Self::IoStream { .. } => write!(f, "I/O stream"),
439            #[cfg(feature = "stream")]
440            Self::BytesIterator(chunks) => {
441                let total: usize = chunks.iter().map(|b| b.len()).sum();
442                write!(
443                    f,
444                    "bytes iterator ({} chunks, {} bytes)",
445                    chunks.len(),
446                    total
447                )
448            }
449            #[cfg(feature = "stream")]
450            Self::CodecReader { codec_type, .. } => write!(f, "{codec_type} codec"),
451        }
452    }
453}
454
455impl fmt::Debug for RequestBody {
456    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
457        match self {
458            Self::Static(s) => f
459                .debug_tuple("Static")
460                .field(&format!("{}B: {}", s.len(), safe_debug_preview(s)))
461                .finish(),
462
463            Self::String(s) => f
464                .debug_tuple("String")
465                .field(&format!("{}B/{} cap", s.len(), s.capacity()))
466                .finish(),
467
468            Self::Bytes(b) => f
469                .debug_tuple("Bytes")
470                .field(&format!("{}B", b.len()))
471                .finish(),
472
473            Self::Vec(v) => f
474                .debug_tuple("Vec")
475                .field(&format!("{}B/{} cap", v.len(), v.capacity()))
476                .finish(),
477            Self::Json(j) => {
478                let preview = serde_json::to_string(j)
479                    .map(|s| safe_debug_preview(&s))
480                    .unwrap_or_else(|_| "[invalid]".to_string());
481                f.debug_tuple("Json").field(&preview).finish()
482            }
483
484            Self::Form(data) => {
485                let safe_form: Vec<_> = data
486                    .iter()
487                    .take(3)
488                    .map(|(k, v)| {
489                        (if is_sensitive_key(k) {
490                            (k.as_str(), "[REDACTED]")
491                        } else {
492                            (k.as_str(), truncate_str(v, 20))
493                        },)
494                    })
495                    .collect();
496
497                if data.len() > 3 {
498                    f.debug_tuple("Form")
499                        .field(&format!("{:?}... ({} total)", safe_form, data.len()))
500                        .finish()
501                } else {
502                    f.debug_tuple("Form").field(&safe_form).finish()
503                }
504            }
505            Self::Multipart(_) => f.debug_tuple("Multipart").field(&"[multipart]").finish(),
506            Self::Custom(_) => f.debug_tuple("Custom").field(&"[reqwest::Body]").finish(),
507            Self::Empty => f.debug_tuple("Empty").finish(),
508
509            #[cfg(feature = "stream")]
510            Self::File(_) => f.debug_tuple("File").field(&"[file]").finish(),
511            #[cfg(feature = "stream")]
512            Self::BufferedFile {
513                file: _,
514                buffer_size,
515            } => f
516                .debug_tuple("BufferedFile")
517                .field(&format!("{buffer_size}B buffer"))
518                .finish(),
519            #[cfg(feature = "stream")]
520            Self::AsyncRead(_) => f.debug_tuple("AsyncRead").field(&"[reader]").finish(),
521            #[cfg(feature = "stream")]
522            Self::TcpStream(_) => f.debug_tuple("TcpStream").field(&"[tcp]").finish(),
523            #[cfg(feature = "stream")]
524            Self::ProcessOutput { command, .. } => f
525                .debug_tuple("ProcessOutput")
526                .field(&truncate_str(command, 40))
527                .finish(),
528            #[cfg(feature = "stream")]
529            Self::Stream(_) => f.debug_tuple("Stream").field(&"[stream]").finish(),
530            #[cfg(feature = "stream")]
531            Self::IoStream(_) => f.debug_tuple("IoStream").field(&"[io_stream]").finish(),
532            #[cfg(feature = "stream")]
533            Self::BytesIterator(chunks) => {
534                let total: usize = chunks.iter().map(|b| b.len()).sum();
535                f.debug_tuple("BytesIterator")
536                    .field(&format!("{total} chunks, {total}B"))
537                    .finish()
538            }
539            #[cfg(feature = "stream")]
540            Self::CodecReader { codec_type, .. } => {
541                f.debug_tuple("CodecReader").field(codec_type).finish()
542            }
543        }
544    }
545}
546
547#[cfg(feature = "stream")]
548#[derive(Debug, Clone, PartialEq, Eq)]
549pub enum CodecType {
550    Bytes,
551    Lines,
552    Json,
553    Custom(String),
554}
555
556#[cfg(feature = "stream")]
557impl fmt::Display for CodecType {
558    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
559        match self {
560            CodecType::Bytes => write!(f, "bytes"),
561            CodecType::Lines => write!(f, "lines"),
562            CodecType::Json => write!(f, "json"),
563            CodecType::Custom(name) => write!(f, "{name}"),
564        }
565    }
566}
567
568fn safe_debug_preview(s: &str) -> String {
569    if s.len() <= 100 {
570        format!("{s:?}")
571    } else {
572        format!("{:?}... (+{} more chars)", &s[..100], s.len() - 100)
573    }
574}
575
576fn truncate_string(s: &str, max_len: usize) -> String {
577    if s.len() <= max_len {
578        s.to_string()
579    } else {
580        format!("{}...", &s[..max_len])
581    }
582}
583
584fn truncate_str(s: &str, max_len: usize) -> &str {
585    if s.len() <= max_len {
586        s
587    } else {
588        &s[..max_len]
589    }
590}
591
592fn truncate_command(cmd: &str) -> String {
593    truncate_string(cmd, 80)
594}
595
596fn is_sensitive_key(key: &str) -> bool {
597    let key_lower = key.to_lowercase();
598    key_lower.contains("password")
599        || key_lower.contains("token")
600        || key_lower.contains("secret")
601        || key_lower.contains("auth")
602}