Skip to main content

camel_api/
body.rs

1use crate::error::CamelError;
2/// General-purpose default limit for [`Body::materialize()`] (10 MB).
3///
4/// This is separate from [`stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD`] (128 KB),
5/// which is the OOM-protection limit used by `StreamCacheService`.
6pub const DEFAULT_MATERIALIZE_LIMIT: usize = 10 * 1024 * 1024;
7
8use bytes::{Bytes, BytesMut};
9use futures::stream::BoxStream;
10use futures::{StreamExt, TryStreamExt};
11use std::io;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context as TaskContext, Poll};
15use tokio::io::{AsyncRead, ReadBuf};
16use tokio::sync::Mutex;
17use tokio_util::io::StreamReader;
18
19/// A boxed [`AsyncRead`] for reading body content without materializing it into memory.
20///
21/// Returned by [`Body::into_async_read()`].
22pub type BoxAsyncRead = Pin<Box<dyn AsyncRead + Send + Unpin>>;
23
24/// Metadata associated with a stream body.
25#[derive(Debug, Clone, Default)]
26pub struct StreamMetadata {
27    /// Expected size of the stream if known.
28    pub size_hint: Option<u64>,
29    /// Content type of the stream content.
30    pub content_type: Option<String>,
31    /// Origin of the stream (e.g. "file:///path/to/file").
32    pub origin: Option<String>,
33}
34
35/// A body that wraps a lazy-evaluated stream of bytes.
36///
37/// # Clone Semantics
38///
39/// The stream is **single-consumption**. When cloning a `Body::Stream`,
40/// all clones share the same underlying stream handle. Only the first
41/// clone to consume the stream will succeed; subsequent attempts will
42/// return `CamelError::AlreadyConsumed`.
43///
44/// # Example
45///
46/// ```rust
47/// use camel_api::{Body, StreamBody, error::CamelError};
48/// use futures::stream;
49/// use bytes::Bytes;
50/// use std::sync::Arc;
51/// use tokio::sync::Mutex;
52///
53/// # #[tokio::main]
54/// # async fn main() -> Result<(), CamelError> {
55/// let chunks = vec![Ok(Bytes::from("data"))];
56/// let stream = stream::iter(chunks);
57/// let body = Body::Stream(StreamBody {
58///     stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
59///     metadata: Default::default(),
60/// });
61///
62/// let clone = body.clone();
63///
64/// // First consumption succeeds
65/// let _ = body.into_bytes(1024).await?;
66///
67/// // Second consumption fails
68/// let result = clone.into_bytes(1024).await;
69/// assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
70/// # Ok(())
71/// # }
72/// ```
73pub struct StreamBody {
74    /// The actual byte stream, wrapped in an Arc and Mutex to allow Clone for Body.
75    #[allow(clippy::type_complexity)]
76    pub stream: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
77    /// Metadata associated with the stream.
78    pub metadata: StreamMetadata,
79}
80
81impl std::fmt::Debug for StreamBody {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        f.debug_struct("StreamBody")
84            .field("metadata", &self.metadata)
85            .field("stream", &"<BoxStream>")
86            .finish()
87    }
88}
89
90impl Clone for StreamBody {
91    fn clone(&self) -> Self {
92        Self {
93            stream: Arc::clone(&self.stream),
94            metadata: self.metadata.clone(),
95        }
96    }
97}
98
99// ---------------------------------------------------------------------------
100// StreamAsyncRead — adapts Body::Stream into AsyncRead
101// ---------------------------------------------------------------------------
102
103/// Private adapter that implements [`AsyncRead`] for [`Body::Stream`].
104///
105/// On the first `poll_read`, attempts a non-blocking `try_lock()` on the inner
106/// `Arc<Mutex<Option<BoxStream>>>`:
107/// - If the lock succeeds and the stream is `Some`, extracts it and creates
108///   an active [`StreamReader`].
109/// - If the stream is `None` (already consumed), returns an [`io::Error`].
110/// - If the lock is contended (extremely rare), wakes the task and returns
111///   `Poll::Pending` to retry.
112#[allow(clippy::type_complexity)]
113struct StreamAsyncRead {
114    arc: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
115    /// Holds the active reader after the stream is extracted on first poll.
116    reader: Option<Box<dyn AsyncRead + Send + Unpin>>,
117    /// Set to true when the stream was already consumed (prevents further reads).
118    consumed: bool,
119}
120
121impl AsyncRead for StreamAsyncRead {
122    fn poll_read(
123        mut self: Pin<&mut Self>,
124        cx: &mut TaskContext<'_>,
125        buf: &mut ReadBuf<'_>,
126    ) -> Poll<io::Result<()>> {
127        if self.consumed {
128            return Poll::Ready(Err(io::Error::other("stream already consumed")));
129        }
130        // Lazy init: extract the stream on first poll
131        if self.reader.is_none() {
132            // Extract stream in a separate scope to avoid holding the lock while modifying self
133            let extracted = {
134                match self.arc.try_lock() {
135                    Ok(mut guard) => guard.take(),
136                    Err(_) => {
137                        // Lock contended — schedule a retry
138                        cx.waker().wake_by_ref();
139                        return Poll::Pending;
140                    }
141                }
142            };
143            // Now safe to modify self since lock is dropped
144            match extracted {
145                Some(stream) => {
146                    let mapped = stream.map_err(|e: CamelError| io::Error::other(e.to_string()));
147                    self.reader = Some(Box::new(StreamReader::new(mapped)));
148                }
149                None => {
150                    self.consumed = true;
151                    return Poll::Ready(Err(io::Error::other("stream already consumed")));
152                }
153            }
154        }
155        // Delegate to the active reader
156        Pin::new(self.reader.as_mut().unwrap()).poll_read(cx, buf) // allow-unwrap
157    }
158}
159
160/// The body of a message, supporting common payload types.
161#[derive(Debug, Default)]
162pub enum Body {
163    /// No body content.
164    #[default]
165    Empty,
166    /// Raw bytes payload.
167    Bytes(Bytes),
168    /// UTF-8 string payload.
169    Text(String),
170    /// JSON payload.
171    Json(serde_json::Value),
172    /// XML payload (well-formed XML string; use `try_into_xml()` for validation).
173    Xml(String),
174    /// Streaming payload.
175    Stream(StreamBody),
176}
177
178impl Clone for Body {
179    fn clone(&self) -> Self {
180        match self {
181            Body::Empty => Body::Empty,
182            Body::Bytes(b) => Body::Bytes(b.clone()),
183            Body::Text(s) => Body::Text(s.clone()),
184            Body::Json(v) => Body::Json(v.clone()),
185            Body::Xml(s) => Body::Xml(s.clone()),
186            Body::Stream(s) => Body::Stream(s.clone()),
187        }
188    }
189}
190
191impl PartialEq for Body {
192    fn eq(&self, other: &Self) -> bool {
193        match (self, other) {
194            (Body::Empty, Body::Empty) => true,
195            (Body::Text(a), Body::Text(b)) => a == b,
196            (Body::Json(a), Body::Json(b)) => a == b,
197            (Body::Bytes(a), Body::Bytes(b)) => a == b,
198            (Body::Xml(a), Body::Xml(b)) => a == b,
199            // Stream: two streams are never equal (single-consumption)
200            _ => false,
201        }
202    }
203}
204
205impl Body {
206    /// Returns `true` if the body is empty.
207    pub fn is_empty(&self) -> bool {
208        matches!(self, Body::Empty)
209    }
210
211    /// Convert the body into `Bytes`, consuming it if it is a stream.
212    /// This is an async operation because it may need to read from an underlying stream.
213    /// A `max_size` limit is enforced to prevent OOM errors.
214    pub async fn into_bytes(self, max_size: usize) -> Result<Bytes, CamelError> {
215        match self {
216            Body::Empty => Ok(Bytes::new()),
217            Body::Bytes(b) => {
218                if b.len() > max_size {
219                    return Err(CamelError::StreamLimitExceeded(max_size));
220                }
221                Ok(b)
222            }
223            Body::Text(s) => {
224                if s.len() > max_size {
225                    return Err(CamelError::StreamLimitExceeded(max_size));
226                }
227                Ok(Bytes::from(s))
228            }
229            Body::Json(v) => {
230                let b = serde_json::to_vec(&v)
231                    .map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
232                if b.len() > max_size {
233                    return Err(CamelError::StreamLimitExceeded(max_size));
234                }
235                Ok(Bytes::from(b))
236            }
237            Body::Xml(s) => {
238                if s.len() > max_size {
239                    return Err(CamelError::StreamLimitExceeded(max_size));
240                }
241                Ok(Bytes::from(s))
242            }
243            Body::Stream(s) => {
244                let mut stream_lock = s.stream.lock().await;
245                let mut stream = stream_lock.take().ok_or(CamelError::AlreadyConsumed)?;
246
247                let mut buffer = BytesMut::new();
248                while let Some(chunk_res) = stream.next().await {
249                    let chunk = chunk_res?;
250                    if buffer.len() + chunk.len() > max_size {
251                        return Err(CamelError::StreamLimitExceeded(max_size));
252                    }
253                    buffer.extend_from_slice(&chunk);
254                }
255                Ok(buffer.freeze())
256            }
257        }
258    }
259
260    /// Materialize stream with sensible default limit (10 MB).
261    ///
262    /// Convenience method for common cases where you need the stream content
263    /// but don't want to specify a custom limit. For the tighter stream-cache
264    /// threshold (128 KB), use [`stream_cache::DEFAULT_STREAM_CACHE_THRESHOLD`]
265    /// with [`Body::into_bytes()`] instead.
266    ///
267    /// # Example
268    /// ```ignore
269    /// let body = Body::Stream(stream);
270    /// let bytes = body.materialize().await?;
271    /// ```
272    pub async fn materialize(self) -> Result<Bytes, CamelError> {
273        self.into_bytes(DEFAULT_MATERIALIZE_LIMIT).await
274    }
275
276    /// Convert the body into an [`AsyncRead`] without materializing it into memory.
277    ///
278    /// - [`Body::Empty`] → empty reader (0 bytes)
279    /// - [`Body::Bytes`] → in-memory cursor
280    /// - [`Body::Text`] → UTF-8 bytes cursor
281    /// - [`Body::Json`] → serialized JSON bytes cursor
282    /// - [`Body::Xml`] → UTF-8 bytes cursor
283    /// - [`Body::Stream`] → streams chunk-by-chunk via [`StreamReader`];
284    ///   if the stream was already consumed, the reader returns an [`io::Error`]
285    ///   on the first read
286    pub fn into_async_read(self) -> Result<BoxAsyncRead, CamelError> {
287        match self {
288            Body::Empty => Ok(Box::pin(tokio::io::empty())),
289            Body::Bytes(b) => Ok(Box::pin(std::io::Cursor::new(b))),
290            Body::Text(s) => Ok(Box::pin(std::io::Cursor::new(s.into_bytes()))),
291            Body::Json(v) => {
292                let bytes = serde_json::to_vec(&v)
293                    .map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
294                Ok(Box::pin(std::io::Cursor::new(bytes)) as BoxAsyncRead)
295            }
296            Body::Xml(s) => Ok(Box::pin(std::io::Cursor::new(s.into_bytes()))),
297            Body::Stream(s) => Ok(Box::pin(StreamAsyncRead {
298                arc: s.stream,
299                reader: None,
300                consumed: false,
301            })),
302        }
303    }
304
305    /// Try to get the body as a string, converting from bytes if needed.
306    pub fn as_text(&self) -> Option<&str> {
307        match self {
308            Body::Text(s) => Some(s.as_str()),
309            _ => None,
310        }
311    }
312
313    /// Try to get the body as an XML string.
314    pub fn as_xml(&self) -> Option<&str> {
315        match self {
316            Body::Xml(s) => Some(s.as_str()),
317            _ => None,
318        }
319    }
320
321    /// Convert this body to `Body::Text`, consuming it.
322    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
323    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
324    pub fn try_into_text(self) -> Result<Body, CamelError> {
325        crate::body_converter::convert(self, crate::body_converter::BodyType::Text)
326    }
327
328    /// Convert this body to `Body::Json`, consuming it.
329    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
330    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
331    pub fn try_into_json(self) -> Result<Body, CamelError> {
332        crate::body_converter::convert(self, crate::body_converter::BodyType::Json)
333    }
334
335    /// Convert this body to `Body::Bytes`, consuming it.
336    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
337    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
338    pub fn try_into_bytes_body(self) -> Result<Body, CamelError> {
339        crate::body_converter::convert(self, crate::body_converter::BodyType::Bytes)
340    }
341
342    /// Convert this body to `Body::Xml`, consuming it.
343    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
344    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
345    pub fn try_into_xml(self) -> Result<Body, CamelError> {
346        crate::body_converter::convert(self, crate::body_converter::BodyType::Xml)
347    }
348}
349
350// Conversion impls
351impl From<String> for Body {
352    fn from(s: String) -> Self {
353        Body::Text(s)
354    }
355}
356
357impl From<&str> for Body {
358    fn from(s: &str) -> Self {
359        Body::Text(s.to_string())
360    }
361}
362
363impl From<Bytes> for Body {
364    fn from(b: Bytes) -> Self {
365        Body::Bytes(b)
366    }
367}
368
369impl From<Vec<u8>> for Body {
370    fn from(v: Vec<u8>) -> Self {
371        Body::Bytes(Bytes::from(v))
372    }
373}
374
375impl From<serde_json::Value> for Body {
376    fn from(v: serde_json::Value) -> Self {
377        Body::Json(v)
378    }
379}
380
381#[cfg(test)]
382mod tests {
383    use super::*;
384
385    #[test]
386    fn test_body_default_is_empty() {
387        let body = Body::default();
388        assert!(body.is_empty());
389    }
390
391    #[test]
392    fn test_body_from_string() {
393        let body = Body::from("hello".to_string());
394        assert_eq!(body.as_text(), Some("hello"));
395    }
396
397    #[test]
398    fn test_body_from_str() {
399        let body = Body::from("world");
400        assert_eq!(body.as_text(), Some("world"));
401    }
402
403    #[test]
404    fn test_body_from_bytes() {
405        let body = Body::from(Bytes::from_static(b"data"));
406        assert!(!body.is_empty());
407        assert!(matches!(body, Body::Bytes(_)));
408    }
409
410    #[test]
411    fn test_body_from_json() {
412        let val = serde_json::json!({"key": "value"});
413        let body = Body::from(val.clone());
414        assert!(matches!(body, Body::Json(_)));
415    }
416
417    #[tokio::test]
418    async fn test_into_bytes_from_stream() {
419        use futures::stream;
420        let chunks = vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))];
421        let stream = stream::iter(chunks);
422        let body = Body::Stream(StreamBody {
423            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
424            metadata: StreamMetadata::default(),
425        });
426
427        let result = body.into_bytes(100).await.unwrap();
428        assert_eq!(result, Bytes::from("hello world"));
429    }
430
431    #[tokio::test]
432    async fn test_into_bytes_limit_exceeded() {
433        use futures::stream;
434        let chunks = vec![Ok(Bytes::from("this is too long"))];
435        let stream = stream::iter(chunks);
436        let body = Body::Stream(StreamBody {
437            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
438            metadata: StreamMetadata::default(),
439        });
440
441        let result = body.into_bytes(5).await;
442        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(5))));
443    }
444
445    #[tokio::test]
446    async fn test_into_bytes_already_consumed() {
447        use futures::stream;
448        let chunks = vec![Ok(Bytes::from("data"))];
449        let stream = stream::iter(chunks);
450        let body = Body::Stream(StreamBody {
451            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
452            metadata: StreamMetadata::default(),
453        });
454
455        let cloned = body.clone();
456        let _ = body.into_bytes(100).await.unwrap();
457
458        let result = cloned.into_bytes(100).await;
459        assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
460    }
461
462    #[tokio::test]
463    async fn test_materialize_with_default_limit() {
464        use futures::stream;
465
466        // Small stream under limit - should succeed with default 10MB limit
467        let chunks = vec![Ok(Bytes::from("test data"))];
468        let stream = stream::iter(chunks);
469        let body = Body::Stream(StreamBody {
470            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
471            metadata: StreamMetadata::default(),
472        });
473
474        let result = body.materialize().await;
475        assert!(result.is_ok());
476        assert_eq!(result.unwrap(), Bytes::from("test data"));
477    }
478
479    #[tokio::test]
480    async fn test_materialize_non_stream_body_types() {
481        // Verify materialize() works with all body types, not just streams
482
483        // Body::Empty
484        let body = Body::Empty;
485        let result = body.materialize().await.unwrap();
486        assert!(result.is_empty());
487
488        // Body::Bytes
489        let body = Body::Bytes(Bytes::from("bytes data"));
490        let result = body.materialize().await.unwrap();
491        assert_eq!(result, Bytes::from("bytes data"));
492
493        // Body::Text
494        let body = Body::Text("text data".to_string());
495        let result = body.materialize().await.unwrap();
496        assert_eq!(result, Bytes::from("text data"));
497
498        // Body::Json
499        let body = Body::Json(serde_json::json!({"key": "value"}));
500        let result = body.materialize().await.unwrap();
501        assert_eq!(result, Bytes::from_static(br#"{"key":"value"}"#));
502
503        // Body::Xml
504        let xml = "<root><child>value</child></root>";
505        let body = Body::Xml(xml.to_string());
506        let result = body.materialize().await.unwrap();
507        assert_eq!(result, Bytes::from(xml));
508    }
509
510    #[tokio::test]
511    async fn test_materialize_exceeds_default_limit() {
512        use futures::stream;
513
514        // 11MB stream - should fail with default 10MB limit
515        let large_data = vec![0u8; 11 * 1024 * 1024];
516        let chunks = vec![Ok(Bytes::from(large_data))];
517        let stream = stream::iter(chunks);
518        let body = Body::Stream(StreamBody {
519            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
520            metadata: StreamMetadata::default(),
521        });
522
523        let result = body.materialize().await;
524        assert!(matches!(
525            result,
526            Err(CamelError::StreamLimitExceeded(10_485_760))
527        ));
528    }
529
530    #[test]
531    fn stream_variants_are_never_equal() {
532        use futures::stream;
533
534        let make_stream = || {
535            let s = stream::iter(vec![Ok(Bytes::from_static(b"data"))]);
536            Body::Stream(StreamBody {
537                stream: Arc::new(Mutex::new(Some(Box::pin(s)))),
538                metadata: StreamMetadata::default(),
539            })
540        };
541        assert_ne!(make_stream(), make_stream());
542    }
543
544    // XML body tests
545
546    #[test]
547    fn test_body_xml_as_xml() {
548        let xml = "<root><child>value</child></root>";
549        let body = Body::Xml(xml.to_string());
550        assert_eq!(body.as_xml(), Some(xml));
551    }
552
553    #[test]
554    fn test_body_non_xml_as_xml_returns_none() {
555        // Body::Text should return None for as_xml()
556        let body = Body::Text("<root/>".to_string());
557        assert_eq!(body.as_xml(), None);
558
559        // Body::Empty should return None
560        let body = Body::Empty;
561        assert_eq!(body.as_xml(), None);
562
563        // Body::Bytes should return None
564        let body = Body::Bytes(Bytes::from("<root/>"));
565        assert_eq!(body.as_xml(), None);
566
567        // Body::Json should return None
568        let body = Body::Json(serde_json::json!({"key": "value"}));
569        assert_eq!(body.as_xml(), None);
570    }
571
572    #[test]
573    fn test_body_xml_partial_eq() {
574        // Same XML content should be equal
575        let body1 = Body::Xml("a".to_string());
576        let body2 = Body::Xml("a".to_string());
577        assert_eq!(body1, body2);
578
579        // Different XML content should not be equal
580        let body1 = Body::Xml("a".to_string());
581        let body2 = Body::Xml("b".to_string());
582        assert_ne!(body1, body2);
583    }
584
585    #[test]
586    fn test_body_xml_not_equal_to_other_variants() {
587        // Body::Xml should not equal Body::Text even with same content
588        let xml_body = Body::Xml("x".to_string());
589        let text_body = Body::Text("x".to_string());
590        assert_ne!(xml_body, text_body);
591    }
592
593    #[test]
594    fn test_try_into_xml_from_text() {
595        let body = Body::Text("<root/>".to_string());
596        let result = body.try_into_xml();
597        assert!(matches!(result, Ok(Body::Xml(ref s)) if s == "<root/>"));
598    }
599
600    #[test]
601    fn test_try_into_xml_invalid_text() {
602        let body = Body::Text("not xml".to_string());
603        let result = body.try_into_xml();
604        assert!(matches!(result, Err(CamelError::TypeConversionFailed(_))));
605    }
606
607    #[test]
608    fn test_body_xml_clone() {
609        let original = Body::Xml("hello".to_string());
610        let cloned = original.clone();
611        assert_eq!(original, cloned);
612    }
613
614    // ---------- into_async_read tests ----------
615
616    #[tokio::test]
617    async fn test_into_async_read_empty() {
618        use tokio::io::AsyncReadExt;
619        let body = Body::Empty;
620        let mut reader = body.into_async_read().unwrap();
621        let mut buf = Vec::new();
622        reader.read_to_end(&mut buf).await.unwrap();
623        assert!(buf.is_empty());
624    }
625
626    #[tokio::test]
627    async fn test_into_async_read_bytes() {
628        use tokio::io::AsyncReadExt;
629        let body = Body::Bytes(Bytes::from("hello"));
630        let mut reader = body.into_async_read().unwrap();
631        let mut buf = Vec::new();
632        reader.read_to_end(&mut buf).await.unwrap();
633        assert_eq!(buf, b"hello");
634    }
635
636    #[tokio::test]
637    async fn test_into_async_read_text() {
638        use tokio::io::AsyncReadExt;
639        let body = Body::Text("world".to_string());
640        let mut reader = body.into_async_read().unwrap();
641        let mut buf = Vec::new();
642        reader.read_to_end(&mut buf).await.unwrap();
643        assert_eq!(buf, b"world");
644    }
645
646    #[tokio::test]
647    async fn test_into_async_read_json() {
648        use tokio::io::AsyncReadExt;
649        let body = Body::Json(serde_json::json!({"key": "val"}));
650        let mut reader = body.into_async_read().unwrap();
651        let mut buf = Vec::new();
652        reader.read_to_end(&mut buf).await.unwrap();
653        let parsed: serde_json::Value = serde_json::from_slice(&buf).unwrap();
654        assert_eq!(parsed["key"], "val");
655    }
656
657    #[tokio::test]
658    async fn test_into_async_read_xml() {
659        use tokio::io::AsyncReadExt;
660        let body = Body::Xml("<root/>".to_string());
661        let mut reader = body.into_async_read().unwrap();
662        let mut buf = Vec::new();
663        reader.read_to_end(&mut buf).await.unwrap();
664        assert_eq!(buf, b"<root/>");
665    }
666
667    #[tokio::test]
668    async fn test_into_async_read_stream_multichunk() {
669        use tokio::io::AsyncReadExt;
670        let chunks: Vec<Result<Bytes, CamelError>> = vec![
671            Ok(Bytes::from("foo")),
672            Ok(Bytes::from("bar")),
673            Ok(Bytes::from("baz")),
674        ];
675        let stream = futures::stream::iter(chunks);
676        let body = Body::Stream(StreamBody {
677            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
678            metadata: StreamMetadata {
679                size_hint: None,
680                content_type: None,
681                origin: None,
682            },
683        });
684        let mut reader = body.into_async_read().unwrap();
685        let mut buf = Vec::new();
686        reader.read_to_end(&mut buf).await.unwrap();
687        assert_eq!(buf, b"foobarbaz");
688    }
689
690    #[tokio::test]
691    async fn test_into_async_read_already_consumed() {
692        use tokio::io::AsyncReadExt;
693        // Mutex holds None → stream already consumed
694        type MaybeStream = Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>;
695        let arc: MaybeStream = Arc::new(Mutex::new(None));
696        let body = Body::Stream(StreamBody {
697            stream: arc,
698            metadata: StreamMetadata {
699                size_hint: None,
700                content_type: None,
701                origin: None,
702            },
703        });
704        let mut reader = body.into_async_read().unwrap();
705        let mut buf = Vec::new();
706        let result = reader.read_to_end(&mut buf).await;
707        assert!(result.is_err());
708    }
709}