Skip to main content

camel_api/
body.rs

1use crate::error::CamelError;
2use bytes::{Bytes, BytesMut};
3use futures::stream::BoxStream;
4use futures::{StreamExt, TryStreamExt};
5use std::io;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::task::{Context as TaskContext, Poll};
9use tokio::io::{AsyncRead, ReadBuf};
10use tokio::sync::Mutex;
11use tokio_util::io::StreamReader;
12
13const DEFAULT_MATERIALIZE_LIMIT: usize = 10 * 1024 * 1024;
14
15/// A boxed [`AsyncRead`] for reading body content without materializing it into memory.
16///
17/// Returned by [`Body::into_async_read()`].
18pub type BoxAsyncRead = Pin<Box<dyn AsyncRead + Send + Unpin>>;
19
20/// Metadata associated with a stream body.
21#[derive(Debug, Clone, Default)]
22pub struct StreamMetadata {
23    /// Expected size of the stream if known.
24    pub size_hint: Option<u64>,
25    /// Content type of the stream content.
26    pub content_type: Option<String>,
27    /// Origin of the stream (e.g. "file:///path/to/file").
28    pub origin: Option<String>,
29}
30
31/// A body that wraps a lazy-evaluated stream of bytes.
32///
33/// # Clone Semantics
34///
35/// The stream is **single-consumption**. When cloning a `Body::Stream`,
36/// all clones share the same underlying stream handle. Only the first
37/// clone to consume the stream will succeed; subsequent attempts will
38/// return `CamelError::AlreadyConsumed`.
39///
40/// # Example
41///
42/// ```rust
43/// use camel_api::{Body, StreamBody, error::CamelError};
44/// use futures::stream;
45/// use bytes::Bytes;
46/// use std::sync::Arc;
47/// use tokio::sync::Mutex;
48///
49/// # #[tokio::main]
50/// # async fn main() -> Result<(), CamelError> {
51/// let chunks = vec![Ok(Bytes::from("data"))];
52/// let stream = stream::iter(chunks);
53/// let body = Body::Stream(StreamBody {
54///     stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
55///     metadata: Default::default(),
56/// });
57///
58/// let clone = body.clone();
59///
60/// // First consumption succeeds
61/// let _ = body.into_bytes(1024).await?;
62///
63/// // Second consumption fails
64/// let result = clone.into_bytes(1024).await;
65/// assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
66/// # Ok(())
67/// # }
68/// ```
69pub struct StreamBody {
70    /// The actual byte stream, wrapped in an Arc and Mutex to allow Clone for Body.
71    #[allow(clippy::type_complexity)]
72    pub stream: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
73    /// Metadata associated with the stream.
74    pub metadata: StreamMetadata,
75}
76
77impl std::fmt::Debug for StreamBody {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct("StreamBody")
80            .field("metadata", &self.metadata)
81            .field("stream", &"<BoxStream>")
82            .finish()
83    }
84}
85
86impl Clone for StreamBody {
87    fn clone(&self) -> Self {
88        Self {
89            stream: Arc::clone(&self.stream),
90            metadata: self.metadata.clone(),
91        }
92    }
93}
94
95// ---------------------------------------------------------------------------
96// StreamAsyncRead — adapts Body::Stream into AsyncRead
97// ---------------------------------------------------------------------------
98
99/// Private adapter that implements [`AsyncRead`] for [`Body::Stream`].
100///
101/// On the first `poll_read`, attempts a non-blocking `try_lock()` on the inner
102/// `Arc<Mutex<Option<BoxStream>>>`:
103/// - If the lock succeeds and the stream is `Some`, extracts it and creates
104///   an active [`StreamReader`].
105/// - If the stream is `None` (already consumed), returns an [`io::Error`].
106/// - If the lock is contended (extremely rare), wakes the task and returns
107///   `Poll::Pending` to retry.
108#[allow(clippy::type_complexity)]
109struct StreamAsyncRead {
110    arc: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
111    /// Holds the active reader after the stream is extracted on first poll.
112    reader: Option<Box<dyn AsyncRead + Send + Unpin>>,
113    /// Set to true when the stream was already consumed (prevents further reads).
114    consumed: bool,
115}
116
117impl AsyncRead for StreamAsyncRead {
118    fn poll_read(
119        mut self: Pin<&mut Self>,
120        cx: &mut TaskContext<'_>,
121        buf: &mut ReadBuf<'_>,
122    ) -> Poll<io::Result<()>> {
123        if self.consumed {
124            return Poll::Ready(Err(io::Error::other("stream already consumed")));
125        }
126        // Lazy init: extract the stream on first poll
127        if self.reader.is_none() {
128            // Extract stream in a separate scope to avoid holding the lock while modifying self
129            let extracted = {
130                match self.arc.try_lock() {
131                    Ok(mut guard) => guard.take(),
132                    Err(_) => {
133                        // Lock contended — schedule a retry
134                        cx.waker().wake_by_ref();
135                        return Poll::Pending;
136                    }
137                }
138            };
139            // Now safe to modify self since lock is dropped
140            match extracted {
141                Some(stream) => {
142                    let mapped = stream.map_err(|e: CamelError| io::Error::other(e.to_string()));
143                    self.reader = Some(Box::new(StreamReader::new(mapped)));
144                }
145                None => {
146                    self.consumed = true;
147                    return Poll::Ready(Err(io::Error::other("stream already consumed")));
148                }
149            }
150        }
151        // Delegate to the active reader
152        Pin::new(self.reader.as_mut().unwrap()).poll_read(cx, buf)
153    }
154}
155
156/// The body of a message, supporting common payload types.
157#[derive(Debug, Default)]
158pub enum Body {
159    /// No body content.
160    #[default]
161    Empty,
162    /// Raw bytes payload.
163    Bytes(Bytes),
164    /// UTF-8 string payload.
165    Text(String),
166    /// JSON payload.
167    Json(serde_json::Value),
168    /// XML payload (well-formed XML string; use `try_into_xml()` for validation).
169    Xml(String),
170    /// Streaming payload.
171    Stream(StreamBody),
172}
173
174impl Clone for Body {
175    fn clone(&self) -> Self {
176        match self {
177            Body::Empty => Body::Empty,
178            Body::Bytes(b) => Body::Bytes(b.clone()),
179            Body::Text(s) => Body::Text(s.clone()),
180            Body::Json(v) => Body::Json(v.clone()),
181            Body::Xml(s) => Body::Xml(s.clone()),
182            Body::Stream(s) => Body::Stream(s.clone()),
183        }
184    }
185}
186
187impl PartialEq for Body {
188    fn eq(&self, other: &Self) -> bool {
189        match (self, other) {
190            (Body::Empty, Body::Empty) => true,
191            (Body::Text(a), Body::Text(b)) => a == b,
192            (Body::Json(a), Body::Json(b)) => a == b,
193            (Body::Bytes(a), Body::Bytes(b)) => a == b,
194            (Body::Xml(a), Body::Xml(b)) => a == b,
195            // Stream: two streams are never equal (single-consumption)
196            _ => false,
197        }
198    }
199}
200
201impl Body {
202    /// Returns `true` if the body is empty.
203    pub fn is_empty(&self) -> bool {
204        matches!(self, Body::Empty)
205    }
206
207    /// Convert the body into `Bytes`, consuming it if it is a stream.
208    /// This is an async operation because it may need to read from an underlying stream.
209    /// A `max_size` limit is enforced to prevent OOM errors.
210    pub async fn into_bytes(self, max_size: usize) -> Result<Bytes, CamelError> {
211        match self {
212            Body::Empty => Ok(Bytes::new()),
213            Body::Bytes(b) => {
214                if b.len() > max_size {
215                    return Err(CamelError::StreamLimitExceeded(max_size));
216                }
217                Ok(b)
218            }
219            Body::Text(s) => {
220                if s.len() > max_size {
221                    return Err(CamelError::StreamLimitExceeded(max_size));
222                }
223                Ok(Bytes::from(s))
224            }
225            Body::Json(v) => {
226                let b = serde_json::to_vec(&v)
227                    .map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
228                if b.len() > max_size {
229                    return Err(CamelError::StreamLimitExceeded(max_size));
230                }
231                Ok(Bytes::from(b))
232            }
233            Body::Xml(s) => {
234                if s.len() > max_size {
235                    return Err(CamelError::StreamLimitExceeded(max_size));
236                }
237                Ok(Bytes::from(s))
238            }
239            Body::Stream(s) => {
240                let mut stream_lock = s.stream.lock().await;
241                let mut stream = stream_lock.take().ok_or(CamelError::AlreadyConsumed)?;
242
243                let mut buffer = BytesMut::new();
244                while let Some(chunk_res) = stream.next().await {
245                    let chunk = chunk_res?;
246                    if buffer.len() + chunk.len() > max_size {
247                        return Err(CamelError::StreamLimitExceeded(max_size));
248                    }
249                    buffer.extend_from_slice(&chunk);
250                }
251                Ok(buffer.freeze())
252            }
253        }
254    }
255
256    /// Materialize stream with sensible default limit (10MB).
257    ///
258    /// Convenience method for common cases where you need the stream content
259    /// but don't want to specify a custom limit.
260    ///
261    /// # Example
262    /// ```ignore
263    /// let body = Body::Stream(stream);
264    /// let bytes = body.materialize().await?;
265    /// ```
266    pub async fn materialize(self) -> Result<Bytes, CamelError> {
267        self.into_bytes(DEFAULT_MATERIALIZE_LIMIT).await
268    }
269
270    /// Convert the body into an [`AsyncRead`] without materializing it into memory.
271    ///
272    /// - [`Body::Empty`] → empty reader (0 bytes)
273    /// - [`Body::Bytes`] → in-memory cursor
274    /// - [`Body::Text`] → UTF-8 bytes cursor
275    /// - [`Body::Json`] → serialized JSON bytes cursor
276    /// - [`Body::Xml`] → UTF-8 bytes cursor
277    /// - [`Body::Stream`] → streams chunk-by-chunk via [`StreamReader`];
278    ///   if the stream was already consumed, the reader returns an [`io::Error`]
279    ///   on the first read
280    pub fn into_async_read(self) -> BoxAsyncRead {
281        match self {
282            Body::Empty => Box::pin(tokio::io::empty()),
283            Body::Bytes(b) => Box::pin(std::io::Cursor::new(b)),
284            Body::Text(s) => Box::pin(std::io::Cursor::new(s.into_bytes())),
285            Body::Json(v) => match serde_json::to_vec(&v) {
286                Ok(bytes) => Box::pin(std::io::Cursor::new(bytes)) as BoxAsyncRead,
287                Err(e) => {
288                    let err = io::Error::new(io::ErrorKind::InvalidData, e.to_string());
289                    let stream = futures::stream::iter(vec![Err::<Bytes, io::Error>(err)]);
290                    Box::pin(StreamReader::new(stream)) as BoxAsyncRead
291                }
292            },
293            Body::Xml(s) => Box::pin(std::io::Cursor::new(s.into_bytes())),
294            Body::Stream(s) => Box::pin(StreamAsyncRead {
295                arc: s.stream,
296                reader: None,
297                consumed: false,
298            }),
299        }
300    }
301
302    /// Try to get the body as a string, converting from bytes if needed.
303    pub fn as_text(&self) -> Option<&str> {
304        match self {
305            Body::Text(s) => Some(s.as_str()),
306            _ => None,
307        }
308    }
309
310    /// Try to get the body as an XML string.
311    pub fn as_xml(&self) -> Option<&str> {
312        match self {
313            Body::Xml(s) => Some(s.as_str()),
314            _ => None,
315        }
316    }
317
318    /// Convert this body to `Body::Text`, consuming it.
319    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
320    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
321    pub fn try_into_text(self) -> Result<Body, CamelError> {
322        crate::body_converter::convert(self, crate::body_converter::BodyType::Text)
323    }
324
325    /// Convert this body to `Body::Json`, consuming it.
326    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
327    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
328    pub fn try_into_json(self) -> Result<Body, CamelError> {
329        crate::body_converter::convert(self, crate::body_converter::BodyType::Json)
330    }
331
332    /// Convert this body to `Body::Bytes`, consuming it.
333    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
334    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
335    pub fn try_into_bytes_body(self) -> Result<Body, CamelError> {
336        crate::body_converter::convert(self, crate::body_converter::BodyType::Bytes)
337    }
338
339    /// Convert this body to `Body::Xml`, consuming it.
340    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
341    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
342    pub fn try_into_xml(self) -> Result<Body, CamelError> {
343        crate::body_converter::convert(self, crate::body_converter::BodyType::Xml)
344    }
345}
346
347// Conversion impls
348impl From<String> for Body {
349    fn from(s: String) -> Self {
350        Body::Text(s)
351    }
352}
353
354impl From<&str> for Body {
355    fn from(s: &str) -> Self {
356        Body::Text(s.to_string())
357    }
358}
359
360impl From<Bytes> for Body {
361    fn from(b: Bytes) -> Self {
362        Body::Bytes(b)
363    }
364}
365
366impl From<Vec<u8>> for Body {
367    fn from(v: Vec<u8>) -> Self {
368        Body::Bytes(Bytes::from(v))
369    }
370}
371
372impl From<serde_json::Value> for Body {
373    fn from(v: serde_json::Value) -> Self {
374        Body::Json(v)
375    }
376}
377
378#[cfg(test)]
379mod tests {
380    use super::*;
381
382    #[test]
383    fn test_body_default_is_empty() {
384        let body = Body::default();
385        assert!(body.is_empty());
386    }
387
388    #[test]
389    fn test_body_from_string() {
390        let body = Body::from("hello".to_string());
391        assert_eq!(body.as_text(), Some("hello"));
392    }
393
394    #[test]
395    fn test_body_from_str() {
396        let body = Body::from("world");
397        assert_eq!(body.as_text(), Some("world"));
398    }
399
400    #[test]
401    fn test_body_from_bytes() {
402        let body = Body::from(Bytes::from_static(b"data"));
403        assert!(!body.is_empty());
404        assert!(matches!(body, Body::Bytes(_)));
405    }
406
407    #[test]
408    fn test_body_from_json() {
409        let val = serde_json::json!({"key": "value"});
410        let body = Body::from(val.clone());
411        assert!(matches!(body, Body::Json(_)));
412    }
413
414    #[tokio::test]
415    async fn test_into_bytes_from_stream() {
416        use futures::stream;
417        let chunks = vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))];
418        let stream = stream::iter(chunks);
419        let body = Body::Stream(StreamBody {
420            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
421            metadata: StreamMetadata::default(),
422        });
423
424        let result = body.into_bytes(100).await.unwrap();
425        assert_eq!(result, Bytes::from("hello world"));
426    }
427
428    #[tokio::test]
429    async fn test_into_bytes_limit_exceeded() {
430        use futures::stream;
431        let chunks = vec![Ok(Bytes::from("this is too long"))];
432        let stream = stream::iter(chunks);
433        let body = Body::Stream(StreamBody {
434            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
435            metadata: StreamMetadata::default(),
436        });
437
438        let result = body.into_bytes(5).await;
439        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(5))));
440    }
441
442    #[tokio::test]
443    async fn test_into_bytes_already_consumed() {
444        use futures::stream;
445        let chunks = vec![Ok(Bytes::from("data"))];
446        let stream = stream::iter(chunks);
447        let body = Body::Stream(StreamBody {
448            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
449            metadata: StreamMetadata::default(),
450        });
451
452        let cloned = body.clone();
453        let _ = body.into_bytes(100).await.unwrap();
454
455        let result = cloned.into_bytes(100).await;
456        assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
457    }
458
459    #[tokio::test]
460    async fn test_materialize_with_default_limit() {
461        use futures::stream;
462
463        // Small stream under limit - should succeed with default 10MB limit
464        let chunks = vec![Ok(Bytes::from("test data"))];
465        let stream = stream::iter(chunks);
466        let body = Body::Stream(StreamBody {
467            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
468            metadata: StreamMetadata::default(),
469        });
470
471        let result = body.materialize().await;
472        assert!(result.is_ok());
473        assert_eq!(result.unwrap(), Bytes::from("test data"));
474    }
475
476    #[tokio::test]
477    async fn test_materialize_non_stream_body_types() {
478        // Verify materialize() works with all body types, not just streams
479
480        // Body::Empty
481        let body = Body::Empty;
482        let result = body.materialize().await.unwrap();
483        assert!(result.is_empty());
484
485        // Body::Bytes
486        let body = Body::Bytes(Bytes::from("bytes data"));
487        let result = body.materialize().await.unwrap();
488        assert_eq!(result, Bytes::from("bytes data"));
489
490        // Body::Text
491        let body = Body::Text("text data".to_string());
492        let result = body.materialize().await.unwrap();
493        assert_eq!(result, Bytes::from("text data"));
494
495        // Body::Json
496        let body = Body::Json(serde_json::json!({"key": "value"}));
497        let result = body.materialize().await.unwrap();
498        assert_eq!(result, Bytes::from_static(br#"{"key":"value"}"#));
499
500        // Body::Xml
501        let xml = "<root><child>value</child></root>";
502        let body = Body::Xml(xml.to_string());
503        let result = body.materialize().await.unwrap();
504        assert_eq!(result, Bytes::from(xml));
505    }
506
507    #[tokio::test]
508    async fn test_materialize_exceeds_default_limit() {
509        use futures::stream;
510
511        // 15MB stream - should fail with default 10MB limit
512        let large_data = vec![0u8; 15 * 1024 * 1024];
513        let chunks = vec![Ok(Bytes::from(large_data))];
514        let stream = stream::iter(chunks);
515        let body = Body::Stream(StreamBody {
516            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
517            metadata: StreamMetadata::default(),
518        });
519
520        let result = body.materialize().await;
521        assert!(matches!(
522            result,
523            Err(CamelError::StreamLimitExceeded(10_485_760))
524        ));
525    }
526
527    #[test]
528    fn stream_variants_are_never_equal() {
529        use futures::stream;
530
531        let make_stream = || {
532            let s = stream::iter(vec![Ok(Bytes::from_static(b"data"))]);
533            Body::Stream(StreamBody {
534                stream: Arc::new(Mutex::new(Some(Box::pin(s)))),
535                metadata: StreamMetadata::default(),
536            })
537        };
538        assert_ne!(make_stream(), make_stream());
539    }
540
541    // XML body tests
542
543    #[test]
544    fn test_body_xml_as_xml() {
545        let xml = "<root><child>value</child></root>";
546        let body = Body::Xml(xml.to_string());
547        assert_eq!(body.as_xml(), Some(xml));
548    }
549
550    #[test]
551    fn test_body_non_xml_as_xml_returns_none() {
552        // Body::Text should return None for as_xml()
553        let body = Body::Text("<root/>".to_string());
554        assert_eq!(body.as_xml(), None);
555
556        // Body::Empty should return None
557        let body = Body::Empty;
558        assert_eq!(body.as_xml(), None);
559
560        // Body::Bytes should return None
561        let body = Body::Bytes(Bytes::from("<root/>"));
562        assert_eq!(body.as_xml(), None);
563
564        // Body::Json should return None
565        let body = Body::Json(serde_json::json!({"key": "value"}));
566        assert_eq!(body.as_xml(), None);
567    }
568
569    #[test]
570    fn test_body_xml_partial_eq() {
571        // Same XML content should be equal
572        let body1 = Body::Xml("a".to_string());
573        let body2 = Body::Xml("a".to_string());
574        assert_eq!(body1, body2);
575
576        // Different XML content should not be equal
577        let body1 = Body::Xml("a".to_string());
578        let body2 = Body::Xml("b".to_string());
579        assert_ne!(body1, body2);
580    }
581
582    #[test]
583    fn test_body_xml_not_equal_to_other_variants() {
584        // Body::Xml should not equal Body::Text even with same content
585        let xml_body = Body::Xml("x".to_string());
586        let text_body = Body::Text("x".to_string());
587        assert_ne!(xml_body, text_body);
588    }
589
590    #[test]
591    fn test_try_into_xml_from_text() {
592        let body = Body::Text("<root/>".to_string());
593        let result = body.try_into_xml();
594        assert!(matches!(result, Ok(Body::Xml(ref s)) if s == "<root/>"));
595    }
596
597    #[test]
598    fn test_try_into_xml_invalid_text() {
599        let body = Body::Text("not xml".to_string());
600        let result = body.try_into_xml();
601        assert!(matches!(result, Err(CamelError::TypeConversionFailed(_))));
602    }
603
604    #[test]
605    fn test_body_xml_clone() {
606        let original = Body::Xml("hello".to_string());
607        let cloned = original.clone();
608        assert_eq!(original, cloned);
609    }
610
611    // ---------- into_async_read tests ----------
612
613    #[tokio::test]
614    async fn test_into_async_read_empty() {
615        use tokio::io::AsyncReadExt;
616        let body = Body::Empty;
617        let mut reader = body.into_async_read();
618        let mut buf = Vec::new();
619        reader.read_to_end(&mut buf).await.unwrap();
620        assert!(buf.is_empty());
621    }
622
623    #[tokio::test]
624    async fn test_into_async_read_bytes() {
625        use tokio::io::AsyncReadExt;
626        let body = Body::Bytes(Bytes::from("hello"));
627        let mut reader = body.into_async_read();
628        let mut buf = Vec::new();
629        reader.read_to_end(&mut buf).await.unwrap();
630        assert_eq!(buf, b"hello");
631    }
632
633    #[tokio::test]
634    async fn test_into_async_read_text() {
635        use tokio::io::AsyncReadExt;
636        let body = Body::Text("world".to_string());
637        let mut reader = body.into_async_read();
638        let mut buf = Vec::new();
639        reader.read_to_end(&mut buf).await.unwrap();
640        assert_eq!(buf, b"world");
641    }
642
643    #[tokio::test]
644    async fn test_into_async_read_json() {
645        use tokio::io::AsyncReadExt;
646        let body = Body::Json(serde_json::json!({"key": "val"}));
647        let mut reader = body.into_async_read();
648        let mut buf = Vec::new();
649        reader.read_to_end(&mut buf).await.unwrap();
650        let parsed: serde_json::Value = serde_json::from_slice(&buf).unwrap();
651        assert_eq!(parsed["key"], "val");
652    }
653
654    #[tokio::test]
655    async fn test_into_async_read_xml() {
656        use tokio::io::AsyncReadExt;
657        let body = Body::Xml("<root/>".to_string());
658        let mut reader = body.into_async_read();
659        let mut buf = Vec::new();
660        reader.read_to_end(&mut buf).await.unwrap();
661        assert_eq!(buf, b"<root/>");
662    }
663
664    #[tokio::test]
665    async fn test_into_async_read_stream_multichunk() {
666        use tokio::io::AsyncReadExt;
667        let chunks: Vec<Result<Bytes, CamelError>> = vec![
668            Ok(Bytes::from("foo")),
669            Ok(Bytes::from("bar")),
670            Ok(Bytes::from("baz")),
671        ];
672        let stream = futures::stream::iter(chunks);
673        let body = Body::Stream(StreamBody {
674            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
675            metadata: StreamMetadata {
676                size_hint: None,
677                content_type: None,
678                origin: None,
679            },
680        });
681        let mut reader = body.into_async_read();
682        let mut buf = Vec::new();
683        reader.read_to_end(&mut buf).await.unwrap();
684        assert_eq!(buf, b"foobarbaz");
685    }
686
687    #[tokio::test]
688    async fn test_into_async_read_already_consumed() {
689        use tokio::io::AsyncReadExt;
690        // Mutex holds None → stream already consumed
691        let arc: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>> =
692            Arc::new(Mutex::new(None));
693        let body = Body::Stream(StreamBody {
694            stream: arc,
695            metadata: StreamMetadata {
696                size_hint: None,
697                content_type: None,
698                origin: None,
699            },
700        });
701        let mut reader = body.into_async_read();
702        let mut buf = Vec::new();
703        let result = reader.read_to_end(&mut buf).await;
704        assert!(result.is_err());
705    }
706}