1use crate::error::CamelError;
2pub const DEFAULT_MATERIALIZE_LIMIT: usize = 10 * 1024 * 1024;
7
8use bytes::{Bytes, BytesMut};
9use futures::stream::BoxStream;
10use futures::{StreamExt, TryStreamExt};
11use std::io;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context as TaskContext, Poll};
15use tokio::io::{AsyncRead, ReadBuf};
16use tokio::sync::Mutex;
17use tokio_util::io::StreamReader;
18
19pub type BoxAsyncRead = Pin<Box<dyn AsyncRead + Send + Unpin>>;
23
24#[derive(Debug, Clone, Default)]
26pub struct StreamMetadata {
27 pub size_hint: Option<u64>,
29 pub content_type: Option<String>,
31 pub origin: Option<String>,
33}
34
35pub struct StreamBody {
74 #[allow(clippy::type_complexity)]
76 pub stream: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
77 pub metadata: StreamMetadata,
79}
80
81impl std::fmt::Debug for StreamBody {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 f.debug_struct("StreamBody")
84 .field("metadata", &self.metadata)
85 .field("stream", &"<BoxStream>")
86 .finish()
87 }
88}
89
90impl Clone for StreamBody {
91 fn clone(&self) -> Self {
92 Self {
93 stream: Arc::clone(&self.stream),
94 metadata: self.metadata.clone(),
95 }
96 }
97}
98
99#[allow(clippy::type_complexity)]
113struct StreamAsyncRead {
114 arc: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
115 reader: Option<Box<dyn AsyncRead + Send + Unpin>>,
117 consumed: bool,
119}
120
121impl AsyncRead for StreamAsyncRead {
122 fn poll_read(
123 mut self: Pin<&mut Self>,
124 cx: &mut TaskContext<'_>,
125 buf: &mut ReadBuf<'_>,
126 ) -> Poll<io::Result<()>> {
127 if self.consumed {
128 return Poll::Ready(Err(io::Error::other("stream already consumed")));
129 }
130 if self.reader.is_none() {
132 let extracted = {
134 match self.arc.try_lock() {
135 Ok(mut guard) => guard.take(),
136 Err(_) => {
137 cx.waker().wake_by_ref();
139 return Poll::Pending;
140 }
141 }
142 };
143 match extracted {
145 Some(stream) => {
146 let mapped = stream.map_err(|e: CamelError| io::Error::other(e.to_string()));
147 self.reader = Some(Box::new(StreamReader::new(mapped)));
148 }
149 None => {
150 self.consumed = true;
151 return Poll::Ready(Err(io::Error::other("stream already consumed")));
152 }
153 }
154 }
155 Pin::new(self.reader.as_mut().unwrap()).poll_read(cx, buf) }
158}
159
160#[derive(Debug, Default)]
162pub enum Body {
163 #[default]
165 Empty,
166 Bytes(Bytes),
168 Text(String),
170 Json(serde_json::Value),
172 Xml(String),
174 Stream(StreamBody),
176}
177
178impl Clone for Body {
179 fn clone(&self) -> Self {
180 match self {
181 Body::Empty => Body::Empty,
182 Body::Bytes(b) => Body::Bytes(b.clone()),
183 Body::Text(s) => Body::Text(s.clone()),
184 Body::Json(v) => Body::Json(v.clone()),
185 Body::Xml(s) => Body::Xml(s.clone()),
186 Body::Stream(s) => Body::Stream(s.clone()),
187 }
188 }
189}
190
191impl PartialEq for Body {
192 fn eq(&self, other: &Self) -> bool {
193 match (self, other) {
194 (Body::Empty, Body::Empty) => true,
195 (Body::Text(a), Body::Text(b)) => a == b,
196 (Body::Json(a), Body::Json(b)) => a == b,
197 (Body::Bytes(a), Body::Bytes(b)) => a == b,
198 (Body::Xml(a), Body::Xml(b)) => a == b,
199 _ => false,
201 }
202 }
203}
204
205impl Body {
206 pub fn is_empty(&self) -> bool {
208 matches!(self, Body::Empty)
209 }
210
211 pub async fn into_bytes(self, max_size: usize) -> Result<Bytes, CamelError> {
215 match self {
216 Body::Empty => Ok(Bytes::new()),
217 Body::Bytes(b) => {
218 if b.len() > max_size {
219 return Err(CamelError::StreamLimitExceeded(max_size));
220 }
221 Ok(b)
222 }
223 Body::Text(s) => {
224 if s.len() > max_size {
225 return Err(CamelError::StreamLimitExceeded(max_size));
226 }
227 Ok(Bytes::from(s))
228 }
229 Body::Json(v) => {
230 let b = serde_json::to_vec(&v)
231 .map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
232 if b.len() > max_size {
233 return Err(CamelError::StreamLimitExceeded(max_size));
234 }
235 Ok(Bytes::from(b))
236 }
237 Body::Xml(s) => {
238 if s.len() > max_size {
239 return Err(CamelError::StreamLimitExceeded(max_size));
240 }
241 Ok(Bytes::from(s))
242 }
243 Body::Stream(s) => {
244 let mut stream_lock = s.stream.lock().await;
245 let mut stream = stream_lock.take().ok_or(CamelError::AlreadyConsumed)?;
246
247 let mut buffer = BytesMut::new();
248 while let Some(chunk_res) = stream.next().await {
249 let chunk = chunk_res?;
250 if buffer.len() + chunk.len() > max_size {
251 return Err(CamelError::StreamLimitExceeded(max_size));
252 }
253 buffer.extend_from_slice(&chunk);
254 }
255 Ok(buffer.freeze())
256 }
257 }
258 }
259
260 pub async fn materialize(self) -> Result<Bytes, CamelError> {
273 self.into_bytes(DEFAULT_MATERIALIZE_LIMIT).await
274 }
275
276 pub fn into_async_read(self) -> Result<BoxAsyncRead, CamelError> {
287 match self {
288 Body::Empty => Ok(Box::pin(tokio::io::empty())),
289 Body::Bytes(b) => Ok(Box::pin(std::io::Cursor::new(b))),
290 Body::Text(s) => Ok(Box::pin(std::io::Cursor::new(s.into_bytes()))),
291 Body::Json(v) => {
292 let bytes = serde_json::to_vec(&v)
293 .map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
294 Ok(Box::pin(std::io::Cursor::new(bytes)) as BoxAsyncRead)
295 }
296 Body::Xml(s) => Ok(Box::pin(std::io::Cursor::new(s.into_bytes()))),
297 Body::Stream(s) => Ok(Box::pin(StreamAsyncRead {
298 arc: s.stream,
299 reader: None,
300 consumed: false,
301 })),
302 }
303 }
304
305 pub fn as_text(&self) -> Option<&str> {
307 match self {
308 Body::Text(s) => Some(s.as_str()),
309 _ => None,
310 }
311 }
312
313 pub fn as_xml(&self) -> Option<&str> {
315 match self {
316 Body::Xml(s) => Some(s.as_str()),
317 _ => None,
318 }
319 }
320
321 pub fn try_into_text(self) -> Result<Body, CamelError> {
325 crate::body_converter::convert(self, crate::body_converter::BodyType::Text)
326 }
327
328 pub fn try_into_json(self) -> Result<Body, CamelError> {
332 crate::body_converter::convert(self, crate::body_converter::BodyType::Json)
333 }
334
335 pub fn try_into_bytes_body(self) -> Result<Body, CamelError> {
339 crate::body_converter::convert(self, crate::body_converter::BodyType::Bytes)
340 }
341
342 pub fn try_into_xml(self) -> Result<Body, CamelError> {
346 crate::body_converter::convert(self, crate::body_converter::BodyType::Xml)
347 }
348}
349
350impl From<String> for Body {
352 fn from(s: String) -> Self {
353 Body::Text(s)
354 }
355}
356
357impl From<&str> for Body {
358 fn from(s: &str) -> Self {
359 Body::Text(s.to_string())
360 }
361}
362
363impl From<Bytes> for Body {
364 fn from(b: Bytes) -> Self {
365 Body::Bytes(b)
366 }
367}
368
369impl From<Vec<u8>> for Body {
370 fn from(v: Vec<u8>) -> Self {
371 Body::Bytes(Bytes::from(v))
372 }
373}
374
375impl From<serde_json::Value> for Body {
376 fn from(v: serde_json::Value) -> Self {
377 Body::Json(v)
378 }
379}
380
381#[cfg(test)]
382mod tests {
383 use super::*;
384
385 #[test]
386 fn test_body_default_is_empty() {
387 let body = Body::default();
388 assert!(body.is_empty());
389 }
390
391 #[test]
392 fn test_body_from_string() {
393 let body = Body::from("hello".to_string());
394 assert_eq!(body.as_text(), Some("hello"));
395 }
396
397 #[test]
398 fn test_body_from_str() {
399 let body = Body::from("world");
400 assert_eq!(body.as_text(), Some("world"));
401 }
402
403 #[test]
404 fn test_body_from_bytes() {
405 let body = Body::from(Bytes::from_static(b"data"));
406 assert!(!body.is_empty());
407 assert!(matches!(body, Body::Bytes(_)));
408 }
409
410 #[test]
411 fn test_body_from_json() {
412 let val = serde_json::json!({"key": "value"});
413 let body = Body::from(val.clone());
414 assert!(matches!(body, Body::Json(_)));
415 }
416
417 #[tokio::test]
418 async fn test_into_bytes_from_stream() {
419 use futures::stream;
420 let chunks = vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))];
421 let stream = stream::iter(chunks);
422 let body = Body::Stream(StreamBody {
423 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
424 metadata: StreamMetadata::default(),
425 });
426
427 let result = body.into_bytes(100).await.unwrap();
428 assert_eq!(result, Bytes::from("hello world"));
429 }
430
431 #[tokio::test]
432 async fn test_into_bytes_limit_exceeded() {
433 use futures::stream;
434 let chunks = vec![Ok(Bytes::from("this is too long"))];
435 let stream = stream::iter(chunks);
436 let body = Body::Stream(StreamBody {
437 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
438 metadata: StreamMetadata::default(),
439 });
440
441 let result = body.into_bytes(5).await;
442 assert!(matches!(result, Err(CamelError::StreamLimitExceeded(5))));
443 }
444
445 #[tokio::test]
446 async fn test_into_bytes_already_consumed() {
447 use futures::stream;
448 let chunks = vec![Ok(Bytes::from("data"))];
449 let stream = stream::iter(chunks);
450 let body = Body::Stream(StreamBody {
451 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
452 metadata: StreamMetadata::default(),
453 });
454
455 let cloned = body.clone();
456 let _ = body.into_bytes(100).await.unwrap();
457
458 let result = cloned.into_bytes(100).await;
459 assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
460 }
461
462 #[tokio::test]
463 async fn test_materialize_with_default_limit() {
464 use futures::stream;
465
466 let chunks = vec![Ok(Bytes::from("test data"))];
468 let stream = stream::iter(chunks);
469 let body = Body::Stream(StreamBody {
470 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
471 metadata: StreamMetadata::default(),
472 });
473
474 let result = body.materialize().await;
475 assert!(result.is_ok());
476 assert_eq!(result.unwrap(), Bytes::from("test data"));
477 }
478
479 #[tokio::test]
480 async fn test_materialize_non_stream_body_types() {
481 let body = Body::Empty;
485 let result = body.materialize().await.unwrap();
486 assert!(result.is_empty());
487
488 let body = Body::Bytes(Bytes::from("bytes data"));
490 let result = body.materialize().await.unwrap();
491 assert_eq!(result, Bytes::from("bytes data"));
492
493 let body = Body::Text("text data".to_string());
495 let result = body.materialize().await.unwrap();
496 assert_eq!(result, Bytes::from("text data"));
497
498 let body = Body::Json(serde_json::json!({"key": "value"}));
500 let result = body.materialize().await.unwrap();
501 assert_eq!(result, Bytes::from_static(br#"{"key":"value"}"#));
502
503 let xml = "<root><child>value</child></root>";
505 let body = Body::Xml(xml.to_string());
506 let result = body.materialize().await.unwrap();
507 assert_eq!(result, Bytes::from(xml));
508 }
509
510 #[tokio::test]
511 async fn test_materialize_exceeds_default_limit() {
512 use futures::stream;
513
514 let large_data = vec![0u8; 11 * 1024 * 1024];
516 let chunks = vec![Ok(Bytes::from(large_data))];
517 let stream = stream::iter(chunks);
518 let body = Body::Stream(StreamBody {
519 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
520 metadata: StreamMetadata::default(),
521 });
522
523 let result = body.materialize().await;
524 assert!(matches!(
525 result,
526 Err(CamelError::StreamLimitExceeded(10_485_760))
527 ));
528 }
529
530 #[test]
531 fn stream_variants_are_never_equal() {
532 use futures::stream;
533
534 let make_stream = || {
535 let s = stream::iter(vec![Ok(Bytes::from_static(b"data"))]);
536 Body::Stream(StreamBody {
537 stream: Arc::new(Mutex::new(Some(Box::pin(s)))),
538 metadata: StreamMetadata::default(),
539 })
540 };
541 assert_ne!(make_stream(), make_stream());
542 }
543
544 #[test]
547 fn test_body_xml_as_xml() {
548 let xml = "<root><child>value</child></root>";
549 let body = Body::Xml(xml.to_string());
550 assert_eq!(body.as_xml(), Some(xml));
551 }
552
553 #[test]
554 fn test_body_non_xml_as_xml_returns_none() {
555 let body = Body::Text("<root/>".to_string());
557 assert_eq!(body.as_xml(), None);
558
559 let body = Body::Empty;
561 assert_eq!(body.as_xml(), None);
562
563 let body = Body::Bytes(Bytes::from("<root/>"));
565 assert_eq!(body.as_xml(), None);
566
567 let body = Body::Json(serde_json::json!({"key": "value"}));
569 assert_eq!(body.as_xml(), None);
570 }
571
572 #[test]
573 fn test_body_xml_partial_eq() {
574 let body1 = Body::Xml("a".to_string());
576 let body2 = Body::Xml("a".to_string());
577 assert_eq!(body1, body2);
578
579 let body1 = Body::Xml("a".to_string());
581 let body2 = Body::Xml("b".to_string());
582 assert_ne!(body1, body2);
583 }
584
585 #[test]
586 fn test_body_xml_not_equal_to_other_variants() {
587 let xml_body = Body::Xml("x".to_string());
589 let text_body = Body::Text("x".to_string());
590 assert_ne!(xml_body, text_body);
591 }
592
593 #[test]
594 fn test_try_into_xml_from_text() {
595 let body = Body::Text("<root/>".to_string());
596 let result = body.try_into_xml();
597 assert!(matches!(result, Ok(Body::Xml(ref s)) if s == "<root/>"));
598 }
599
600 #[test]
601 fn test_try_into_xml_invalid_text() {
602 let body = Body::Text("not xml".to_string());
603 let result = body.try_into_xml();
604 assert!(matches!(result, Err(CamelError::TypeConversionFailed(_))));
605 }
606
607 #[test]
608 fn test_body_xml_clone() {
609 let original = Body::Xml("hello".to_string());
610 let cloned = original.clone();
611 assert_eq!(original, cloned);
612 }
613
614 #[tokio::test]
617 async fn test_into_async_read_empty() {
618 use tokio::io::AsyncReadExt;
619 let body = Body::Empty;
620 let mut reader = body.into_async_read().unwrap();
621 let mut buf = Vec::new();
622 reader.read_to_end(&mut buf).await.unwrap();
623 assert!(buf.is_empty());
624 }
625
626 #[tokio::test]
627 async fn test_into_async_read_bytes() {
628 use tokio::io::AsyncReadExt;
629 let body = Body::Bytes(Bytes::from("hello"));
630 let mut reader = body.into_async_read().unwrap();
631 let mut buf = Vec::new();
632 reader.read_to_end(&mut buf).await.unwrap();
633 assert_eq!(buf, b"hello");
634 }
635
636 #[tokio::test]
637 async fn test_into_async_read_text() {
638 use tokio::io::AsyncReadExt;
639 let body = Body::Text("world".to_string());
640 let mut reader = body.into_async_read().unwrap();
641 let mut buf = Vec::new();
642 reader.read_to_end(&mut buf).await.unwrap();
643 assert_eq!(buf, b"world");
644 }
645
646 #[tokio::test]
647 async fn test_into_async_read_json() {
648 use tokio::io::AsyncReadExt;
649 let body = Body::Json(serde_json::json!({"key": "val"}));
650 let mut reader = body.into_async_read().unwrap();
651 let mut buf = Vec::new();
652 reader.read_to_end(&mut buf).await.unwrap();
653 let parsed: serde_json::Value = serde_json::from_slice(&buf).unwrap();
654 assert_eq!(parsed["key"], "val");
655 }
656
657 #[tokio::test]
658 async fn test_into_async_read_xml() {
659 use tokio::io::AsyncReadExt;
660 let body = Body::Xml("<root/>".to_string());
661 let mut reader = body.into_async_read().unwrap();
662 let mut buf = Vec::new();
663 reader.read_to_end(&mut buf).await.unwrap();
664 assert_eq!(buf, b"<root/>");
665 }
666
667 #[tokio::test]
668 async fn test_into_async_read_stream_multichunk() {
669 use tokio::io::AsyncReadExt;
670 let chunks: Vec<Result<Bytes, CamelError>> = vec![
671 Ok(Bytes::from("foo")),
672 Ok(Bytes::from("bar")),
673 Ok(Bytes::from("baz")),
674 ];
675 let stream = futures::stream::iter(chunks);
676 let body = Body::Stream(StreamBody {
677 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
678 metadata: StreamMetadata {
679 size_hint: None,
680 content_type: None,
681 origin: None,
682 },
683 });
684 let mut reader = body.into_async_read().unwrap();
685 let mut buf = Vec::new();
686 reader.read_to_end(&mut buf).await.unwrap();
687 assert_eq!(buf, b"foobarbaz");
688 }
689
690 #[tokio::test]
691 async fn test_into_async_read_already_consumed() {
692 use tokio::io::AsyncReadExt;
693 type MaybeStream = Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>;
695 let arc: MaybeStream = Arc::new(Mutex::new(None));
696 let body = Body::Stream(StreamBody {
697 stream: arc,
698 metadata: StreamMetadata {
699 size_hint: None,
700 content_type: None,
701 origin: None,
702 },
703 });
704 let mut reader = body.into_async_read().unwrap();
705 let mut buf = Vec::new();
706 let result = reader.read_to_end(&mut buf).await;
707 assert!(result.is_err());
708 }
709}