1use crate::error::CamelError;
2use bytes::{Bytes, BytesMut};
3use futures::StreamExt;
4use futures::stream::BoxStream;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
8const DEFAULT_MATERIALIZE_LIMIT: usize = 10 * 1024 * 1024;
9
10#[derive(Debug, Clone, Default)]
12pub struct StreamMetadata {
13 pub size_hint: Option<u64>,
15 pub content_type: Option<String>,
17 pub origin: Option<String>,
19}
20
21pub struct StreamBody {
60 #[allow(clippy::type_complexity)]
62 pub stream: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
63 pub metadata: StreamMetadata,
65}
66
67impl std::fmt::Debug for StreamBody {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("StreamBody")
70 .field("metadata", &self.metadata)
71 .field("stream", &"<BoxStream>")
72 .finish()
73 }
74}
75
76impl Clone for StreamBody {
77 fn clone(&self) -> Self {
78 Self {
79 stream: Arc::clone(&self.stream),
80 metadata: self.metadata.clone(),
81 }
82 }
83}
84
85#[derive(Debug, Default)]
87pub enum Body {
88 #[default]
90 Empty,
91 Bytes(Bytes),
93 Text(String),
95 Json(serde_json::Value),
97 Stream(StreamBody),
99}
100
101impl Clone for Body {
102 fn clone(&self) -> Self {
103 match self {
104 Body::Empty => Body::Empty,
105 Body::Bytes(b) => Body::Bytes(b.clone()),
106 Body::Text(s) => Body::Text(s.clone()),
107 Body::Json(v) => Body::Json(v.clone()),
108 Body::Stream(s) => Body::Stream(s.clone()),
109 }
110 }
111}
112
113impl PartialEq for Body {
114 fn eq(&self, other: &Self) -> bool {
115 match (self, other) {
116 (Body::Empty, Body::Empty) => true,
117 (Body::Text(a), Body::Text(b)) => a == b,
118 (Body::Json(a), Body::Json(b)) => a == b,
119 (Body::Bytes(a), Body::Bytes(b)) => a == b,
120 _ => false,
122 }
123 }
124}
125
126impl Body {
127 pub fn is_empty(&self) -> bool {
129 matches!(self, Body::Empty)
130 }
131
132 pub async fn into_bytes(self, max_size: usize) -> Result<Bytes, CamelError> {
136 match self {
137 Body::Empty => Ok(Bytes::new()),
138 Body::Bytes(b) => {
139 if b.len() > max_size {
140 return Err(CamelError::StreamLimitExceeded(max_size));
141 }
142 Ok(b)
143 }
144 Body::Text(s) => {
145 if s.len() > max_size {
146 return Err(CamelError::StreamLimitExceeded(max_size));
147 }
148 Ok(Bytes::from(s))
149 }
150 Body::Json(v) => {
151 let b = serde_json::to_vec(&v)
152 .map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
153 if b.len() > max_size {
154 return Err(CamelError::StreamLimitExceeded(max_size));
155 }
156 Ok(Bytes::from(b))
157 }
158 Body::Stream(s) => {
159 let mut stream_lock = s.stream.lock().await;
160 let mut stream = stream_lock.take().ok_or(CamelError::AlreadyConsumed)?;
161
162 let mut buffer = BytesMut::new();
163 while let Some(chunk_res) = stream.next().await {
164 let chunk = chunk_res?;
165 if buffer.len() + chunk.len() > max_size {
166 return Err(CamelError::StreamLimitExceeded(max_size));
167 }
168 buffer.extend_from_slice(&chunk);
169 }
170 Ok(buffer.freeze())
171 }
172 }
173 }
174
175 pub async fn materialize(self) -> Result<Bytes, CamelError> {
186 self.into_bytes(DEFAULT_MATERIALIZE_LIMIT).await
187 }
188
189 pub fn as_text(&self) -> Option<&str> {
191 match self {
192 Body::Text(s) => Some(s.as_str()),
193 _ => None,
194 }
195 }
196
197 pub fn try_into_text(self) -> Result<Body, CamelError> {
201 crate::body_converter::convert(self, crate::body_converter::BodyType::Text)
202 }
203
204 pub fn try_into_json(self) -> Result<Body, CamelError> {
208 crate::body_converter::convert(self, crate::body_converter::BodyType::Json)
209 }
210
211 pub fn try_into_bytes_body(self) -> Result<Body, CamelError> {
215 crate::body_converter::convert(self, crate::body_converter::BodyType::Bytes)
216 }
217}
218
219impl From<String> for Body {
221 fn from(s: String) -> Self {
222 Body::Text(s)
223 }
224}
225
226impl From<&str> for Body {
227 fn from(s: &str) -> Self {
228 Body::Text(s.to_string())
229 }
230}
231
232impl From<Bytes> for Body {
233 fn from(b: Bytes) -> Self {
234 Body::Bytes(b)
235 }
236}
237
238impl From<Vec<u8>> for Body {
239 fn from(v: Vec<u8>) -> Self {
240 Body::Bytes(Bytes::from(v))
241 }
242}
243
244impl From<serde_json::Value> for Body {
245 fn from(v: serde_json::Value) -> Self {
246 Body::Json(v)
247 }
248}
249
250#[cfg(test)]
251mod tests {
252 use super::*;
253
254 #[test]
255 fn test_body_default_is_empty() {
256 let body = Body::default();
257 assert!(body.is_empty());
258 }
259
260 #[test]
261 fn test_body_from_string() {
262 let body = Body::from("hello".to_string());
263 assert_eq!(body.as_text(), Some("hello"));
264 }
265
266 #[test]
267 fn test_body_from_str() {
268 let body = Body::from("world");
269 assert_eq!(body.as_text(), Some("world"));
270 }
271
272 #[test]
273 fn test_body_from_bytes() {
274 let body = Body::from(Bytes::from_static(b"data"));
275 assert!(!body.is_empty());
276 assert!(matches!(body, Body::Bytes(_)));
277 }
278
279 #[test]
280 fn test_body_from_json() {
281 let val = serde_json::json!({"key": "value"});
282 let body = Body::from(val.clone());
283 assert!(matches!(body, Body::Json(_)));
284 }
285
286 #[tokio::test]
287 async fn test_into_bytes_from_stream() {
288 use futures::stream;
289 let chunks = vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))];
290 let stream = stream::iter(chunks);
291 let body = Body::Stream(StreamBody {
292 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
293 metadata: StreamMetadata::default(),
294 });
295
296 let result = body.into_bytes(100).await.unwrap();
297 assert_eq!(result, Bytes::from("hello world"));
298 }
299
300 #[tokio::test]
301 async fn test_into_bytes_limit_exceeded() {
302 use futures::stream;
303 let chunks = vec![Ok(Bytes::from("this is too long"))];
304 let stream = stream::iter(chunks);
305 let body = Body::Stream(StreamBody {
306 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
307 metadata: StreamMetadata::default(),
308 });
309
310 let result = body.into_bytes(5).await;
311 assert!(matches!(result, Err(CamelError::StreamLimitExceeded(5))));
312 }
313
314 #[tokio::test]
315 async fn test_into_bytes_already_consumed() {
316 use futures::stream;
317 let chunks = vec![Ok(Bytes::from("data"))];
318 let stream = stream::iter(chunks);
319 let body = Body::Stream(StreamBody {
320 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
321 metadata: StreamMetadata::default(),
322 });
323
324 let cloned = body.clone();
325 let _ = body.into_bytes(100).await.unwrap();
326
327 let result = cloned.into_bytes(100).await;
328 assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
329 }
330
331 #[tokio::test]
332 async fn test_materialize_with_default_limit() {
333 use futures::stream;
334
335 let chunks = vec![Ok(Bytes::from("test data"))];
337 let stream = stream::iter(chunks);
338 let body = Body::Stream(StreamBody {
339 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
340 metadata: StreamMetadata::default(),
341 });
342
343 let result = body.materialize().await;
344 assert!(result.is_ok());
345 assert_eq!(result.unwrap(), Bytes::from("test data"));
346 }
347
348 #[tokio::test]
349 async fn test_materialize_non_stream_body_types() {
350 let body = Body::Empty;
354 let result = body.materialize().await.unwrap();
355 assert!(result.is_empty());
356
357 let body = Body::Bytes(Bytes::from("bytes data"));
359 let result = body.materialize().await.unwrap();
360 assert_eq!(result, Bytes::from("bytes data"));
361
362 let body = Body::Text("text data".to_string());
364 let result = body.materialize().await.unwrap();
365 assert_eq!(result, Bytes::from("text data"));
366
367 let body = Body::Json(serde_json::json!({"key": "value"}));
369 let result = body.materialize().await.unwrap();
370 assert_eq!(result, Bytes::from_static(br#"{"key":"value"}"#));
371 }
372
373 #[tokio::test]
374 async fn test_materialize_exceeds_default_limit() {
375 use futures::stream;
376
377 let large_data = vec![0u8; 15 * 1024 * 1024];
379 let chunks = vec![Ok(Bytes::from(large_data))];
380 let stream = stream::iter(chunks);
381 let body = Body::Stream(StreamBody {
382 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
383 metadata: StreamMetadata::default(),
384 });
385
386 let result = body.materialize().await;
387 assert!(matches!(
388 result,
389 Err(CamelError::StreamLimitExceeded(10_485_760))
390 ));
391 }
392
393 #[test]
394 fn stream_variants_are_never_equal() {
395 use futures::stream;
396
397 let make_stream = || {
398 let s = stream::iter(vec![Ok(Bytes::from_static(b"data"))]);
399 Body::Stream(StreamBody {
400 stream: Arc::new(Mutex::new(Some(Box::pin(s)))),
401 metadata: StreamMetadata::default(),
402 })
403 };
404 assert_ne!(make_stream(), make_stream());
405 }
406}