1use crate::error::CamelError;
2use bytes::{Bytes, BytesMut};
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use std::io;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::task::{Context as TaskContext, Poll};
9use tokio::io::{AsyncRead, ReadBuf};
10use tokio::sync::Mutex;
11use tokio_util::io::StreamReader;
12
13const DEFAULT_MATERIALIZE_LIMIT: usize = 10 * 1024 * 1024;
14
15pub type BoxAsyncRead = Pin<Box<dyn AsyncRead + Send + Unpin>>;
19
20#[derive(Debug, Clone, Default)]
22pub struct StreamMetadata {
23 pub size_hint: Option<u64>,
25 pub content_type: Option<String>,
27 pub origin: Option<String>,
29}
30
31pub struct StreamBody {
70 #[allow(clippy::type_complexity)]
72 pub stream: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
73 pub metadata: StreamMetadata,
75}
76
77impl std::fmt::Debug for StreamBody {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("StreamBody")
80 .field("metadata", &self.metadata)
81 .field("stream", &"<BoxStream>")
82 .finish()
83 }
84}
85
86impl Clone for StreamBody {
87 fn clone(&self) -> Self {
88 Self {
89 stream: Arc::clone(&self.stream),
90 metadata: self.metadata.clone(),
91 }
92 }
93}
94
95#[allow(clippy::type_complexity)]
109struct StreamAsyncRead {
110 arc: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
111 reader: Option<Box<dyn AsyncRead + Send + Unpin>>,
113 consumed: bool,
115}
116
117impl AsyncRead for StreamAsyncRead {
118 fn poll_read(
119 mut self: Pin<&mut Self>,
120 cx: &mut TaskContext<'_>,
121 buf: &mut ReadBuf<'_>,
122 ) -> Poll<io::Result<()>> {
123 if self.consumed {
124 return Poll::Ready(Err(io::Error::other("stream already consumed")));
125 }
126 if self.reader.is_none() {
128 let extracted = {
130 match self.arc.try_lock() {
131 Ok(mut guard) => guard.take(),
132 Err(_) => {
133 cx.waker().wake_by_ref();
135 return Poll::Pending;
136 }
137 }
138 };
139 match extracted {
141 Some(stream) => {
142 let mapped = stream.map_err(|e: CamelError| io::Error::other(e.to_string()));
143 self.reader = Some(Box::new(StreamReader::new(mapped)));
144 }
145 None => {
146 self.consumed = true;
147 return Poll::Ready(Err(io::Error::other("stream already consumed")));
148 }
149 }
150 }
151 Pin::new(self.reader.as_mut().unwrap()).poll_read(cx, buf)
153 }
154}
155
156#[derive(Debug, Default)]
158pub enum Body {
159 #[default]
161 Empty,
162 Bytes(Bytes),
164 Text(String),
166 Json(serde_json::Value),
168 Xml(String),
170 Stream(StreamBody),
172}
173
174impl Clone for Body {
175 fn clone(&self) -> Self {
176 match self {
177 Body::Empty => Body::Empty,
178 Body::Bytes(b) => Body::Bytes(b.clone()),
179 Body::Text(s) => Body::Text(s.clone()),
180 Body::Json(v) => Body::Json(v.clone()),
181 Body::Xml(s) => Body::Xml(s.clone()),
182 Body::Stream(s) => Body::Stream(s.clone()),
183 }
184 }
185}
186
187impl PartialEq for Body {
188 fn eq(&self, other: &Self) -> bool {
189 match (self, other) {
190 (Body::Empty, Body::Empty) => true,
191 (Body::Text(a), Body::Text(b)) => a == b,
192 (Body::Json(a), Body::Json(b)) => a == b,
193 (Body::Bytes(a), Body::Bytes(b)) => a == b,
194 (Body::Xml(a), Body::Xml(b)) => a == b,
195 _ => false,
197 }
198 }
199}
200
201impl Body {
202 pub fn is_empty(&self) -> bool {
204 matches!(self, Body::Empty)
205 }
206
207 pub async fn into_bytes(self, max_size: usize) -> Result<Bytes, CamelError> {
211 match self {
212 Body::Empty => Ok(Bytes::new()),
213 Body::Bytes(b) => {
214 if b.len() > max_size {
215 return Err(CamelError::StreamLimitExceeded(max_size));
216 }
217 Ok(b)
218 }
219 Body::Text(s) => {
220 if s.len() > max_size {
221 return Err(CamelError::StreamLimitExceeded(max_size));
222 }
223 Ok(Bytes::from(s))
224 }
225 Body::Json(v) => {
226 let b = serde_json::to_vec(&v)
227 .map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
228 if b.len() > max_size {
229 return Err(CamelError::StreamLimitExceeded(max_size));
230 }
231 Ok(Bytes::from(b))
232 }
233 Body::Xml(s) => {
234 if s.len() > max_size {
235 return Err(CamelError::StreamLimitExceeded(max_size));
236 }
237 Ok(Bytes::from(s))
238 }
239 Body::Stream(s) => {
240 let mut stream_lock = s.stream.lock().await;
241 let mut stream = stream_lock.take().ok_or(CamelError::AlreadyConsumed)?;
242
243 let mut buffer = BytesMut::new();
244 while let Some(chunk_res) = stream.next().await {
245 let chunk = chunk_res?;
246 if buffer.len() + chunk.len() > max_size {
247 return Err(CamelError::StreamLimitExceeded(max_size));
248 }
249 buffer.extend_from_slice(&chunk);
250 }
251 Ok(buffer.freeze())
252 }
253 }
254 }
255
256 pub async fn materialize(self) -> Result<Bytes, CamelError> {
267 self.into_bytes(DEFAULT_MATERIALIZE_LIMIT).await
268 }
269
270 pub fn into_async_read(self) -> BoxAsyncRead {
281 match self {
282 Body::Empty => Box::pin(tokio::io::empty()),
283 Body::Bytes(b) => Box::pin(std::io::Cursor::new(b)),
284 Body::Text(s) => Box::pin(std::io::Cursor::new(s.into_bytes())),
285 Body::Json(v) => match serde_json::to_vec(&v) {
286 Ok(bytes) => Box::pin(std::io::Cursor::new(bytes)) as BoxAsyncRead,
287 Err(e) => {
288 let err = io::Error::new(io::ErrorKind::InvalidData, e.to_string());
289 let stream = futures::stream::iter(vec![Err::<Bytes, io::Error>(err)]);
290 Box::pin(StreamReader::new(stream)) as BoxAsyncRead
291 }
292 },
293 Body::Xml(s) => Box::pin(std::io::Cursor::new(s.into_bytes())),
294 Body::Stream(s) => Box::pin(StreamAsyncRead {
295 arc: s.stream,
296 reader: None,
297 consumed: false,
298 }),
299 }
300 }
301
302 pub fn as_text(&self) -> Option<&str> {
304 match self {
305 Body::Text(s) => Some(s.as_str()),
306 _ => None,
307 }
308 }
309
310 pub fn as_xml(&self) -> Option<&str> {
312 match self {
313 Body::Xml(s) => Some(s.as_str()),
314 _ => None,
315 }
316 }
317
318 pub fn try_into_text(self) -> Result<Body, CamelError> {
322 crate::body_converter::convert(self, crate::body_converter::BodyType::Text)
323 }
324
325 pub fn try_into_json(self) -> Result<Body, CamelError> {
329 crate::body_converter::convert(self, crate::body_converter::BodyType::Json)
330 }
331
332 pub fn try_into_bytes_body(self) -> Result<Body, CamelError> {
336 crate::body_converter::convert(self, crate::body_converter::BodyType::Bytes)
337 }
338
339 pub fn try_into_xml(self) -> Result<Body, CamelError> {
343 crate::body_converter::convert(self, crate::body_converter::BodyType::Xml)
344 }
345}
346
347impl From<String> for Body {
349 fn from(s: String) -> Self {
350 Body::Text(s)
351 }
352}
353
354impl From<&str> for Body {
355 fn from(s: &str) -> Self {
356 Body::Text(s.to_string())
357 }
358}
359
360impl From<Bytes> for Body {
361 fn from(b: Bytes) -> Self {
362 Body::Bytes(b)
363 }
364}
365
366impl From<Vec<u8>> for Body {
367 fn from(v: Vec<u8>) -> Self {
368 Body::Bytes(Bytes::from(v))
369 }
370}
371
372impl From<serde_json::Value> for Body {
373 fn from(v: serde_json::Value) -> Self {
374 Body::Json(v)
375 }
376}
377
378#[cfg(test)]
379mod tests {
380 use super::*;
381
382 #[test]
383 fn test_body_default_is_empty() {
384 let body = Body::default();
385 assert!(body.is_empty());
386 }
387
388 #[test]
389 fn test_body_from_string() {
390 let body = Body::from("hello".to_string());
391 assert_eq!(body.as_text(), Some("hello"));
392 }
393
394 #[test]
395 fn test_body_from_str() {
396 let body = Body::from("world");
397 assert_eq!(body.as_text(), Some("world"));
398 }
399
400 #[test]
401 fn test_body_from_bytes() {
402 let body = Body::from(Bytes::from_static(b"data"));
403 assert!(!body.is_empty());
404 assert!(matches!(body, Body::Bytes(_)));
405 }
406
407 #[test]
408 fn test_body_from_json() {
409 let val = serde_json::json!({"key": "value"});
410 let body = Body::from(val.clone());
411 assert!(matches!(body, Body::Json(_)));
412 }
413
414 #[tokio::test]
415 async fn test_into_bytes_from_stream() {
416 use futures::stream;
417 let chunks = vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))];
418 let stream = stream::iter(chunks);
419 let body = Body::Stream(StreamBody {
420 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
421 metadata: StreamMetadata::default(),
422 });
423
424 let result = body.into_bytes(100).await.unwrap();
425 assert_eq!(result, Bytes::from("hello world"));
426 }
427
428 #[tokio::test]
429 async fn test_into_bytes_limit_exceeded() {
430 use futures::stream;
431 let chunks = vec![Ok(Bytes::from("this is too long"))];
432 let stream = stream::iter(chunks);
433 let body = Body::Stream(StreamBody {
434 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
435 metadata: StreamMetadata::default(),
436 });
437
438 let result = body.into_bytes(5).await;
439 assert!(matches!(result, Err(CamelError::StreamLimitExceeded(5))));
440 }
441
442 #[tokio::test]
443 async fn test_into_bytes_already_consumed() {
444 use futures::stream;
445 let chunks = vec![Ok(Bytes::from("data"))];
446 let stream = stream::iter(chunks);
447 let body = Body::Stream(StreamBody {
448 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
449 metadata: StreamMetadata::default(),
450 });
451
452 let cloned = body.clone();
453 let _ = body.into_bytes(100).await.unwrap();
454
455 let result = cloned.into_bytes(100).await;
456 assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
457 }
458
459 #[tokio::test]
460 async fn test_materialize_with_default_limit() {
461 use futures::stream;
462
463 let chunks = vec![Ok(Bytes::from("test data"))];
465 let stream = stream::iter(chunks);
466 let body = Body::Stream(StreamBody {
467 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
468 metadata: StreamMetadata::default(),
469 });
470
471 let result = body.materialize().await;
472 assert!(result.is_ok());
473 assert_eq!(result.unwrap(), Bytes::from("test data"));
474 }
475
476 #[tokio::test]
477 async fn test_materialize_non_stream_body_types() {
478 let body = Body::Empty;
482 let result = body.materialize().await.unwrap();
483 assert!(result.is_empty());
484
485 let body = Body::Bytes(Bytes::from("bytes data"));
487 let result = body.materialize().await.unwrap();
488 assert_eq!(result, Bytes::from("bytes data"));
489
490 let body = Body::Text("text data".to_string());
492 let result = body.materialize().await.unwrap();
493 assert_eq!(result, Bytes::from("text data"));
494
495 let body = Body::Json(serde_json::json!({"key": "value"}));
497 let result = body.materialize().await.unwrap();
498 assert_eq!(result, Bytes::from_static(br#"{"key":"value"}"#));
499
500 let xml = "<root><child>value</child></root>";
502 let body = Body::Xml(xml.to_string());
503 let result = body.materialize().await.unwrap();
504 assert_eq!(result, Bytes::from(xml));
505 }
506
507 #[tokio::test]
508 async fn test_materialize_exceeds_default_limit() {
509 use futures::stream;
510
511 let large_data = vec![0u8; 15 * 1024 * 1024];
513 let chunks = vec![Ok(Bytes::from(large_data))];
514 let stream = stream::iter(chunks);
515 let body = Body::Stream(StreamBody {
516 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
517 metadata: StreamMetadata::default(),
518 });
519
520 let result = body.materialize().await;
521 assert!(matches!(
522 result,
523 Err(CamelError::StreamLimitExceeded(10_485_760))
524 ));
525 }
526
527 #[test]
528 fn stream_variants_are_never_equal() {
529 use futures::stream;
530
531 let make_stream = || {
532 let s = stream::iter(vec![Ok(Bytes::from_static(b"data"))]);
533 Body::Stream(StreamBody {
534 stream: Arc::new(Mutex::new(Some(Box::pin(s)))),
535 metadata: StreamMetadata::default(),
536 })
537 };
538 assert_ne!(make_stream(), make_stream());
539 }
540
541 #[test]
544 fn test_body_xml_as_xml() {
545 let xml = "<root><child>value</child></root>";
546 let body = Body::Xml(xml.to_string());
547 assert_eq!(body.as_xml(), Some(xml));
548 }
549
550 #[test]
551 fn test_body_non_xml_as_xml_returns_none() {
552 let body = Body::Text("<root/>".to_string());
554 assert_eq!(body.as_xml(), None);
555
556 let body = Body::Empty;
558 assert_eq!(body.as_xml(), None);
559
560 let body = Body::Bytes(Bytes::from("<root/>"));
562 assert_eq!(body.as_xml(), None);
563
564 let body = Body::Json(serde_json::json!({"key": "value"}));
566 assert_eq!(body.as_xml(), None);
567 }
568
569 #[test]
570 fn test_body_xml_partial_eq() {
571 let body1 = Body::Xml("a".to_string());
573 let body2 = Body::Xml("a".to_string());
574 assert_eq!(body1, body2);
575
576 let body1 = Body::Xml("a".to_string());
578 let body2 = Body::Xml("b".to_string());
579 assert_ne!(body1, body2);
580 }
581
582 #[test]
583 fn test_body_xml_not_equal_to_other_variants() {
584 let xml_body = Body::Xml("x".to_string());
586 let text_body = Body::Text("x".to_string());
587 assert_ne!(xml_body, text_body);
588 }
589
590 #[test]
591 fn test_try_into_xml_from_text() {
592 let body = Body::Text("<root/>".to_string());
593 let result = body.try_into_xml();
594 assert!(matches!(result, Ok(Body::Xml(ref s)) if s == "<root/>"));
595 }
596
597 #[test]
598 fn test_try_into_xml_invalid_text() {
599 let body = Body::Text("not xml".to_string());
600 let result = body.try_into_xml();
601 assert!(matches!(result, Err(CamelError::TypeConversionFailed(_))));
602 }
603
604 #[test]
605 fn test_body_xml_clone() {
606 let original = Body::Xml("hello".to_string());
607 let cloned = original.clone();
608 assert_eq!(original, cloned);
609 }
610
611 #[tokio::test]
614 async fn test_into_async_read_empty() {
615 use tokio::io::AsyncReadExt;
616 let body = Body::Empty;
617 let mut reader = body.into_async_read();
618 let mut buf = Vec::new();
619 reader.read_to_end(&mut buf).await.unwrap();
620 assert!(buf.is_empty());
621 }
622
623 #[tokio::test]
624 async fn test_into_async_read_bytes() {
625 use tokio::io::AsyncReadExt;
626 let body = Body::Bytes(Bytes::from("hello"));
627 let mut reader = body.into_async_read();
628 let mut buf = Vec::new();
629 reader.read_to_end(&mut buf).await.unwrap();
630 assert_eq!(buf, b"hello");
631 }
632
633 #[tokio::test]
634 async fn test_into_async_read_text() {
635 use tokio::io::AsyncReadExt;
636 let body = Body::Text("world".to_string());
637 let mut reader = body.into_async_read();
638 let mut buf = Vec::new();
639 reader.read_to_end(&mut buf).await.unwrap();
640 assert_eq!(buf, b"world");
641 }
642
643 #[tokio::test]
644 async fn test_into_async_read_json() {
645 use tokio::io::AsyncReadExt;
646 let body = Body::Json(serde_json::json!({"key": "val"}));
647 let mut reader = body.into_async_read();
648 let mut buf = Vec::new();
649 reader.read_to_end(&mut buf).await.unwrap();
650 let parsed: serde_json::Value = serde_json::from_slice(&buf).unwrap();
651 assert_eq!(parsed["key"], "val");
652 }
653
654 #[tokio::test]
655 async fn test_into_async_read_xml() {
656 use tokio::io::AsyncReadExt;
657 let body = Body::Xml("<root/>".to_string());
658 let mut reader = body.into_async_read();
659 let mut buf = Vec::new();
660 reader.read_to_end(&mut buf).await.unwrap();
661 assert_eq!(buf, b"<root/>");
662 }
663
664 #[tokio::test]
665 async fn test_into_async_read_stream_multichunk() {
666 use tokio::io::AsyncReadExt;
667 let chunks: Vec<Result<Bytes, CamelError>> = vec![
668 Ok(Bytes::from("foo")),
669 Ok(Bytes::from("bar")),
670 Ok(Bytes::from("baz")),
671 ];
672 let stream = futures::stream::iter(chunks);
673 let body = Body::Stream(StreamBody {
674 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
675 metadata: StreamMetadata {
676 size_hint: None,
677 content_type: None,
678 origin: None,
679 },
680 });
681 let mut reader = body.into_async_read();
682 let mut buf = Vec::new();
683 reader.read_to_end(&mut buf).await.unwrap();
684 assert_eq!(buf, b"foobarbaz");
685 }
686
687 #[tokio::test]
688 async fn test_into_async_read_already_consumed() {
689 use tokio::io::AsyncReadExt;
690 let arc: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>> =
692 Arc::new(Mutex::new(None));
693 let body = Body::Stream(StreamBody {
694 stream: arc,
695 metadata: StreamMetadata {
696 size_hint: None,
697 content_type: None,
698 origin: None,
699 },
700 });
701 let mut reader = body.into_async_read();
702 let mut buf = Vec::new();
703 let result = reader.read_to_end(&mut buf).await;
704 assert!(result.is_err());
705 }
706}