1use crate::error::CamelError;
/// Size cap, in bytes, applied by [`Body::materialize`]: 10 MiB (10 × 2^20).
pub const DEFAULT_MATERIALIZE_LIMIT: usize = 10 << 20;
7
8use bytes::{Bytes, BytesMut};
9use futures::stream::BoxStream;
10use futures::{StreamExt, TryStreamExt};
11use std::io;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context as TaskContext, Poll};
15use tokio::io::{AsyncRead, ReadBuf};
16use tokio::sync::Mutex;
17use tokio_util::io::StreamReader;
18
19pub type BoxAsyncRead = Pin<Box<dyn AsyncRead + Send + Unpin>>;
23
/// Optional descriptive information carried alongside a [`StreamBody`].
///
/// Every field is optional; the derived `Default` leaves all of them `None`.
#[derive(Debug, Clone, Default)]
pub struct StreamMetadata {
    /// Expected size of the stream, when known.
    /// NOTE(review): presumably a byte count — confirm with the producers.
    pub size_hint: Option<u64>,
    /// Content type of the payload, when known.
    /// NOTE(review): presumably a MIME type string — confirm.
    pub content_type: Option<String>,
    /// Where the stream came from, when known.
    /// NOTE(review): the format is not defined in this file — confirm.
    pub origin: Option<String>,
}
34
/// A body backed by a stream of byte chunks that can be taken only once.
///
/// The stream lives in a shared `Arc<Mutex<Option<..>>>` slot. Consumers
/// `take()` the stream out of the slot, leaving `None` behind, so any other
/// handle to the same slot finds it empty afterwards.
pub struct StreamBody {
    /// Shared slot holding the chunk stream; `None` once it has been taken.
    #[allow(clippy::type_complexity)]
    pub stream: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
    /// Descriptive information about the stream.
    pub metadata: StreamMetadata,
}
80
81impl std::fmt::Debug for StreamBody {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 f.debug_struct("StreamBody")
84 .field("metadata", &self.metadata)
85 .field("stream", &"<BoxStream>")
86 .finish()
87 }
88}
89
90impl Clone for StreamBody {
91 fn clone(&self) -> Self {
92 Self {
93 stream: Arc::clone(&self.stream),
94 metadata: self.metadata.clone(),
95 }
96 }
97}
98
/// Adapter that exposes the shared stream slot of a [`StreamBody`] through
/// tokio's `AsyncRead`.
#[allow(clippy::type_complexity)]
struct StreamAsyncRead {
    /// Shared slot the chunk stream is taken from on the first read.
    arc: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
    /// Reader wrapping the taken stream; `None` until the stream has been taken.
    reader: Option<Box<dyn AsyncRead + Send + Unpin>>,
    /// Set once the slot was found empty; every later read then fails immediately.
    consumed: bool,
}
120
121impl AsyncRead for StreamAsyncRead {
122 fn poll_read(
123 mut self: Pin<&mut Self>,
124 cx: &mut TaskContext<'_>,
125 buf: &mut ReadBuf<'_>,
126 ) -> Poll<io::Result<()>> {
127 if self.consumed {
128 return Poll::Ready(Err(io::Error::other("stream already consumed")));
129 }
130 if self.reader.is_none() {
132 let extracted = {
134 match self.arc.try_lock() {
135 Ok(mut guard) => guard.take(),
136 Err(_) => {
137 cx.waker().wake_by_ref();
139 return Poll::Pending;
140 }
141 }
142 };
143 match extracted {
145 Some(stream) => {
146 let mapped = stream.map_err(|e: CamelError| io::Error::other(e.to_string()));
147 self.reader = Some(Box::new(StreamReader::new(mapped)));
148 }
149 None => {
150 self.consumed = true;
151 return Poll::Ready(Err(io::Error::other("stream already consumed")));
152 }
153 }
154 }
155 Pin::new(self.reader.as_mut().unwrap()).poll_read(cx, buf)
157 }
158}
159
/// Payload of a message, held in one of several representations.
#[derive(Debug, Default)]
pub enum Body {
    /// No payload; this is the default value.
    #[default]
    Empty,
    /// Raw bytes.
    Bytes(Bytes),
    /// Text held as a `String`.
    Text(String),
    /// A JSON value.
    Json(serde_json::Value),
    /// XML kept in its string form.
    Xml(String),
    /// A streamed payload whose underlying stream can be taken only once.
    Stream(StreamBody),
}
177
178impl Clone for Body {
179 fn clone(&self) -> Self {
180 match self {
181 Body::Empty => Body::Empty,
182 Body::Bytes(b) => Body::Bytes(b.clone()),
183 Body::Text(s) => Body::Text(s.clone()),
184 Body::Json(v) => Body::Json(v.clone()),
185 Body::Xml(s) => Body::Xml(s.clone()),
186 Body::Stream(s) => Body::Stream(s.clone()),
187 }
188 }
189}
190
191impl PartialEq for Body {
192 fn eq(&self, other: &Self) -> bool {
193 match (self, other) {
194 (Body::Empty, Body::Empty) => true,
195 (Body::Text(a), Body::Text(b)) => a == b,
196 (Body::Json(a), Body::Json(b)) => a == b,
197 (Body::Bytes(a), Body::Bytes(b)) => a == b,
198 (Body::Xml(a), Body::Xml(b)) => a == b,
199 _ => false,
201 }
202 }
203}
204
205impl Body {
206 pub fn is_empty(&self) -> bool {
208 matches!(self, Body::Empty)
209 }
210
211 pub async fn into_bytes(self, max_size: usize) -> Result<Bytes, CamelError> {
215 match self {
216 Body::Empty => Ok(Bytes::new()),
217 Body::Bytes(b) => {
218 if b.len() > max_size {
219 return Err(CamelError::StreamLimitExceeded(max_size));
220 }
221 Ok(b)
222 }
223 Body::Text(s) => {
224 if s.len() > max_size {
225 return Err(CamelError::StreamLimitExceeded(max_size));
226 }
227 Ok(Bytes::from(s))
228 }
229 Body::Json(v) => {
230 let b = serde_json::to_vec(&v)
231 .map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
232 if b.len() > max_size {
233 return Err(CamelError::StreamLimitExceeded(max_size));
234 }
235 Ok(Bytes::from(b))
236 }
237 Body::Xml(s) => {
238 if s.len() > max_size {
239 return Err(CamelError::StreamLimitExceeded(max_size));
240 }
241 Ok(Bytes::from(s))
242 }
243 Body::Stream(s) => {
244 let mut stream_lock = s.stream.lock().await;
245 let mut stream = stream_lock.take().ok_or(CamelError::AlreadyConsumed)?;
246
247 let mut buffer = BytesMut::new();
248 while let Some(chunk_res) = stream.next().await {
249 let chunk = chunk_res?;
250 if buffer.len() + chunk.len() > max_size {
251 return Err(CamelError::StreamLimitExceeded(max_size));
252 }
253 buffer.extend_from_slice(&chunk);
254 }
255 Ok(buffer.freeze())
256 }
257 }
258 }
259
260 pub async fn materialize(self) -> Result<Bytes, CamelError> {
273 self.into_bytes(DEFAULT_MATERIALIZE_LIMIT).await
274 }
275
276 pub fn into_async_read(self) -> BoxAsyncRead {
287 match self {
288 Body::Empty => Box::pin(tokio::io::empty()),
289 Body::Bytes(b) => Box::pin(std::io::Cursor::new(b)),
290 Body::Text(s) => Box::pin(std::io::Cursor::new(s.into_bytes())),
291 Body::Json(v) => match serde_json::to_vec(&v) {
292 Ok(bytes) => Box::pin(std::io::Cursor::new(bytes)) as BoxAsyncRead,
293 Err(e) => {
294 let err = io::Error::new(io::ErrorKind::InvalidData, e.to_string());
295 let stream = futures::stream::iter(vec![Err::<Bytes, io::Error>(err)]);
296 Box::pin(StreamReader::new(stream)) as BoxAsyncRead
297 }
298 },
299 Body::Xml(s) => Box::pin(std::io::Cursor::new(s.into_bytes())),
300 Body::Stream(s) => Box::pin(StreamAsyncRead {
301 arc: s.stream,
302 reader: None,
303 consumed: false,
304 }),
305 }
306 }
307
308 pub fn as_text(&self) -> Option<&str> {
310 match self {
311 Body::Text(s) => Some(s.as_str()),
312 _ => None,
313 }
314 }
315
316 pub fn as_xml(&self) -> Option<&str> {
318 match self {
319 Body::Xml(s) => Some(s.as_str()),
320 _ => None,
321 }
322 }
323
324 pub fn try_into_text(self) -> Result<Body, CamelError> {
328 crate::body_converter::convert(self, crate::body_converter::BodyType::Text)
329 }
330
331 pub fn try_into_json(self) -> Result<Body, CamelError> {
335 crate::body_converter::convert(self, crate::body_converter::BodyType::Json)
336 }
337
338 pub fn try_into_bytes_body(self) -> Result<Body, CamelError> {
342 crate::body_converter::convert(self, crate::body_converter::BodyType::Bytes)
343 }
344
345 pub fn try_into_xml(self) -> Result<Body, CamelError> {
349 crate::body_converter::convert(self, crate::body_converter::BodyType::Xml)
350 }
351}
352
353impl From<String> for Body {
355 fn from(s: String) -> Self {
356 Body::Text(s)
357 }
358}
359
360impl From<&str> for Body {
361 fn from(s: &str) -> Self {
362 Body::Text(s.to_string())
363 }
364}
365
366impl From<Bytes> for Body {
367 fn from(b: Bytes) -> Self {
368 Body::Bytes(b)
369 }
370}
371
372impl From<Vec<u8>> for Body {
373 fn from(v: Vec<u8>) -> Self {
374 Body::Bytes(Bytes::from(v))
375 }
376}
377
378impl From<serde_json::Value> for Body {
379 fn from(v: serde_json::Value) -> Self {
380 Body::Json(v)
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    /// Wraps `chunks` in a `Body::Stream` with default (all-`None`) metadata.
    fn stream_body(chunks: Vec<Result<Bytes, CamelError>>) -> Body {
        let stream = futures::stream::iter(chunks);
        Body::Stream(StreamBody {
            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
            metadata: StreamMetadata::default(),
        })
    }

    /// Reads `body` to the end through `Body::into_async_read`.
    async fn read_all(body: Body) -> io::Result<Vec<u8>> {
        let mut reader = body.into_async_read();
        let mut collected = Vec::new();
        reader.read_to_end(&mut collected).await?;
        Ok(collected)
    }

    // --- construction and conversions -------------------------------------

    #[test]
    fn test_body_default_is_empty() {
        assert!(Body::default().is_empty());
    }

    #[test]
    fn test_body_from_string() {
        let body = Body::from(String::from("hello"));
        assert_eq!(body.as_text(), Some("hello"));
    }

    #[test]
    fn test_body_from_str() {
        let body = Body::from("world");
        assert_eq!(body.as_text(), Some("world"));
    }

    #[test]
    fn test_body_from_bytes() {
        let body = Body::from(Bytes::from_static(b"data"));
        assert!(!body.is_empty());
        assert!(matches!(body, Body::Bytes(_)));
    }

    #[test]
    fn test_body_from_json() {
        let body = Body::from(serde_json::json!({"key": "value"}));
        assert!(matches!(body, Body::Json(_)));
    }

    // --- into_bytes / materialize -----------------------------------------

    #[tokio::test]
    async fn test_into_bytes_from_stream() {
        let body = stream_body(vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))]);

        let collected = body.into_bytes(100).await.unwrap();
        assert_eq!(collected, Bytes::from("hello world"));
    }

    #[tokio::test]
    async fn test_into_bytes_limit_exceeded() {
        let body = stream_body(vec![Ok(Bytes::from("this is too long"))]);

        let outcome = body.into_bytes(5).await;
        assert!(matches!(outcome, Err(CamelError::StreamLimitExceeded(5))));
    }

    #[tokio::test]
    async fn test_into_bytes_already_consumed() {
        let body = stream_body(vec![Ok(Bytes::from("data"))]);
        let cloned = body.clone();

        // The first consumer drains the shared stream...
        let _ = body.into_bytes(100).await.unwrap();

        // ...so the clone finds the slot empty.
        let outcome = cloned.into_bytes(100).await;
        assert!(matches!(outcome, Err(CamelError::AlreadyConsumed)));
    }

    #[tokio::test]
    async fn test_materialize_with_default_limit() {
        let body = stream_body(vec![Ok(Bytes::from("test data"))]);

        let outcome = body.materialize().await;
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), Bytes::from("test data"));
    }

    #[tokio::test]
    async fn test_materialize_non_stream_body_types() {
        let xml = "<root><child>value</child></root>";
        let cases = vec![
            (Body::Empty, Bytes::new()),
            (Body::Bytes(Bytes::from("bytes data")), Bytes::from("bytes data")),
            (Body::Text("text data".to_string()), Bytes::from("text data")),
            (
                Body::Json(serde_json::json!({"key": "value"})),
                Bytes::from_static(br#"{"key":"value"}"#),
            ),
            (Body::Xml(xml.to_string()), Bytes::from(xml)),
        ];

        for (body, expected) in cases {
            let materialized = body.materialize().await.unwrap();
            assert_eq!(materialized, expected);
        }
    }

    #[tokio::test]
    async fn test_materialize_exceeds_default_limit() {
        // One chunk of 11 MiB, just above the 10 MiB default cap.
        let oversized = vec![0u8; 11 * 1024 * 1024];
        let body = stream_body(vec![Ok(Bytes::from(oversized))]);

        let outcome = body.materialize().await;
        assert!(matches!(
            outcome,
            Err(CamelError::StreamLimitExceeded(10_485_760))
        ));
    }

    // --- equality and cloning ---------------------------------------------

    #[test]
    fn stream_variants_are_never_equal() {
        let make_stream = || stream_body(vec![Ok(Bytes::from_static(b"data"))]);
        assert_ne!(make_stream(), make_stream());
    }

    #[test]
    fn test_body_xml_as_xml() {
        let xml = "<root><child>value</child></root>";
        let body = Body::Xml(xml.to_string());
        assert_eq!(body.as_xml(), Some(xml));
    }

    #[test]
    fn test_body_non_xml_as_xml_returns_none() {
        let non_xml = [
            Body::Text("<root/>".to_string()),
            Body::Empty,
            Body::Bytes(Bytes::from("<root/>")),
            Body::Json(serde_json::json!({"key": "value"})),
        ];

        for body in &non_xml {
            assert_eq!(body.as_xml(), None);
        }
    }

    #[test]
    fn test_body_xml_partial_eq() {
        assert_eq!(Body::Xml("a".to_string()), Body::Xml("a".to_string()));
        assert_ne!(Body::Xml("a".to_string()), Body::Xml("b".to_string()));
    }

    #[test]
    fn test_body_xml_not_equal_to_other_variants() {
        let xml_body = Body::Xml("x".to_string());
        let text_body = Body::Text("x".to_string());
        assert_ne!(xml_body, text_body);
    }

    #[test]
    fn test_try_into_xml_from_text() {
        let converted = Body::Text("<root/>".to_string()).try_into_xml();
        assert!(matches!(converted, Ok(Body::Xml(ref s)) if s == "<root/>"));
    }

    #[test]
    fn test_try_into_xml_invalid_text() {
        let converted = Body::Text("not xml".to_string()).try_into_xml();
        assert!(matches!(
            converted,
            Err(CamelError::TypeConversionFailed(_))
        ));
    }

    #[test]
    fn test_body_xml_clone() {
        let original = Body::Xml("hello".to_string());
        let cloned = original.clone();
        assert_eq!(original, cloned);
    }

    // --- into_async_read ---------------------------------------------------

    #[tokio::test]
    async fn test_into_async_read_empty() {
        let bytes = read_all(Body::Empty).await.unwrap();
        assert!(bytes.is_empty());
    }

    #[tokio::test]
    async fn test_into_async_read_bytes() {
        let bytes = read_all(Body::Bytes(Bytes::from("hello"))).await.unwrap();
        assert_eq!(bytes, b"hello");
    }

    #[tokio::test]
    async fn test_into_async_read_text() {
        let bytes = read_all(Body::Text("world".to_string())).await.unwrap();
        assert_eq!(bytes, b"world");
    }

    #[tokio::test]
    async fn test_into_async_read_json() {
        let body = Body::Json(serde_json::json!({"key": "val"}));
        let bytes = read_all(body).await.unwrap();
        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(parsed["key"], "val");
    }

    #[tokio::test]
    async fn test_into_async_read_xml() {
        let bytes = read_all(Body::Xml("<root/>".to_string())).await.unwrap();
        assert_eq!(bytes, b"<root/>");
    }

    #[tokio::test]
    async fn test_into_async_read_stream_multichunk() {
        let body = stream_body(vec![
            Ok(Bytes::from("foo")),
            Ok(Bytes::from("bar")),
            Ok(Bytes::from("baz")),
        ]);

        let bytes = read_all(body).await.unwrap();
        assert_eq!(bytes, b"foobarbaz");
    }

    #[tokio::test]
    async fn test_into_async_read_already_consumed() {
        // A slot that is already empty stands in for a consumed stream.
        let body = Body::Stream(StreamBody {
            stream: Arc::new(Mutex::new(None)),
            metadata: StreamMetadata::default(),
        });

        assert!(read_all(body).await.is_err());
    }
}