use crate::error::CamelError;
use bytes::{Bytes, BytesMut};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context as TaskContext, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use tokio::sync::Mutex;
use tokio_util::io::StreamReader;
const DEFAULT_MATERIALIZE_LIMIT: usize = 10 * 1024 * 1024;
pub type BoxAsyncRead = Pin<Box<dyn AsyncRead + Send + Unpin>>;
#[derive(Debug, Clone, Default)]
pub struct StreamMetadata {
pub size_hint: Option<u64>,
pub content_type: Option<String>,
pub origin: Option<String>,
}
pub struct StreamBody {
#[allow(clippy::type_complexity)]
pub stream: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
pub metadata: StreamMetadata,
}
impl std::fmt::Debug for StreamBody {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StreamBody")
.field("metadata", &self.metadata)
.field("stream", &"<BoxStream>")
.finish()
}
}
impl Clone for StreamBody {
fn clone(&self) -> Self {
Self {
stream: Arc::clone(&self.stream),
metadata: self.metadata.clone(),
}
}
}
#[allow(clippy::type_complexity)]
struct StreamAsyncRead {
arc: Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>,
reader: Option<Box<dyn AsyncRead + Send + Unpin>>,
consumed: bool,
}
impl AsyncRead for StreamAsyncRead {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut TaskContext<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if self.consumed {
return Poll::Ready(Err(io::Error::other("stream already consumed")));
}
if self.reader.is_none() {
let extracted = {
match self.arc.try_lock() {
Ok(mut guard) => guard.take(),
Err(_) => {
cx.waker().wake_by_ref();
return Poll::Pending;
}
}
};
match extracted {
Some(stream) => {
let mapped = stream.map_err(|e: CamelError| io::Error::other(e.to_string()));
self.reader = Some(Box::new(StreamReader::new(mapped)));
}
None => {
self.consumed = true;
return Poll::Ready(Err(io::Error::other("stream already consumed")));
}
}
}
Pin::new(self.reader.as_mut().unwrap()).poll_read(cx, buf)
}
}
#[derive(Debug, Default)]
pub enum Body {
#[default]
Empty,
Bytes(Bytes),
Text(String),
Json(serde_json::Value),
Xml(String),
Stream(StreamBody),
}
impl Clone for Body {
fn clone(&self) -> Self {
match self {
Body::Empty => Body::Empty,
Body::Bytes(b) => Body::Bytes(b.clone()),
Body::Text(s) => Body::Text(s.clone()),
Body::Json(v) => Body::Json(v.clone()),
Body::Xml(s) => Body::Xml(s.clone()),
Body::Stream(s) => Body::Stream(s.clone()),
}
}
}
impl PartialEq for Body {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Body::Empty, Body::Empty) => true,
(Body::Text(a), Body::Text(b)) => a == b,
(Body::Json(a), Body::Json(b)) => a == b,
(Body::Bytes(a), Body::Bytes(b)) => a == b,
(Body::Xml(a), Body::Xml(b)) => a == b,
_ => false,
}
}
}
impl Body {
pub fn is_empty(&self) -> bool {
matches!(self, Body::Empty)
}
pub async fn into_bytes(self, max_size: usize) -> Result<Bytes, CamelError> {
match self {
Body::Empty => Ok(Bytes::new()),
Body::Bytes(b) => {
if b.len() > max_size {
return Err(CamelError::StreamLimitExceeded(max_size));
}
Ok(b)
}
Body::Text(s) => {
if s.len() > max_size {
return Err(CamelError::StreamLimitExceeded(max_size));
}
Ok(Bytes::from(s))
}
Body::Json(v) => {
let b = serde_json::to_vec(&v)
.map_err(|e| CamelError::TypeConversionFailed(e.to_string()))?;
if b.len() > max_size {
return Err(CamelError::StreamLimitExceeded(max_size));
}
Ok(Bytes::from(b))
}
Body::Xml(s) => {
if s.len() > max_size {
return Err(CamelError::StreamLimitExceeded(max_size));
}
Ok(Bytes::from(s))
}
Body::Stream(s) => {
let mut stream_lock = s.stream.lock().await;
let mut stream = stream_lock.take().ok_or(CamelError::AlreadyConsumed)?;
let mut buffer = BytesMut::new();
while let Some(chunk_res) = stream.next().await {
let chunk = chunk_res?;
if buffer.len() + chunk.len() > max_size {
return Err(CamelError::StreamLimitExceeded(max_size));
}
buffer.extend_from_slice(&chunk);
}
Ok(buffer.freeze())
}
}
}
pub async fn materialize(self) -> Result<Bytes, CamelError> {
self.into_bytes(DEFAULT_MATERIALIZE_LIMIT).await
}
pub fn into_async_read(self) -> BoxAsyncRead {
match self {
Body::Empty => Box::pin(tokio::io::empty()),
Body::Bytes(b) => Box::pin(std::io::Cursor::new(b)),
Body::Text(s) => Box::pin(std::io::Cursor::new(s.into_bytes())),
Body::Json(v) => match serde_json::to_vec(&v) {
Ok(bytes) => Box::pin(std::io::Cursor::new(bytes)) as BoxAsyncRead,
Err(e) => {
let err = io::Error::new(io::ErrorKind::InvalidData, e.to_string());
let stream = futures::stream::iter(vec![Err::<Bytes, io::Error>(err)]);
Box::pin(StreamReader::new(stream)) as BoxAsyncRead
}
},
Body::Xml(s) => Box::pin(std::io::Cursor::new(s.into_bytes())),
Body::Stream(s) => Box::pin(StreamAsyncRead {
arc: s.stream,
reader: None,
consumed: false,
}),
}
}
pub fn as_text(&self) -> Option<&str> {
match self {
Body::Text(s) => Some(s.as_str()),
_ => None,
}
}
pub fn as_xml(&self) -> Option<&str> {
match self {
Body::Xml(s) => Some(s.as_str()),
_ => None,
}
}
pub fn try_into_text(self) -> Result<Body, CamelError> {
crate::body_converter::convert(self, crate::body_converter::BodyType::Text)
}
pub fn try_into_json(self) -> Result<Body, CamelError> {
crate::body_converter::convert(self, crate::body_converter::BodyType::Json)
}
pub fn try_into_bytes_body(self) -> Result<Body, CamelError> {
crate::body_converter::convert(self, crate::body_converter::BodyType::Bytes)
}
pub fn try_into_xml(self) -> Result<Body, CamelError> {
crate::body_converter::convert(self, crate::body_converter::BodyType::Xml)
}
}
impl From<String> for Body {
fn from(s: String) -> Self {
Body::Text(s)
}
}
impl From<&str> for Body {
fn from(s: &str) -> Self {
Body::Text(s.to_string())
}
}
impl From<Bytes> for Body {
fn from(b: Bytes) -> Self {
Body::Bytes(b)
}
}
impl From<Vec<u8>> for Body {
fn from(v: Vec<u8>) -> Self {
Body::Bytes(Bytes::from(v))
}
}
impl From<serde_json::Value> for Body {
fn from(v: serde_json::Value) -> Self {
Body::Json(v)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_body_default_is_empty() {
let body = Body::default();
assert!(body.is_empty());
}
#[test]
fn test_body_from_string() {
let body = Body::from("hello".to_string());
assert_eq!(body.as_text(), Some("hello"));
}
#[test]
fn test_body_from_str() {
let body = Body::from("world");
assert_eq!(body.as_text(), Some("world"));
}
#[test]
fn test_body_from_bytes() {
let body = Body::from(Bytes::from_static(b"data"));
assert!(!body.is_empty());
assert!(matches!(body, Body::Bytes(_)));
}
#[test]
fn test_body_from_json() {
let val = serde_json::json!({"key": "value"});
let body = Body::from(val.clone());
assert!(matches!(body, Body::Json(_)));
}
#[tokio::test]
async fn test_into_bytes_from_stream() {
use futures::stream;
let chunks = vec![Ok(Bytes::from("hello ")), Ok(Bytes::from("world"))];
let stream = stream::iter(chunks);
let body = Body::Stream(StreamBody {
stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
metadata: StreamMetadata::default(),
});
let result = body.into_bytes(100).await.unwrap();
assert_eq!(result, Bytes::from("hello world"));
}
#[tokio::test]
async fn test_into_bytes_limit_exceeded() {
use futures::stream;
let chunks = vec![Ok(Bytes::from("this is too long"))];
let stream = stream::iter(chunks);
let body = Body::Stream(StreamBody {
stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
metadata: StreamMetadata::default(),
});
let result = body.into_bytes(5).await;
assert!(matches!(result, Err(CamelError::StreamLimitExceeded(5))));
}
#[tokio::test]
async fn test_into_bytes_already_consumed() {
use futures::stream;
let chunks = vec![Ok(Bytes::from("data"))];
let stream = stream::iter(chunks);
let body = Body::Stream(StreamBody {
stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
metadata: StreamMetadata::default(),
});
let cloned = body.clone();
let _ = body.into_bytes(100).await.unwrap();
let result = cloned.into_bytes(100).await;
assert!(matches!(result, Err(CamelError::AlreadyConsumed)));
}
#[tokio::test]
async fn test_materialize_with_default_limit() {
use futures::stream;
let chunks = vec![Ok(Bytes::from("test data"))];
let stream = stream::iter(chunks);
let body = Body::Stream(StreamBody {
stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
metadata: StreamMetadata::default(),
});
let result = body.materialize().await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), Bytes::from("test data"));
}
#[tokio::test]
async fn test_materialize_non_stream_body_types() {
let body = Body::Empty;
let result = body.materialize().await.unwrap();
assert!(result.is_empty());
let body = Body::Bytes(Bytes::from("bytes data"));
let result = body.materialize().await.unwrap();
assert_eq!(result, Bytes::from("bytes data"));
let body = Body::Text("text data".to_string());
let result = body.materialize().await.unwrap();
assert_eq!(result, Bytes::from("text data"));
let body = Body::Json(serde_json::json!({"key": "value"}));
let result = body.materialize().await.unwrap();
assert_eq!(result, Bytes::from_static(br#"{"key":"value"}"#));
let xml = "<root><child>value</child></root>";
let body = Body::Xml(xml.to_string());
let result = body.materialize().await.unwrap();
assert_eq!(result, Bytes::from(xml));
}
#[tokio::test]
async fn test_materialize_exceeds_default_limit() {
use futures::stream;
let large_data = vec![0u8; 15 * 1024 * 1024];
let chunks = vec![Ok(Bytes::from(large_data))];
let stream = stream::iter(chunks);
let body = Body::Stream(StreamBody {
stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
metadata: StreamMetadata::default(),
});
let result = body.materialize().await;
assert!(matches!(
result,
Err(CamelError::StreamLimitExceeded(10_485_760))
));
}
#[test]
fn stream_variants_are_never_equal() {
use futures::stream;
let make_stream = || {
let s = stream::iter(vec![Ok(Bytes::from_static(b"data"))]);
Body::Stream(StreamBody {
stream: Arc::new(Mutex::new(Some(Box::pin(s)))),
metadata: StreamMetadata::default(),
})
};
assert_ne!(make_stream(), make_stream());
}
#[test]
fn test_body_xml_as_xml() {
let xml = "<root><child>value</child></root>";
let body = Body::Xml(xml.to_string());
assert_eq!(body.as_xml(), Some(xml));
}
#[test]
fn test_body_non_xml_as_xml_returns_none() {
let body = Body::Text("<root/>".to_string());
assert_eq!(body.as_xml(), None);
let body = Body::Empty;
assert_eq!(body.as_xml(), None);
let body = Body::Bytes(Bytes::from("<root/>"));
assert_eq!(body.as_xml(), None);
let body = Body::Json(serde_json::json!({"key": "value"}));
assert_eq!(body.as_xml(), None);
}
#[test]
fn test_body_xml_partial_eq() {
let body1 = Body::Xml("a".to_string());
let body2 = Body::Xml("a".to_string());
assert_eq!(body1, body2);
let body1 = Body::Xml("a".to_string());
let body2 = Body::Xml("b".to_string());
assert_ne!(body1, body2);
}
#[test]
fn test_body_xml_not_equal_to_other_variants() {
let xml_body = Body::Xml("x".to_string());
let text_body = Body::Text("x".to_string());
assert_ne!(xml_body, text_body);
}
#[test]
fn test_try_into_xml_from_text() {
let body = Body::Text("<root/>".to_string());
let result = body.try_into_xml();
assert!(matches!(result, Ok(Body::Xml(ref s)) if s == "<root/>"));
}
#[test]
fn test_try_into_xml_invalid_text() {
let body = Body::Text("not xml".to_string());
let result = body.try_into_xml();
assert!(matches!(result, Err(CamelError::TypeConversionFailed(_))));
}
#[test]
fn test_body_xml_clone() {
let original = Body::Xml("hello".to_string());
let cloned = original.clone();
assert_eq!(original, cloned);
}
#[tokio::test]
async fn test_into_async_read_empty() {
use tokio::io::AsyncReadExt;
let body = Body::Empty;
let mut reader = body.into_async_read();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert!(buf.is_empty());
}
#[tokio::test]
async fn test_into_async_read_bytes() {
use tokio::io::AsyncReadExt;
let body = Body::Bytes(Bytes::from("hello"));
let mut reader = body.into_async_read();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello");
}
#[tokio::test]
async fn test_into_async_read_text() {
use tokio::io::AsyncReadExt;
let body = Body::Text("world".to_string());
let mut reader = body.into_async_read();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"world");
}
#[tokio::test]
async fn test_into_async_read_json() {
use tokio::io::AsyncReadExt;
let body = Body::Json(serde_json::json!({"key": "val"}));
let mut reader = body.into_async_read();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
let parsed: serde_json::Value = serde_json::from_slice(&buf).unwrap();
assert_eq!(parsed["key"], "val");
}
#[tokio::test]
async fn test_into_async_read_xml() {
use tokio::io::AsyncReadExt;
let body = Body::Xml("<root/>".to_string());
let mut reader = body.into_async_read();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"<root/>");
}
#[tokio::test]
async fn test_into_async_read_stream_multichunk() {
use tokio::io::AsyncReadExt;
let chunks: Vec<Result<Bytes, CamelError>> = vec![
Ok(Bytes::from("foo")),
Ok(Bytes::from("bar")),
Ok(Bytes::from("baz")),
];
let stream = futures::stream::iter(chunks);
let body = Body::Stream(StreamBody {
stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
metadata: StreamMetadata {
size_hint: None,
content_type: None,
origin: None,
},
});
let mut reader = body.into_async_read();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"foobarbaz");
}
#[tokio::test]
async fn test_into_async_read_already_consumed() {
use tokio::io::AsyncReadExt;
type MaybeStream = Arc<Mutex<Option<BoxStream<'static, Result<Bytes, CamelError>>>>>;
let arc: MaybeStream = Arc::new(Mutex::new(None));
let body = Body::Stream(StreamBody {
stream: arc,
metadata: StreamMetadata {
size_hint: None,
content_type: None,
origin: None,
},
});
let mut reader = body.into_async_read();
let mut buf = Vec::new();
let result = reader.read_to_end(&mut buf).await;
assert!(result.is_err());
}
}