Skip to main content

camel_api/
body.rs

1use crate::error::CamelError;
2use bytes::{Bytes, BytesMut};
3use futures::StreamExt;
4use futures::stream::BoxStream;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
8const DEFAULT_MATERIALIZE_LIMIT: usize = 10 * 1024 * 1024;
9
10/// Metadata associated with a stream body.
11#[derive(Debug, Clone, Default)]
12pub struct StreamMetadata {
13    /// Expected size of the stream if known.
14    pub size_hint: Option<u64>,
15    /// Content type of the stream content.
16    pub content_type: Option<String>,
17    /// Origin of the stream (e.g. "file:///path/to/file").
18    pub origin: Option<String>,
19}
20
21/// A body that wraps a lazy-evaluated stream of bytes.
22///
23/// # Clone Semantics
24///
25/// The stream is **single-consumption**. When cloning a `Body::Stream`,
26/// all clones share the same underlying stream handle. Only the first
27/// clone to consume the stream will succeed; subsequent attempts will
28/// return `CamelError::AlreadyConsumed`.
29///
30/// # Example
31///
32/// ```rust
33/// use camel_api::{Body, StreamBody, error::CamelError};
34/// use futures::stream;
35/// use bytes::Bytes;
36/// use std::sync::Arc;
37/// use tokio::sync::Mutex;
38///
39/// # #[tokio::main]
40/// # async fn main() -> Result<(), CamelError> {
41/// let chunks = vec![Ok(Bytes::from("data"))];
42/// let stream = stream::iter(chunks);
43/// let body = Body::Stream(StreamBody {
44///     stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
45///     metadata: Default::default(),
46/// });
47///
48/// let clone = body.clone();
49///
50/// // First consumption succeeds
51/// let _ = body.into_bytes(1024).await?;
52///
53/// // Second consumption fails
54/// let result = clone.into_bytes(1024).await;
55/// assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
56/// # Ok(())
57/// # }
58/// ```
59pub struct StreamBody {
60    /// The actual byte stream, wrapped in an Arc and Mutex to allow Clone for Body.
61    #[allow(clippy::type_complexity)]
62    pub stream: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
63    /// Metadata associated with the stream.
64    pub metadata: StreamMetadata,
65}
66
67impl std::fmt::Debug for StreamBody {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("StreamBody")
70            .field("metadata", &self.metadata)
71            .field("stream", &"<BoxStream>")
72            .finish()
73    }
74}
75
76impl Clone for StreamBody {
77    fn clone(&self) -> Self {
78        Self {
79            stream: Arc::clone(&self.stream),
80            metadata: self.metadata.clone(),
81        }
82    }
83}
84
85/// The body of a message, supporting common payload types.
86#[derive(Debug, Default)]
87pub enum Body {
88    /// No body content.
89    #[default]
90    Empty,
91    /// Raw bytes payload.
92    Bytes(Bytes),
93    /// UTF-8 string payload.
94    Text(String),
95    /// JSON payload.
96    Json(serde_json::Value),
97    /// Streaming payload.
98    Stream(StreamBody),
99}
100
101impl Clone for Body {
102    fn clone(&self) -> Self {
103        match self {
104            Body::Empty => Body::Empty,
105            Body::Bytes(b) => Body::Bytes(b.clone()),
106            Body::Text(s) => Body::Text(s.clone()),
107            Body::Json(v) => Body::Json(v.clone()),
108            Body::Stream(s) => Body::Stream(s.clone()),
109        }
110    }
111}
112
113impl PartialEq for Body {
114    fn eq(&self, other: &Self) -> bool {
115        match (self, other) {
116            (Body::Empty, Body::Empty) => true,
117            (Body::Text(a), Body::Text(b)) => a == b,
118            (Body::Json(a), Body::Json(b)) => a == b,
119            (Body::Bytes(a), Body::Bytes(b)) => a == b,
120            // Stream: two streams are never equal (single-consumption)
121            _ => false,
122        }
123    }
124}
125
126impl Body {
127    /// Returns `true` if the body is empty.
128    pub fn is_empty(&self) -> bool {
129        matches!(self, Body::Empty)
130    }
131
132    /// Convert the body into `Bytes`, consuming it if it is a stream.
133    /// This is an async operation because it may need to read from an underlying stream.
134    /// A `max_size` limit is enforced to prevent OOM errors.
135    pub async fn into_bytes(self, max_size: usize) -> Result<Bytes, CamelError> {
136        match self {
137            Body::Empty => Ok(Bytes::new()),
138            Body::Bytes(b) => {
139                if b.len() > max_size {
140                    return Err(CamelError::StreamLimitExceeded(max_size));
141                }
142                Ok(b)
143            }
144            Body::Text(s) => {
145                if s.len() > max_size {
146                    return Err(CamelError::StreamLimitExceeded(max_size));
147                }
148                Ok(Bytes::from(s))
149            }
150            Body::Json(v) => {
151                let b = serde_json::to_vec(&v)
152                    .map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
153                if b.len() > max_size {
154                    return Err(CamelError::StreamLimitExceeded(max_size));
155                }
156                Ok(Bytes::from(b))
157            }
158            Body::Stream(s) => {
159                let mut stream_lock = s.stream.lock().await;
160                let mut stream = stream_lock.take().ok_or(CamelError::AlreadyConsumed)?;
161
162                let mut buffer = BytesMut::new();
163                while let Some(chunk_res) = stream.next().await {
164                    let chunk = chunk_res?;
165                    if buffer.len() + chunk.len() > max_size {
166                        return Err(CamelError::StreamLimitExceeded(max_size));
167                    }
168                    buffer.extend_from_slice(&chunk);
169                }
170                Ok(buffer.freeze())
171            }
172        }
173    }
174
175    /// Materialize stream with sensible default limit (10MB).
176    ///
177    /// Convenience method for common cases where you need the stream content
178    /// but don't want to specify a custom limit.
179    ///
180    /// # Example
181    /// ```ignore
182    /// let body = Body::Stream(stream);
183    /// let bytes = body.materialize().await?;
184    /// ```
185    pub async fn materialize(self) -> Result<Bytes, CamelError> {
186        self.into_bytes(DEFAULT_MATERIALIZE_LIMIT).await
187    }
188
189    /// Try to get the body as a string, converting from bytes if needed.
190    pub fn as_text(&self) -> Option<&str> {
191        match self {
192            Body::Text(s) => Some(s.as_str()),
193            _ => None,
194        }
195    }
196
197    /// Convert this body to `Body::Text`, consuming it.
198    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
199    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
200    pub fn try_into_text(self) -> Result<Body, CamelError> {
201        crate::body_converter::convert(self, crate::body_converter::BodyType::Text)
202    }
203
204    /// Convert this body to `Body::Json`, consuming it.
205    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
206    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
207    pub fn try_into_json(self) -> Result<Body, CamelError> {
208        crate::body_converter::convert(self, crate::body_converter::BodyType::Json)
209    }
210
211    /// Convert this body to `Body::Bytes`, consuming it.
212    /// Returns `Err(TypeConversionFailed)` if the conversion is not possible.
213    /// `Body::Stream` always fails — materialize with `into_bytes()` first.
214    pub fn try_into_bytes_body(self) -> Result<Body, CamelError> {
215        crate::body_converter::convert(self, crate::body_converter::BodyType::Bytes)
216    }
217}
218
219// Conversion impls
220impl From<String> for Body {
221    fn from(s: String) -> Self {
222        Body::Text(s)
223    }
224}
225
226impl From<&str> for Body {
227    fn from(s: &str) -> Self {
228        Body::Text(s.to_string())
229    }
230}
231
232impl From<Bytes> for Body {
233    fn from(b: Bytes) -> Self {
234        Body::Bytes(b)
235    }
236}
237
238impl From<Vec<u8>> for Body {
239    fn from(v: Vec<u8>) -> Self {
240        Body::Bytes(Bytes::from(v))
241    }
242}
243
244impl From<serde_json::Value> for Body {
245    fn from(v: serde_json::Value) -> Self {
246        Body::Json(v)
247    }
248}
249
250#[cfg(test)]
251mod tests {
252    use super::*;
253
254    #[test]
255    fn test_body_default_is_empty() {
256        let body = Body::default();
257        assert!(body.is_empty());
258    }
259
260    #[test]
261    fn test_body_from_string() {
262        let body = Body::from("hello".to_string());
263        assert_eq!(body.as_text(), Some("hello"));
264    }
265
266    #[test]
267    fn test_body_from_str() {
268        let body = Body::from("world");
269        assert_eq!(body.as_text(), Some("world"));
270    }
271
272    #[test]
273    fn test_body_from_bytes() {
274        let body = Body::from(Bytes::from_static(b"data"));
275        assert!(!body.is_empty());
276        assert!(matches!(body, Body::Bytes(_)));
277    }
278
279    #[test]
280    fn test_body_from_json() {
281        let val = serde_json::json!({"key": "value"});
282        let body = Body::from(val.clone());
283        assert!(matches!(body, Body::Json(_)));
284    }
285
286    #[tokio::test]
287    async fn test_into_bytes_from_stream() {
288        use futures::stream;
289        let chunks = vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))];
290        let stream = stream::iter(chunks);
291        let body = Body::Stream(StreamBody {
292            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
293            metadata: StreamMetadata::default(),
294        });
295
296        let result = body.into_bytes(100).await.unwrap();
297        assert_eq!(result, Bytes::from("hello world"));
298    }
299
300    #[tokio::test]
301    async fn test_into_bytes_limit_exceeded() {
302        use futures::stream;
303        let chunks = vec![Ok(Bytes::from("this is too long"))];
304        let stream = stream::iter(chunks);
305        let body = Body::Stream(StreamBody {
306            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
307            metadata: StreamMetadata::default(),
308        });
309
310        let result = body.into_bytes(5).await;
311        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(5))));
312    }
313
314    #[tokio::test]
315    async fn test_into_bytes_already_consumed() {
316        use futures::stream;
317        let chunks = vec![Ok(Bytes::from("data"))];
318        let stream = stream::iter(chunks);
319        let body = Body::Stream(StreamBody {
320            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
321            metadata: StreamMetadata::default(),
322        });
323
324        let cloned = body.clone();
325        let _ = body.into_bytes(100).await.unwrap();
326
327        let result = cloned.into_bytes(100).await;
328        assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
329    }
330
331    #[tokio::test]
332    async fn test_materialize_with_default_limit() {
333        use futures::stream;
334
335        // Small stream under limit - should succeed with default 10MB limit
336        let chunks = vec![Ok(Bytes::from("test data"))];
337        let stream = stream::iter(chunks);
338        let body = Body::Stream(StreamBody {
339            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
340            metadata: StreamMetadata::default(),
341        });
342
343        let result = body.materialize().await;
344        assert!(result.is_ok());
345        assert_eq!(result.unwrap(), Bytes::from("test data"));
346    }
347
348    #[tokio::test]
349    async fn test_materialize_non_stream_body_types() {
350        // Verify materialize() works with all body types, not just streams
351
352        // Body::Empty
353        let body = Body::Empty;
354        let result = body.materialize().await.unwrap();
355        assert!(result.is_empty());
356
357        // Body::Bytes
358        let body = Body::Bytes(Bytes::from("bytes data"));
359        let result = body.materialize().await.unwrap();
360        assert_eq!(result, Bytes::from("bytes data"));
361
362        // Body::Text
363        let body = Body::Text("text data".to_string());
364        let result = body.materialize().await.unwrap();
365        assert_eq!(result, Bytes::from("text data"));
366
367        // Body::Json
368        let body = Body::Json(serde_json::json!({"key": "value"}));
369        let result = body.materialize().await.unwrap();
370        assert_eq!(result, Bytes::from_static(br#"{"key":"value"}"#));
371    }
372
373    #[tokio::test]
374    async fn test_materialize_exceeds_default_limit() {
375        use futures::stream;
376
377        // 15MB stream - should fail with default 10MB limit
378        let large_data = vec![0u8; 15 * 1024 * 1024];
379        let chunks = vec![Ok(Bytes::from(large_data))];
380        let stream = stream::iter(chunks);
381        let body = Body::Stream(StreamBody {
382            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
383            metadata: StreamMetadata::default(),
384        });
385
386        let result = body.materialize().await;
387        assert!(matches!(
388            result,
389            Err(CamelError::StreamLimitExceeded(10_485_760))
390        ));
391    }
392
393    #[test]
394    fn stream_variants_are_never_equal() {
395        use futures::stream;
396
397        let make_stream = || {
398            let s = stream::iter(vec![Ok(Bytes::from_static(b"data"))]);
399            Body::Stream(StreamBody {
400                stream: Arc::new(Mutex::new(Some(Box::pin(s)))),
401                metadata: StreamMetadata::default(),
402            })
403        };
404        assert_ne!(make_stream(), make_stream());
405    }
406}