1use std::{fmt, pin::Pin};
2
3use bytes::Bytes;
4use reqwest::{multipart, Body, RequestBuilder};
5
6use super::error::StreamError;
7
8pub enum RequestBody {
9 Static(&'static str),
10 String(String),
11 Bytes(Bytes),
12 Vec(Vec<u8>),
13 Form(Vec<(String, String)>),
14 Multipart(reqwest::multipart::Form),
15 Json(serde_json::Value),
16 Custom(Body),
17 Empty,
18
19 #[cfg(feature = "stream")]
20 File(tokio::fs::File),
21 #[cfg(feature = "stream")]
22 BufferedFile {
23 file: tokio::fs::File,
24 buffer_size: usize,
25 },
26 #[cfg(feature = "stream")]
27 AsyncRead(Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>),
28 #[cfg(feature = "stream")]
29 TcpStream(tokio::net::TcpStream),
30 #[cfg(feature = "stream")]
31 ProcessOutput {
32 command: String,
33 child: tokio::process::Child,
34 stdout: Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>,
35 },
36
37 #[cfg(feature = "stream")]
38 Stream(Pin<Box<dyn futures_util::Stream<Item = Result<Bytes, StreamError>> + Send + Sync>>),
39 #[cfg(feature = "stream")]
40 IoStream(
41 Pin<Box<dyn futures_util::Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync>>,
42 ),
43 #[cfg(feature = "stream")]
44 BytesIterator(Vec<Bytes>),
45 #[cfg(feature = "stream")]
46 CodecReader {
47 reader: Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>,
48 codec_type: CodecType,
49 },
50}
51
52impl RequestBody {
53 pub const fn from_static(s: &'static str) -> Self {
54 Self::Static(s)
55 }
56
57 pub fn from_string(s: impl Into<String>) -> Self {
58 Self::String(s.into())
59 }
60
61 pub fn from_bytes(bytes: Bytes) -> Self {
62 Self::Bytes(bytes)
63 }
64
65 pub fn from_vec(vec: Vec<u8>) -> Self {
66 Self::Vec(vec)
67 }
68
69 pub fn from_json(value: serde_json::Value) -> Self {
70 Self::Json(value)
71 }
72
73 pub fn from_json_serializable<T: serde::Serialize>(
74 value: &T,
75 ) -> Result<Self, serde_json::Error> {
76 let json_value = serde_json::to_value(value)?;
77 Ok(Self::Json(json_value))
78 }
79
80 pub fn from_form(form: Vec<(String, String)>) -> Self {
81 Self::Form(form)
82 }
83
84 pub fn from_form_pairs<I, K, V>(pairs: I) -> Self
85 where
86 I: IntoIterator<Item = (K, V)>,
87 K: Into<String>,
88 V: Into<String>,
89 {
90 let form: Vec<(String, String)> = pairs
91 .into_iter()
92 .map(|(k, v)| (k.into(), v.into()))
93 .collect();
94 Self::Form(form)
95 }
96
97 pub fn from_multipart(form: multipart::Form) -> Self {
98 Self::Multipart(form)
99 }
100
101 pub const fn empty() -> Self {
102 Self::Empty
103 }
104
105 pub fn from_reqwest_body(body: Body) -> Self {
106 Self::Custom(body)
107 }
108
109 #[cfg(feature = "stream")]
110 pub fn from_file(file: tokio::fs::File) -> Self {
111 Self::File(file)
112 }
113
114 #[cfg(feature = "stream")]
115 pub fn from_file_buffered(file: tokio::fs::File, buffer_size: usize) -> Self {
116 Self::BufferedFile { file, buffer_size }
117 }
118
119 #[cfg(feature = "stream")]
120 pub fn from_async_read<R>(reader: R) -> Self
121 where
122 R: tokio::io::AsyncRead + Send + Sync + 'static,
123 {
124 Self::AsyncRead(Box::pin(reader))
125 }
126
127 #[cfg(feature = "stream")]
128 pub async fn from_file_path<P: AsRef<std::path::Path>>(path: P) -> Result<Self, StreamError> {
129 use tokio::fs::File;
130
131 use super::error::FileOperation;
132
133 let path_ref = path.as_ref();
134 let file = File::open(path_ref)
135 .await
136 .map_err(|e| StreamError::file_error(path_ref, FileOperation::Open, e))?;
137 Ok(Self::from_file(file))
138 }
139
140 #[cfg(feature = "stream")]
141 pub async fn from_file_path_buffered<P: AsRef<std::path::Path>>(
142 path: P,
143 buffer_size: usize,
144 ) -> Result<Self, StreamError> {
145 use tokio::fs::File;
146
147 use super::error::FileOperation;
148
149 let path_ref = path.as_ref();
150 let file = File::open(path_ref)
151 .await
152 .map_err(|e| StreamError::file_error(path_ref, FileOperation::Open, e))?;
153 Ok(Self::from_file_buffered(file, buffer_size))
154 }
155
156 #[cfg(feature = "stream")]
157 pub fn from_tcp_stream(tcp: tokio::net::TcpStream) -> Self {
158 Self::TcpStream(tcp)
159 }
160
161 #[cfg(feature = "stream")]
162 pub fn from_command_output(mut command: tokio::process::Command) -> Result<Self, StreamError> {
163 use std::{io, process::Stdio};
164
165 use super::error::ProcessOperation;
166
167 let command_str = format!("{command:?}");
168 let mut child =
169 command
170 .stdout(Stdio::piped())
171 .spawn()
172 .map_err(|e| StreamError::Process {
173 command: command_str.clone(),
174 operation: ProcessOperation::Spawn,
175 source: e,
176 })?;
177
178 let stdout = child.stdout.take().ok_or_else(|| StreamError::Process {
179 command: command_str.clone(),
180 operation: ProcessOperation::ReadStdout,
181 source: io::Error::other("Failed to capture stdout"),
182 })?;
183
184 Ok(Self::ProcessOutput {
185 command: command_str,
186 child,
187 stdout: Box::pin(stdout),
188 })
189 }
190
191 #[cfg(feature = "stream")]
192 pub fn from_stream<S>(stream: S) -> Self
193 where
194 S: futures_util::Stream<Item = Result<Bytes, StreamError>> + Send + Sync + 'static,
195 {
196 Self::Stream(Box::pin(stream))
197 }
198
199 #[cfg(feature = "stream")]
200 pub fn from_io_stream<S>(stream: S) -> Self
201 where
202 S: futures_util::Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static,
203 {
204 Self::IoStream(Box::pin(stream))
205 }
206
207 #[cfg(feature = "stream")]
208 pub fn from_bytes_iter<I>(iter: I) -> Self
209 where
210 I: IntoIterator<Item = Bytes>,
211 {
212 Self::BytesIterator(iter.into_iter().collect())
213 }
214
215 #[cfg(feature = "stream")]
216 pub fn from_framed_read<R>(reader: R, codec_type: CodecType) -> Self
217 where
218 R: tokio::io::AsyncRead + Send + Sync + 'static,
219 {
220 Self::CodecReader {
221 reader: Box::pin(reader),
222 codec_type,
223 }
224 }
225
226 #[cfg(feature = "stream")]
227 pub fn from_bytes_framed<R>(reader: R) -> Self
228 where
229 R: tokio::io::AsyncRead + Send + Sync + 'static,
230 {
231 Self::from_framed_read(reader, CodecType::Bytes)
232 }
233
234 pub fn is_empty(&self) -> bool {
235 match self {
236 Self::Empty => true,
237 Self::Static(s) => s.is_empty(),
238 Self::String(s) => s.is_empty(),
239 Self::Bytes(b) => b.is_empty(),
240 Self::Vec(v) => v.is_empty(),
241 Self::Form(f) => f.is_empty(),
242 Self::Json(j) => j.is_null(),
243 #[cfg(feature = "stream")]
244 Self::BytesIterator(bytes) => bytes.is_empty(),
245 _ => false,
246 }
247 }
248
249 pub fn content_length(&self) -> Option<u64> {
250 match self {
251 Self::Empty => Some(0),
252 Self::Static(s) => Some(s.len() as u64),
253 Self::String(s) => Some(s.len() as u64),
254 Self::Bytes(b) => Some(b.len() as u64),
255 Self::Vec(v) => Some(v.len() as u64),
256 Self::Json(j) => serde_json::to_string(j).ok().map(|s| s.len() as u64),
257 Self::Form(f) => {
258 let encoded = serde_urlencoded::to_string(f).ok()?;
259 Some(encoded.len() as u64)
260 }
261 #[cfg(feature = "stream")]
262 Self::BytesIterator(bytes) => Some(bytes.iter().map(|b| b.len() as u64).sum()),
263 _ => None,
264 }
265 }
266
267 pub fn into_reqwest_body(self, client: RequestBuilder) -> RequestBuilder {
268 match self {
269 Self::Static(s) => client.body(s),
270 Self::String(s) => client.body(s),
271 Self::Bytes(b) => client.body(b),
272 Self::Vec(v) => client.body(v),
273 Self::Json(j) => {
274 let json_string = serde_json::to_string(&j).expect("Failed to serialize JSON");
275 client.body(json_string)
276 }
277 Self::Form(f) => {
278 let form_string =
279 serde_urlencoded::to_string(&f).expect("Failed to serialize form data");
280 client.body(form_string)
281 }
282 Self::Multipart(m) => client.multipart(m),
283 Self::Custom(b) => client.body(b),
284 Self::Empty => client.body(""),
285
286 #[cfg(feature = "stream")]
287 Self::File(file) => {
288 use futures_util::TryStreamExt;
289 use tokio_util::io::ReaderStream;
290
291 let stream = ReaderStream::new(file);
292 let stream = stream.map_err(|e| StreamError::io_error("file_read", e));
293 client.body(Body::wrap_stream(stream))
294 }
295
296 #[cfg(feature = "stream")]
297 Self::BufferedFile { file, buffer_size } => {
298 use futures_util::TryStreamExt;
299 use tokio::io::BufReader;
300 use tokio_util::io::ReaderStream;
301
302 let buffered_reader = BufReader::with_capacity(buffer_size, file);
303 let stream = ReaderStream::new(buffered_reader);
304 let stream = stream.map_err(|e| StreamError::io_error("buffered_file_read", e));
305 client.body(Body::wrap_stream(stream))
306 }
307
308 #[cfg(feature = "stream")]
309 Self::AsyncRead(reader) => {
310 use futures_util::TryStreamExt;
311 use tokio_util::io::ReaderStream;
312
313 let stream = ReaderStream::new(reader);
314 let stream = stream.map_err(|e| StreamError::io_error("async_read", e));
315 client.body(Body::wrap_stream(stream))
316 }
317
318 #[cfg(feature = "stream")]
319 Self::TcpStream(tcp) => {
320 use crate::api::request::error::NetworkOperation;
321 use futures_util::TryStreamExt;
322 use tokio_util::io::ReaderStream;
323
324 let stream = ReaderStream::new(tcp);
325 let stream = stream.map_err(|e| {
326 StreamError::network_error("tcp_stream", NetworkOperation::Read, e)
327 });
328 client.body(Body::wrap_stream(stream))
329 }
330
331 #[cfg(feature = "stream")]
332 Self::ProcessOutput { stdout, .. } => {
333 use futures_util::TryStreamExt;
334 use tokio_util::io::ReaderStream;
335
336 let stream = ReaderStream::new(stdout);
337 let stream = stream.map_err(|e| StreamError::io_error("command_output", e));
338 client.body(Body::wrap_stream(stream))
339 }
340
341 #[cfg(feature = "stream")]
342 Self::Stream(s) => client.body(Body::wrap_stream(s)),
343 #[cfg(feature = "stream")]
344 Self::IoStream(stream) => {
345 use futures_util::TryStreamExt;
346
347 let stream = stream.map_err(StreamError::from);
348 client.body(Body::wrap_stream(stream))
349 }
350
351 #[cfg(feature = "stream")]
352 Self::BytesIterator(bytes) => {
353 use futures_util::stream;
354
355 let iter_stream = stream::iter(bytes.into_iter().map(Ok::<Bytes, StreamError>));
356 client.body(Body::wrap_stream(iter_stream))
357 }
358
359 #[cfg(feature = "stream")]
360 Self::CodecReader { reader, codec_type } => {
361 use futures_util::{StreamExt, TryStreamExt};
362 use std::io;
363 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec, LinesCodecError};
364
365 match codec_type {
366 CodecType::Bytes => {
367 let framed = FramedRead::new(reader, BytesCodec::new());
368 let stream = framed.map_err(|e| StreamError::io_error("bytes_codec", e));
369 client.body(Body::wrap_stream(stream))
370 }
371 CodecType::Lines => {
372 let framed = FramedRead::new(reader, LinesCodec::new());
373 let stream = framed.map(|result| match result {
374 Ok(line) => Ok(Bytes::from(line + "\n")),
375 Err(lines_error) => {
376 let io_error = match lines_error {
377 LinesCodecError::MaxLineLengthExceeded => {
378 io::Error::new(io::ErrorKind::InvalidData, "Line too long")
379 }
380 LinesCodecError::Io(e) => e,
381 };
382 Err(StreamError::io_error("lines_codec", io_error))
383 }
384 });
385 client.body(Body::wrap_stream(stream))
386 }
387 CodecType::Json => {
388 let framed = FramedRead::new(reader, LinesCodec::new());
389 let stream = framed.map(|result| {
390 result.map(|line| Bytes::from(line + "\n")).map_err(|e| {
391 StreamError::io_error("json_codec", io::Error::other(e.to_string()))
392 })
393 });
394 client.body(Body::wrap_stream(stream))
395 }
396 CodecType::Custom(name) => {
397 let framed = FramedRead::new(reader, BytesCodec::new());
398 let stream = framed.map_err(move |e| {
399 StreamError::io_error(format!("custom_codec_{name}"), e)
400 });
401 client.body(Body::wrap_stream(stream))
402 }
403 }
404 }
405 }
406 }
407}
408
409impl fmt::Display for RequestBody {
410 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
411 match self {
412 Self::Empty { .. } => write!(f, "empty body"),
413
414 Self::Static(s) => write!(f, "static ({} bytes)", s.len()),
415 Self::String(s) => write!(f, "string ({} bytes)", s.len()),
416 Self::Bytes(b) => write!(f, "bytes ({} bytes)", b.len()),
417 Self::Vec(v) => write!(f, "binary ({} bytes)", v.len()),
418 Self::Json(_) => write!(f, "JSON"),
419 Self::Form(data) => write!(f, "form ({} fields)", data.len()),
420 Self::Multipart(_) => write!(f, "multipart"),
421 Self::Custom(_) => write!(f, "custom"),
422
423 #[cfg(feature = "stream")]
424 Self::File { .. } => write!(f, "file stream"),
425 #[cfg(feature = "stream")]
426 Self::BufferedFile { buffer_size, .. } => write!(f, "buffered file ({buffer_size}B)"),
427 #[cfg(feature = "stream")]
428 Self::AsyncRead(..) => write!(f, "async reader"),
429 #[cfg(feature = "stream")]
430 Self::TcpStream(_) => write!(f, "TCP stream"),
431 #[cfg(feature = "stream")]
432 Self::ProcessOutput { command, .. } => {
433 write!(f, "process output: {}", truncate_command(command))
434 }
435 #[cfg(feature = "stream")]
436 Self::Stream { .. } => write!(f, "stream"),
437 #[cfg(feature = "stream")]
438 Self::IoStream { .. } => write!(f, "I/O stream"),
439 #[cfg(feature = "stream")]
440 Self::BytesIterator(chunks) => {
441 let total: usize = chunks.iter().map(|b| b.len()).sum();
442 write!(
443 f,
444 "bytes iterator ({} chunks, {} bytes)",
445 chunks.len(),
446 total
447 )
448 }
449 #[cfg(feature = "stream")]
450 Self::CodecReader { codec_type, .. } => write!(f, "{codec_type} codec"),
451 }
452 }
453}
454
455impl fmt::Debug for RequestBody {
456 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
457 match self {
458 Self::Static(s) => f
459 .debug_tuple("Static")
460 .field(&format!("{}B: {}", s.len(), safe_debug_preview(s)))
461 .finish(),
462
463 Self::String(s) => f
464 .debug_tuple("String")
465 .field(&format!("{}B/{} cap", s.len(), s.capacity()))
466 .finish(),
467
468 Self::Bytes(b) => f
469 .debug_tuple("Bytes")
470 .field(&format!("{}B", b.len()))
471 .finish(),
472
473 Self::Vec(v) => f
474 .debug_tuple("Vec")
475 .field(&format!("{}B/{} cap", v.len(), v.capacity()))
476 .finish(),
477 Self::Json(j) => {
478 let preview = serde_json::to_string(j)
479 .map(|s| safe_debug_preview(&s))
480 .unwrap_or_else(|_| "[invalid]".to_string());
481 f.debug_tuple("Json").field(&preview).finish()
482 }
483
484 Self::Form(data) => {
485 let safe_form: Vec<_> = data
486 .iter()
487 .take(3)
488 .map(|(k, v)| {
489 (if is_sensitive_key(k) {
490 (k.as_str(), "[REDACTED]")
491 } else {
492 (k.as_str(), truncate_str(v, 20))
493 },)
494 })
495 .collect();
496
497 if data.len() > 3 {
498 f.debug_tuple("Form")
499 .field(&format!("{:?}... ({} total)", safe_form, data.len()))
500 .finish()
501 } else {
502 f.debug_tuple("Form").field(&safe_form).finish()
503 }
504 }
505 Self::Multipart(_) => f.debug_tuple("Multipart").field(&"[multipart]").finish(),
506 Self::Custom(_) => f.debug_tuple("Custom").field(&"[reqwest::Body]").finish(),
507 Self::Empty => f.debug_tuple("Empty").finish(),
508
509 #[cfg(feature = "stream")]
510 Self::File(_) => f.debug_tuple("File").field(&"[file]").finish(),
511 #[cfg(feature = "stream")]
512 Self::BufferedFile {
513 file: _,
514 buffer_size,
515 } => f
516 .debug_tuple("BufferedFile")
517 .field(&format!("{buffer_size}B buffer"))
518 .finish(),
519 #[cfg(feature = "stream")]
520 Self::AsyncRead(_) => f.debug_tuple("AsyncRead").field(&"[reader]").finish(),
521 #[cfg(feature = "stream")]
522 Self::TcpStream(_) => f.debug_tuple("TcpStream").field(&"[tcp]").finish(),
523 #[cfg(feature = "stream")]
524 Self::ProcessOutput { command, .. } => f
525 .debug_tuple("ProcessOutput")
526 .field(&truncate_str(command, 40))
527 .finish(),
528 #[cfg(feature = "stream")]
529 Self::Stream(_) => f.debug_tuple("Stream").field(&"[stream]").finish(),
530 #[cfg(feature = "stream")]
531 Self::IoStream(_) => f.debug_tuple("IoStream").field(&"[io_stream]").finish(),
532 #[cfg(feature = "stream")]
533 Self::BytesIterator(chunks) => {
534 let total: usize = chunks.iter().map(|b| b.len()).sum();
535 f.debug_tuple("BytesIterator")
536 .field(&format!("{total} chunks, {total}B"))
537 .finish()
538 }
539 #[cfg(feature = "stream")]
540 Self::CodecReader { codec_type, .. } => {
541 f.debug_tuple("CodecReader").field(codec_type).finish()
542 }
543 }
544 }
545}
546
/// Framing strategy used when a reader is turned into a chunked body.
#[cfg(feature = "stream")]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CodecType {
    /// Raw byte frames.
    Bytes,
    /// One frame per text line.
    Lines,
    /// Line-delimited JSON.
    Json,
    /// Caller-named codec; the name is carried for diagnostics.
    Custom(String),
}
555
/// Prints the lowercase codec name; custom codecs print their own name.
#[cfg(feature = "stream")]
impl fmt::Display for CodecType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Bytes => "bytes",
            Self::Lines => "lines",
            Self::Json => "json",
            Self::Custom(name) => name.as_str(),
        };
        f.write_str(label)
    }
}
567
/// Returns a quoted, escaped preview of `s` for diagnostic output.
///
/// Strings of up to 100 bytes are rendered in full. Longer strings are cut
/// to at most 100 bytes, never splitting a multi-byte character, and followed
/// by the number of characters that were left out.
fn safe_debug_preview(s: &str) -> String {
    const MAX_PREVIEW_BYTES: usize = 100;

    if s.len() <= MAX_PREVIEW_BYTES {
        return format!("{s:?}");
    }

    // Slicing at an arbitrary byte offset panics inside a multi-byte
    // character, so step back to the nearest boundary (offset 0 always is one).
    let mut cut = MAX_PREVIEW_BYTES;
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }

    let (shown, hidden) = s.split_at(cut);
    format!("{shown:?}... (+{} more chars)", hidden.chars().count())
}
575
/// Returns `s` unchanged when it fits in `max_len` bytes; otherwise returns
/// its longest prefix of at most `max_len` bytes followed by `...`.
///
/// The cut never splits a multi-byte character.
fn truncate_string(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // `max_len < s.len()` here; back up to a character boundary so the slice
    // below cannot panic (offset 0 is always a boundary).
    let mut cut = max_len;
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &s[..cut])
}
583
/// Returns the longest prefix of `s` that is at most `max_len` bytes long.
///
/// The result borrows from `s`; no ellipsis is appended. The cut never splits
/// a multi-byte character.
fn truncate_str(s: &str, max_len: usize) -> &str {
    if s.len() <= max_len {
        return s;
    }

    // `max_len < s.len()` here; back up to a character boundary so the slice
    // below cannot panic (offset 0 is always a boundary).
    let mut cut = max_len;
    while !s.is_char_boundary(cut) {
        cut -= 1;
    }
    &s[..cut]
}
591
592fn truncate_command(cmd: &str) -> String {
593 truncate_string(cmd, 80)
594}
595
/// Heuristically decides whether a form key names a credential.
///
/// The key is lowercased and checked for any of a fixed set of substrings
/// (passwords, tokens, secrets, auth headers, API keys, credentials, session
/// identifiers and cookies). Matching is by substring, so it can over-match
/// (for example `author`).
fn is_sensitive_key(key: &str) -> bool {
    const SENSITIVE_MARKERS: &[&str] = &[
        "password",
        "passwd",
        "token",
        "secret",
        "auth",
        "credential",
        "api_key",
        "api-key",
        "apikey",
        "private_key",
        "session",
        "cookie",
    ];

    let key_lower = key.to_lowercase();
    SENSITIVE_MARKERS
        .iter()
        .any(|marker| key_lower.contains(*marker))
}