Skip to main content

camel_api/
body.rs

1use crate::error::CamelError;
2/// General-purpose default limit for [`Body::materialize()`] (10 MB).
3///
4/// This is separate from [`stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD`] (128 KB),
5/// which is the OOM-protection limit used by `StreamCacheService`.
6pub const DEFAULT_MATERIALIZE_LIMIT: usize = 10 * 1024 * 1024;
7
8use bytes::{Bytes, BytesMut};
9use futures::stream::BoxStream;
10use futures::{StreamExt, TryStreamExt};
11use std::io;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context as TaskContext, Poll};
15use tokio::io::{AsyncRead, ReadBuf};
16use tokio::sync::Mutex;
17use tokio_util::io::StreamReader;
18
19/// A boxed [`AsyncRead`] for reading body content without materializing it into memory.
20///
21/// Returned by [`Body::into_async_read()`].
22pub type BoxAsyncRead = Pin<Box<dyn AsyncRead + Send + Unpin>>;
23
24/// Metadata associated with a stream body.
25#[derive(Debug, Clone, Default)]
26pub struct StreamMetadata {
27    /// Expected size of the stream if known.
28    pub size_hint: Option<u64>,
29    /// Content type of the stream content.
30    pub content_type: Option<String>,
31    /// Origin of the stream (e.g. "file:///path/to/file").
32    pub origin: Option<String>,
33}
34
35/// A body that wraps a lazy-evaluated stream of bytes.
36///
37/// # Clone Semantics
38///
39/// The stream is **single-consumption**. When cloning a `Body::Stream`,
40/// all clones share the same underlying stream handle. Only the first
41/// clone to consume the stream will succeed; subsequent attempts will
42/// return `CamelError::AlreadyConsumed`.
43///
44/// # Example
45///
46/// ```rust
47/// use camel_api::{Body, StreamBody, error::CamelError};
48/// use futures::stream;
49/// use bytes::Bytes;
50/// use std::sync::Arc;
51/// use tokio::sync::Mutex;
52///
53/// # #[tokio::main]
54/// # async fn main() -> Result<(), CamelError> {
55/// let chunks = vec![Ok(Bytes::from("data"))];
56/// let stream = stream::iter(chunks);
57/// let body = Body::Stream(StreamBody {
58///     stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
59///     metadata: Default::default(),
60/// });
61///
62/// let clone = body.clone();
63///
64/// // First consumption succeeds
65/// let _ = body.into_bytes(1024).await?;
66///
67/// // Second consumption fails
68/// let result = clone.into_bytes(1024).await;
69/// assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
70/// # Ok(())
71/// # }
72/// ```
73pub struct StreamBody {
74    /// The actual byte stream, wrapped in an Arc and Mutex to allow Clone for Body.
75    #[allow(clippy::type_complexity)]
76    pub stream: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
77    /// Metadata associated with the stream.
78    pub metadata: StreamMetadata,
79}
80
81impl std::fmt::Debug for StreamBody {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        f.debug_struct("StreamBody")
84            .field("metadata", &self.metadata)
85            .field("stream", &"<BoxStream>")
86            .finish()
87    }
88}
89
90impl Clone for StreamBody {
91    fn clone(&self) -> Self {
92        Self {
93            stream: Arc::clone(&self.stream),
94            metadata: self.metadata.clone(),
95        }
96    }
97}
98
99// ---------------------------------------------------------------------------
100// StreamAsyncRead — adapts Body::Stream into AsyncRead
101// ---------------------------------------------------------------------------
102
103/// Private adapter that implements [`AsyncRead`] for [`Body::Stream`].
104///
105/// On the first `poll_read`, attempts a non-blocking `try_lock()` on the inner
106/// `Arc<Mutex<Option<BoxStream>>>`:
107/// - If the lock succeeds and the stream is `Some`, extracts it and creates
108///   an active [`StreamReader`].
109/// - If the stream is `None` (already consumed), returns an [`io::Error`].
110/// - If the lock is contended (extremely rare), wakes the task and returns
111///   `Poll::Pending` to retry.
112#[allow(clippy::type_complexity)]
113struct StreamAsyncRead {
114    arc: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
115    /// Holds the active reader after the stream is extracted on first poll.
116    reader: Option<Box<dyn AsyncRead + Send + Unpin>>,
117    /// Set to true when the stream was already consumed (prevents further reads).
118    consumed: bool,
119}
120
121impl AsyncRead for StreamAsyncRead {
122    fn poll_read(
123        mut self: Pin<&mut Self>,
124        cx: &mut TaskContext<'_>,
125        buf: &mut ReadBuf<'_>,
126    ) -> Poll<io::Result<()>> {
127        if self.consumed {
128            return Poll::Ready(Err(io::Error::other("stream already consumed")));
129        }
130        // Lazy init: extract the stream on first poll
131        if self.reader.is_none() {
132            // Extract stream in a separate scope to avoid holding the lock while modifying self
133            let extracted = {
134                match self.arc.try_lock() {
135                    Ok(mut guard) => guard.take(),
136                    Err(_) => {
137                        // Lock contended — schedule a retry
138                        cx.waker().wake_by_ref();
139                        return Poll::Pending;
140                    }
141                }
142            };
143            // Now safe to modify self since lock is dropped
144            match extracted {
145                Some(stream) => {
146                    let mapped = stream.map_err(|e: CamelError| io::Error::other(e.to_string()));
147                    self.reader = Some(Box::new(StreamReader::new(mapped)));
148                }
149                None => {
150                    self.consumed = true;
151                    return Poll::Ready(Err(io::Error::other("stream already consumed")));
152                }
153            }
154        }
155        // Delegate to the active reader
156        Pin::new(self.reader.as_mut().unwrap()).poll_read(cx, buf)
157    }
158}
159
160/// The body of a message, supporting common payload types.
161#[derive(Debug, Default)]
162pub enum Body {
163    /// No body content.
164    #[default]
165    Empty,
166    /// Raw bytes payload.
167    Bytes(Bytes),
168    /// UTF-8 string payload.
169    Text(String),
170    /// JSON payload.
171    Json(serde_json::Value),
172    /// XML payload (well-formed XML string; use `try_into_xml()` for validation).
173    Xml(String),
174    /// Streaming payload.
175    Stream(StreamBody),
176}
177
178impl Clone for Body {
179    fn clone(&self) -> Self {
180        match self {
181            Body::Empty => Body::Empty,
182            Body::Bytes(b) => Body::Bytes(b.clone()),
183            Body::Text(s) => Body::Text(s.clone()),
184            Body::Json(v) => Body::Json(v.clone()),
185            Body::Xml(s) => Body::Xml(s.clone()),
186            Body::Stream(s) => Body::Stream(s.clone()),
187        }
188    }
189}
190
191impl PartialEq for Body {
192    fn eq(&self, other: &Self) -> bool {
193        match (self, other) {
194            (Body::Empty, Body::Empty) => true,
195            (Body::Text(a), Body::Text(b)) => a == b,
196            (Body::Json(a), Body::Json(b)) => a == b,
197            (Body::Bytes(a), Body::Bytes(b)) => a == b,
198            (Body::Xml(a), Body::Xml(b)) => a == b,
199            // Stream: two streams are never equal (single-consumption)
200            _ => false,
201        }
202    }
203}
204
205impl Body {
206    /// Returns `true` if the body is empty.
207    pub fn is_empty(&self) -> bool {
208        matches!(self, Body::Empty)
209    }
210
211    /// Convert the body into `Bytes`, consuming it if it is a stream.
212    /// This is an async operation because it may need to read from an underlying stream.
213    /// A `max_size` limit is enforced to prevent OOM errors.
214    pub async fn into_bytes(self, max_size: usize) -> Result<Bytes, CamelError> {
215        match self {
216            Body::Empty => Ok(Bytes::new()),
217            Body::Bytes(b) => {
218                if b.len() > max_size {
219                    return Err(CamelError::StreamLimitExceeded(max_size));
220                }
221                Ok(b)
222            }
223            Body::Text(s) => {
224                if s.len() > max_size {
225                    return Err(CamelError::StreamLimitExceeded(max_size));
226                }
227                Ok(Bytes::from(s))
228            }
229            Body::Json(v) => {
230                let b = serde_json::to_vec(&v)
231                    .map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
232                if b.len() > max_size {
233                    return Err(CamelError::StreamLimitExceeded(max_size));
234                }
235                Ok(Bytes::from(b))
236            }
237            Body::Xml(s) => {
238                if s.len() > max_size {
239                    return Err(CamelError::StreamLimitExceeded(max_size));
240                }
241                Ok(Bytes::from(s))
242            }
243            Body::Stream(s) => {
244                let mut stream_lock = s.stream.lock().await;
245                let mut stream = stream_lock.take().ok_or(CamelError::AlreadyConsumed)?;
246
247                let mut buffer = BytesMut::new();
248                while let Some(chunk_res) = stream.next().await {
249                    let chunk = chunk_res?;
250                    if buffer.len() + chunk.len() > max_size {
251                        return Err(CamelError::StreamLimitExceeded(max_size));
252                    }
253                    buffer.extend_from_slice(&chunk);
254                }
255                Ok(buffer.freeze())
256            }
257        }
258    }
259
260    /// Materialize stream with sensible default limit (10 MB).
261    ///
262    /// Convenience method for common cases where you need the stream content
263    /// but don't want to specify a custom limit. For the tighter stream-cache
264    /// threshold (128 KB), use [`stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD`]
265    /// with [`Body::into_bytes()`] instead.
266    ///
267    /// # Example
268    /// ```ignore
269    /// let body = Body::Stream(stream);
270    /// let bytes = body.materialize().await?;
271    /// ```
272    pub async fn materialize(self) -> Result<Bytes, CamelError> {
273        self.into_bytes(DEFAULT_MATERIALIZE_LIMIT).await
274    }
275
276    /// Convert the body into an [`AsyncRead`] without materializing it into memory.
277    ///
278    /// - [`Body::Empty`] → empty reader (0 bytes)
279    /// - [`Body::Bytes`] → in-memory cursor
280    /// - [`Body::Text`] → UTF-8 bytes cursor
281    /// - [`Body::Json`] → serialized JSON bytes cursor
282    /// - [`Body::Xml`] → UTF-8 bytes cursor
283    /// - [`Body::Stream`] → streams chunk-by-chunk via [`StreamReader`];
284    ///   if the stream was already consumed, the reader returns an [`io::Error`]
285    ///   on the first read
286    pub fn into_async_read(self) -> BoxAsyncRead {
287        match self {
288            Body::Empty => Box::pin(tokio::io::empty()),
289            Body::Bytes(b) => Box::pin(std::io::Cursor::new(b)),
290            Body::Text(s) => Box::pin(std::io::Cursor::new(s.into_bytes())),
291            Body::Json(v) => match serde_json::to_vec(&v) {
292                Ok(bytes) => Box::pin(std::io::Cursor::new(bytes)) as BoxAsyncRead,
293                Err(e) => {
294                    let err = io::Error::new(io::ErrorKind::InvalidData, e.to_string());
295                    let stream = futures::stream::iter(vec![Err::<Bytes, io::Error>(err)]);
296                    Box::pin(StreamReader::new(stream)) as BoxAsyncRead
297                }
298            },
299            Body::Xml(s) => Box::pin(std::io::Cursor::new(s.into_bytes())),
300            Body::Stream(s) => Box::pin(StreamAsyncRead {
301                arc: s.stream,
302                reader: None,
303                consumed: false,
304            }),
305        }
306    }
307
308    /// Try to get the body as a string, converting from bytes if needed.
309    pub fn as_text(&self) -> Option<&str> {
310        match self {
311            Body::Text(s) => Some(s.as_str()),
312            _ => None,
313        }
314    }
315
316    /// Try to get the body as an XML string.
317    pub fn as_xml(&self) -> Option<&str> {
318        match self {
319            Body::Xml(s) => Some(s.as_str()),
320            _ => None,
321        }
322    }
323
324    /// Convert this body to `Body::Text`, consuming it.
325    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
326    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
327    pub fn try_into_text(self) -> Result<Body, CamelError> {
328        crate::body_converter::convert(self, crate::body_converter::BodyType::Text)
329    }
330
331    /// Convert this body to `Body::Json`, consuming it.
332    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
333    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
334    pub fn try_into_json(self) -> Result<Body, CamelError> {
335        crate::body_converter::convert(self, crate::body_converter::BodyType::Json)
336    }
337
338    /// Convert this body to `Body::Bytes`, consuming it.
339    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
340    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
341    pub fn try_into_bytes_body(self) -> Result<Body, CamelError> {
342        crate::body_converter::convert(self, crate::body_converter::BodyType::Bytes)
343    }
344
345    /// Convert this body to `Body::Xml`, consuming it.
346    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
347    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
348    pub fn try_into_xml(self) -> Result<Body, CamelError> {
349        crate::body_converter::convert(self, crate::body_converter::BodyType::Xml)
350    }
351}
352
353// Conversion impls
354impl From<String> for Body {
355    fn from(s: String) -> Self {
356        Body::Text(s)
357    }
358}
359
360impl From<&str> for Body {
361    fn from(s: &str) -> Self {
362        Body::Text(s.to_string())
363    }
364}
365
366impl From<Bytes> for Body {
367    fn from(b: Bytes) -> Self {
368        Body::Bytes(b)
369    }
370}
371
372impl From<Vec<u8>> for Body {
373    fn from(v: Vec<u8>) -> Self {
374        Body::Bytes(Bytes::from(v))
375    }
376}
377
378impl From<serde_json::Value> for Body {
379    fn from(v: serde_json::Value) -> Self {
380        Body::Json(v)
381    }
382}
383
384#[cfg(test)]
385mod tests {
386    use super::*;
387
388    #[test]
389    fn test_body_default_is_empty() {
390        let body = Body::default();
391        assert!(body.is_empty());
392    }
393
394    #[test]
395    fn test_body_from_string() {
396        let body = Body::from("hello".to_string());
397        assert_eq!(body.as_text(), Some("hello"));
398    }
399
400    #[test]
401    fn test_body_from_str() {
402        let body = Body::from("world");
403        assert_eq!(body.as_text(), Some("world"));
404    }
405
406    #[test]
407    fn test_body_from_bytes() {
408        let body = Body::from(Bytes::from_static(b"data"));
409        assert!(!body.is_empty());
410        assert!(matches!(body, Body::Bytes(_)));
411    }
412
413    #[test]
414    fn test_body_from_json() {
415        let val = serde_json::json!({"key": "value"});
416        let body = Body::from(val.clone());
417        assert!(matches!(body, Body::Json(_)));
418    }
419
420    #[tokio::test]
421    async fn test_into_bytes_from_stream() {
422        use futures::stream;
423        let chunks = vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))];
424        let stream = stream::iter(chunks);
425        let body = Body::Stream(StreamBody {
426            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
427            metadata: StreamMetadata::default(),
428        });
429
430        let result = body.into_bytes(100).await.unwrap();
431        assert_eq!(result, Bytes::from("hello world"));
432    }
433
434    #[tokio::test]
435    async fn test_into_bytes_limit_exceeded() {
436        use futures::stream;
437        let chunks = vec![Ok(Bytes::from("this is too long"))];
438        let stream = stream::iter(chunks);
439        let body = Body::Stream(StreamBody {
440            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
441            metadata: StreamMetadata::default(),
442        });
443
444        let result = body.into_bytes(5).await;
445        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(5))));
446    }
447
448    #[tokio::test]
449    async fn test_into_bytes_already_consumed() {
450        use futures::stream;
451        let chunks = vec![Ok(Bytes::from("data"))];
452        let stream = stream::iter(chunks);
453        let body = Body::Stream(StreamBody {
454            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
455            metadata: StreamMetadata::default(),
456        });
457
458        let cloned = body.clone();
459        let _ = body.into_bytes(100).await.unwrap();
460
461        let result = cloned.into_bytes(100).await;
462        assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
463    }
464
465    #[tokio::test]
466    async fn test_materialize_with_default_limit() {
467        use futures::stream;
468
469        // Small stream under limit - should succeed with default 10MB limit
470        let chunks = vec![Ok(Bytes::from("test data"))];
471        let stream = stream::iter(chunks);
472        let body = Body::Stream(StreamBody {
473            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
474            metadata: StreamMetadata::default(),
475        });
476
477        let result = body.materialize().await;
478        assert!(result.is_ok());
479        assert_eq!(result.unwrap(), Bytes::from("test data"));
480    }
481
482    #[tokio::test]
483    async fn test_materialize_non_stream_body_types() {
484        // Verify materialize() works with all body types, not just streams
485
486        // Body::Empty
487        let body = Body::Empty;
488        let result = body.materialize().await.unwrap();
489        assert!(result.is_empty());
490
491        // Body::Bytes
492        let body = Body::Bytes(Bytes::from("bytes data"));
493        let result = body.materialize().await.unwrap();
494        assert_eq!(result, Bytes::from("bytes data"));
495
496        // Body::Text
497        let body = Body::Text("text data".to_string());
498        let result = body.materialize().await.unwrap();
499        assert_eq!(result, Bytes::from("text data"));
500
501        // Body::Json
502        let body = Body::Json(serde_json::json!({"key": "value"}));
503        let result = body.materialize().await.unwrap();
504        assert_eq!(result, Bytes::from_static(br#"{"key":"value"}"#));
505
506        // Body::Xml
507        let xml = "<root><child>value</child></root>";
508        let body = Body::Xml(xml.to_string());
509        let result = body.materialize().await.unwrap();
510        assert_eq!(result, Bytes::from(xml));
511    }
512
513    #[tokio::test]
514    async fn test_materialize_exceeds_default_limit() {
515        use futures::stream;
516
517        // 11MB stream - should fail with default 10MB limit
518        let large_data = vec![0u8; 11 * 1024 * 1024];
519        let chunks = vec![Ok(Bytes::from(large_data))];
520        let stream = stream::iter(chunks);
521        let body = Body::Stream(StreamBody {
522            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
523            metadata: StreamMetadata::default(),
524        });
525
526        let result = body.materialize().await;
527        assert!(matches!(
528            result,
529            Err(CamelError::StreamLimitExceeded(10_485_760))
530        ));
531    }
532
533    #[test]
534    fn stream_variants_are_never_equal() {
535        use futures::stream;
536
537        let make_stream = || {
538            let s = stream::iter(vec![Ok(Bytes::from_static(b"data"))]);
539            Body::Stream(StreamBody {
540                stream: Arc::new(Mutex::new(Some(Box::pin(s)))),
541                metadata: StreamMetadata::default(),
542            })
543        };
544        assert_ne!(make_stream(), make_stream());
545    }
546
547    // XML body tests
548
549    #[test]
550    fn test_body_xml_as_xml() {
551        let xml = "<root><child>value</child></root>";
552        let body = Body::Xml(xml.to_string());
553        assert_eq!(body.as_xml(), Some(xml));
554    }
555
556    #[test]
557    fn test_body_non_xml_as_xml_returns_none() {
558        // Body::Text should return None for as_xml()
559        let body = Body::Text("<root/>".to_string());
560        assert_eq!(body.as_xml(), None);
561
562        // Body::Empty should return None
563        let body = Body::Empty;
564        assert_eq!(body.as_xml(), None);
565
566        // Body::Bytes should return None
567        let body = Body::Bytes(Bytes::from("<root/>"));
568        assert_eq!(body.as_xml(), None);
569
570        // Body::Json should return None
571        let body = Body::Json(serde_json::json!({"key": "value"}));
572        assert_eq!(body.as_xml(), None);
573    }
574
575    #[test]
576    fn test_body_xml_partial_eq() {
577        // Same XML content should be equal
578        let body1 = Body::Xml("a".to_string());
579        let body2 = Body::Xml("a".to_string());
580        assert_eq!(body1, body2);
581
582        // Different XML content should not be equal
583        let body1 = Body::Xml("a".to_string());
584        let body2 = Body::Xml("b".to_string());
585        assert_ne!(body1, body2);
586    }
587
588    #[test]
589    fn test_body_xml_not_equal_to_other_variants() {
590        // Body::Xml should not equal Body::Text even with same content
591        let xml_body = Body::Xml("x".to_string());
592        let text_body = Body::Text("x".to_string());
593        assert_ne!(xml_body, text_body);
594    }
595
596    #[test]
597    fn test_try_into_xml_from_text() {
598        let body = Body::Text("<root/>".to_string());
599        let result = body.try_into_xml();
600        assert!(matches!(result, Ok(Body::Xml(ref s)) if s == "<root/>"));
601    }
602
603    #[test]
604    fn test_try_into_xml_invalid_text() {
605        let body = Body::Text("not xml".to_string());
606        let result = body.try_into_xml();
607        assert!(matches!(result, Err(CamelError::TypeConversionFailed(_))));
608    }
609
610    #[test]
611    fn test_body_xml_clone() {
612        let original = Body::Xml("hello".to_string());
613        let cloned = original.clone();
614        assert_eq!(original, cloned);
615    }
616
617    // ---------- into_async_read tests ----------
618
619    #[tokio::test]
620    async fn test_into_async_read_empty() {
621        use tokio::io::AsyncReadExt;
622        let body = Body::Empty;
623        let mut reader = body.into_async_read();
624        let mut buf = Vec::new();
625        reader.read_to_end(&mut buf).await.unwrap();
626        assert!(buf.is_empty());
627    }
628
629    #[tokio::test]
630    async fn test_into_async_read_bytes() {
631        use tokio::io::AsyncReadExt;
632        let body = Body::Bytes(Bytes::from("hello"));
633        let mut reader = body.into_async_read();
634        let mut buf = Vec::new();
635        reader.read_to_end(&mut buf).await.unwrap();
636        assert_eq!(buf, b"hello");
637    }
638
639    #[tokio::test]
640    async fn test_into_async_read_text() {
641        use tokio::io::AsyncReadExt;
642        let body = Body::Text("world".to_string());
643        let mut reader = body.into_async_read();
644        let mut buf = Vec::new();
645        reader.read_to_end(&mut buf).await.unwrap();
646        assert_eq!(buf, b"world");
647    }
648
649    #[tokio::test]
650    async fn test_into_async_read_json() {
651        use tokio::io::AsyncReadExt;
652        let body = Body::Json(serde_json::json!({"key": "val"}));
653        let mut reader = body.into_async_read();
654        let mut buf = Vec::new();
655        reader.read_to_end(&mut buf).await.unwrap();
656        let parsed: serde_json::Value = serde_json::from_slice(&buf).unwrap();
657        assert_eq!(parsed["key"], "val");
658    }
659
660    #[tokio::test]
661    async fn test_into_async_read_xml() {
662        use tokio::io::AsyncReadExt;
663        let body = Body::Xml("<root/>".to_string());
664        let mut reader = body.into_async_read();
665        let mut buf = Vec::new();
666        reader.read_to_end(&mut buf).await.unwrap();
667        assert_eq!(buf, b"<root/>");
668    }
669
670    #[tokio::test]
671    async fn test_into_async_read_stream_multichunk() {
672        use tokio::io::AsyncReadExt;
673        let chunks: Vec<Result<Bytes, CamelError>> = vec![
674            Ok(Bytes::from("foo")),
675            Ok(Bytes::from("bar")),
676            Ok(Bytes::from("baz")),
677        ];
678        let stream = futures::stream::iter(chunks);
679        let body = Body::Stream(StreamBody {
680            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
681            metadata: StreamMetadata {
682                size_hint: None,
683                content_type: None,
684                origin: None,
685            },
686        });
687        let mut reader = body.into_async_read();
688        let mut buf = Vec::new();
689        reader.read_to_end(&mut buf).await.unwrap();
690        assert_eq!(buf, b"foobarbaz");
691    }
692
693    #[tokio::test]
694    async fn test_into_async_read_already_consumed() {
695        use tokio::io::AsyncReadExt;
696        // Mutex holds None → stream already consumed
697        type MaybeStream = Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>;
698        let arc: MaybeStream = Arc::new(Mutex::new(None));
699        let body = Body::Stream(StreamBody {
700            stream: arc,
701            metadata: StreamMetadata {
702                size_hint: None,
703                content_type: None,
704                origin: None,
705            },
706        });
707        let mut reader = body.into_async_read();
708        let mut buf = Vec::new();
709        let result = reader.read_to_end(&mut buf).await;
710        assert!(result.is_err());
711    }
712}