pub mod config;
pub use config::HttpConfig;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, OnceLock};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::{OnceCell, RwLock};
use tower::Service;
use tracing::debug;
use axum::body::BodyDataStream;
use camel_component_api::{Body, BoxProcessor, CamelError, Exchange, StreamBody, StreamMetadata};
use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
use camel_component_api::{UriComponents, UriConfig, parse_uri};
use futures::TryStreamExt;
use futures::stream::BoxStream;
/// Producer-side settings for one HTTP endpoint, derived from the endpoint URI
/// (and, via `from_uri_with_defaults`, from component-level defaults).
#[derive(Debug, Clone)]
pub struct HttpEndpointConfig {
/// Target URL without query string, rebuilt as `<scheme>:<path>`.
pub base_url: String,
/// Value of the `httpMethod` URI parameter, if given.
pub http_method: Option<String>,
/// `true` unless the `throwExceptionOnFailure` parameter is exactly `"false"`.
pub throw_exception_on_failure: bool,
/// `(low, high)` bounds parsed from `okStatusCodeRange` (`"low-high"`); defaults to `(200, 299)`.
pub ok_status_code_range: (u16, u16),
/// `responseTimeout` parameter interpreted as milliseconds, if present and numeric.
pub response_timeout: Option<Duration>,
/// URI parameters that are not listed in `HTTP_CAMEL_OPTIONS`.
pub query_params: HashMap<String, String>,
/// `true` only when the `allowPrivateIps` parameter is exactly `"true"`.
pub allow_private_ips: bool,
/// Comma-separated entries of the `blockedHosts` parameter, each trimmed.
pub blocked_hosts: Vec<String>,
/// `maxBodySize` parameter; defaults to 10 MiB when absent or not numeric.
pub max_body_size: usize,
}
/// URI parameter names treated as component options: `HttpEndpointConfig::from_components`
/// excludes them from `query_params`.
// NOTE(review): `followRedirects` and `connectTimeout` are listed here but are not read by
// `HttpEndpointConfig::from_components` in this file — confirm whether they are honoured elsewhere.
const HTTP_CAMEL_OPTIONS: &[&str] = &[
"httpMethod",
"throwExceptionOnFailure",
"okStatusCodeRange",
"followRedirects",
"connectTimeout",
"responseTimeout",
"allowPrivateIps",
"blockedHosts",
"maxBodySize",
];
/// URI parsing for the producer-side HTTP endpoint configuration.
impl UriConfig for HttpEndpointConfig {
    /// Scheme name reported for this configuration (`"http"`).
    fn scheme() -> &'static str {
        "http"
    }

    /// Parses `uri` with `parse_uri` and builds the configuration from the result.
    fn from_uri(uri: &str) -> Result<Self, CamelError> {
        Self::from_components(parse_uri(uri)?)
    }

    /// Builds the configuration from already-parsed URI components.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::InvalidUri` when the scheme is neither `http` nor `https`.
    /// Option values that fail to parse are not errors: the option's default is used.
    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
        if !matches!(parts.scheme.as_str(), "http" | "https") {
            return Err(CamelError::InvalidUri(format!(
                "expected scheme 'http' or 'https', got '{}'",
                parts.scheme
            )));
        }

        // Read-only view used for the option lookups; the map itself is consumed at the end.
        let options = &parts.params;

        let base_url = format!("{}:{}", parts.scheme, parts.path);
        let http_method = options.get("httpMethod").cloned();

        // Anything other than the literal "false" keeps the default of `true`.
        let throw_exception_on_failure = options
            .get("throwExceptionOnFailure")
            .map_or(true, |raw| raw != "false");

        // Expected form is "<low>-<high>"; a malformed value falls back to 200-299.
        let ok_status_code_range = match options.get("okStatusCodeRange") {
            Some(raw) => raw
                .split_once('-')
                .and_then(|(low, high)| {
                    let low = low.parse::<u16>().ok()?;
                    let high = high.parse::<u16>().ok()?;
                    Some((low, high))
                })
                .unwrap_or((200, 299)),
            None => (200, 299),
        };

        let response_timeout = options
            .get("responseTimeout")
            .and_then(|raw| raw.parse::<u64>().ok())
            .map(Duration::from_millis);

        // Only the literal "true" enables private addresses.
        let allow_private_ips = options
            .get("allowPrivateIps")
            .is_some_and(|raw| raw == "true");

        let blocked_hosts: Vec<String> = match options.get("blockedHosts") {
            Some(list) => list.split(',').map(|host| host.trim().to_string()).collect(),
            None => Vec::new(),
        };

        let max_body_size = options
            .get("maxBodySize")
            .and_then(|raw| raw.parse::<usize>().ok())
            .unwrap_or(10 * 1024 * 1024);

        // Everything that is not a recognised option is kept as a query parameter.
        let mut query_params: HashMap<String, String> = HashMap::new();
        for (name, value) in parts.params {
            if !HTTP_CAMEL_OPTIONS.contains(&name.as_str()) {
                query_params.insert(name, value);
            }
        }

        Ok(Self {
            base_url,
            http_method,
            throw_exception_on_failure,
            ok_status_code_range,
            response_timeout,
            query_params,
            allow_private_ips,
            blocked_hosts,
            max_body_size,
        })
    }
}
impl HttpEndpointConfig {
    /// Parses `uri` and fills in component-level defaults from `config` for every
    /// setting the URI does not specify.
    ///
    /// `responseTimeout` falls back to `config.response_timeout_ms` when the URI yields
    /// no timeout; `allowPrivateIps`, `blockedHosts` and `maxBodySize` fall back to
    /// `config` when the corresponding parameter key is absent from the URI.
    ///
    /// # Errors
    ///
    /// Propagates errors from `parse_uri` and `from_components`.
    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
        let parts = parse_uri(uri)?;

        // Record which options the URI mentions before the components are consumed.
        let uri_sets = |name: &str| parts.params.contains_key(name);
        let sets_private_ips = uri_sets("allowPrivateIps");
        let sets_blocked_hosts = uri_sets("blockedHosts");
        let sets_max_body = uri_sets("maxBodySize");

        let mut endpoint = Self::from_components(parts)?;

        endpoint.response_timeout = endpoint
            .response_timeout
            .or_else(|| Some(Duration::from_millis(config.response_timeout_ms)));
        if !sets_private_ips {
            endpoint.allow_private_ips = config.allow_private_ips;
        }
        if !sets_blocked_hosts {
            endpoint.blocked_hosts = config.blocked_hosts.clone();
        }
        if !sets_max_body {
            endpoint.max_body_size = config.max_body_size;
        }
        Ok(endpoint)
    }
}
/// Consumer-side (server) settings derived from an endpoint URI.
#[derive(Debug, Clone)]
pub struct HttpServerConfig {
/// Host part of the URI authority.
pub host: String,
/// Port from the authority, or 443 for `https` / 80 otherwise when none is given.
pub port: u16,
/// Request path served by the consumer; `/` when the URI has no path.
pub path: String,
/// `maxRequestBody` parameter; defaults to 2 MiB.
pub max_request_body: usize,
/// `maxResponseBody` parameter; defaults to 10 MiB.
pub max_response_body: usize,
}
/// URI parsing for the consumer-side (server) HTTP configuration.
impl UriConfig for HttpServerConfig {
    /// Scheme name reported for this configuration (`"http"`).
    fn scheme() -> &'static str {
        "http"
    }

    /// Parses `uri` with `parse_uri` and builds the configuration from the result.
    fn from_uri(uri: &str) -> Result<Self, CamelError> {
        Self::from_components(parse_uri(uri)?)
    }

    /// Splits the URI path into `host[:port]` and request path and reads the body limits.
    ///
    /// A bracketed IPv6 literal without a port (e.g. `//[::1]/hook`) is accepted: colons
    /// inside the brackets are part of the host, not a port separator.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::InvalidUri` when the scheme is neither `http` nor `https`,
    /// or when the text after the port separator is not a valid `u16`.
    fn from_components(parts: UriComponents) -> Result<Self, CamelError> {
        if parts.scheme != "http" && parts.scheme != "https" {
            return Err(CamelError::InvalidUri(format!(
                "expected scheme 'http' or 'https', got '{}'",
                parts.scheme
            )));
        }

        // `parts.path` carries the leading `//`; what follows is `authority[/path]`.
        let authority_and_path = parts.path.trim_start_matches('/');
        let (authority, path) = match authority_and_path.find('/') {
            Some(idx) => authority_and_path.split_at(idx),
            None => (authority_and_path, "/"),
        };

        // The last ':' separates the port only if no ']' follows it; otherwise it
        // belongs to a bracketed IPv6 address such as `[::1]`.
        let port_separator = authority
            .rfind(':')
            .filter(|&idx| !authority[idx..].contains(']'));

        let (host, port) = match port_separator {
            Some(idx) => {
                let port_str = &authority[idx + 1..];
                let port = port_str.parse::<u16>().map_err(|_| {
                    CamelError::InvalidUri(format!("invalid port '{}' in authority", port_str))
                })?;
                (authority[..idx].to_string(), port)
            }
            None => {
                let default_port = if parts.scheme == "https" { 443 } else { 80 };
                (authority.to_string(), default_port)
            }
        };

        let max_request_body = parts
            .params
            .get("maxRequestBody")
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or(2 * 1024 * 1024);
        let max_response_body = parts
            .params
            .get("maxResponseBody")
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or(10 * 1024 * 1024);

        Ok(Self {
            host,
            port,
            path: path.to_string(),
            max_request_body,
            max_response_body,
        })
    }
}
impl HttpServerConfig {
    /// Parses `uri` and applies component-level defaults for body limits the URI omits.
    ///
    /// `maxRequestBody` falls back to `config.max_request_body`; `maxResponseBody`
    /// falls back to `config.max_body_size`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `parse_uri` and `from_components`.
    pub fn from_uri_with_defaults(uri: &str, config: &HttpConfig) -> Result<Self, CamelError> {
        let parts = parse_uri(uri)?;

        // Note which limits the URI specifies before the components are consumed.
        let sets_request_limit = parts.params.contains_key("maxRequestBody");
        let sets_response_limit = parts.params.contains_key("maxResponseBody");

        let mut server = Self::from_components(parts)?;
        if !sets_request_limit {
            server.max_request_body = config.max_request_body;
        }
        if !sets_response_limit {
            server.max_response_body = config.max_body_size;
        }
        Ok(server)
    }
}
/// Body of a reply sent back to the axum handler.
pub(crate) enum HttpReplyBody {
/// A fully buffered body.
Bytes(bytes::Bytes),
/// A body produced chunk by chunk; each item is a chunk or an error.
Stream(BoxStream<'static, Result<bytes::Bytes, CamelError>>),
}
/// One incoming HTTP request, as handed from `dispatch_handler` to a consumer.
pub(crate) struct RequestEnvelope {
/// Request method as text.
pub(crate) method: String,
/// Request path (the key used for dispatch).
pub(crate) path: String,
/// Raw query string; empty when the request has none.
pub(crate) query: String,
/// Copy of the request headers.
pub(crate) headers: http::HeaderMap,
/// Request body as a stream.
pub(crate) body: StreamBody,
/// Channel on which the consumer sends the reply for this request.
pub(crate) reply_tx: tokio::sync::oneshot::Sender<HttpReply>,
}
/// Reply produced by a consumer for one `RequestEnvelope`.
pub(crate) struct HttpReply {
/// Numeric HTTP status; `dispatch_handler` substitutes 500 if it is not a valid status code.
pub(crate) status: u16,
/// Response headers as `(name, value)` pairs.
pub(crate) headers: Vec<(String, String)>,
/// Response body.
pub(crate) body: HttpReplyBody,
}
/// Shared map from request path to the channel of the consumer registered for that path.
pub(crate) type DispatchTable =
Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<RequestEnvelope>>>>;
/// State kept for one spawned server.
struct ServerHandle {
/// Dispatch table shared with the server task.
dispatch: DispatchTable,
/// Join handle of the spawned server task; stored but not read in the code shown here.
_task: tokio::task::JoinHandle<()>,
}
/// Registry of spawned HTTP servers, keyed by port number.
pub struct ServerRegistry {
/// One lazily initialised server handle per port.
inner: Mutex<HashMap<u16, Arc<OnceCell<ServerHandle>>>>,
}
impl ServerRegistry {
pub fn global() -> &'static Self {
static INSTANCE: OnceLock<ServerRegistry> = OnceLock::new();
INSTANCE.get_or_init(|| ServerRegistry {
inner: Mutex::new(HashMap::new()),
})
}
pub(crate) async fn get_or_spawn(
&'static self,
host: &str,
port: u16,
max_request_body: usize,
) -> Result<DispatchTable, CamelError> {
let host_owned = host.to_string();
let cell = {
let mut guard = self.inner.lock().map_err(|_| {
CamelError::EndpointCreationFailed("ServerRegistry lock poisoned".into())
})?;
guard
.entry(port)
.or_insert_with(|| Arc::new(OnceCell::new()))
.clone()
};
let handle = cell
.get_or_try_init(|| async {
let addr = format!("{host_owned}:{port}");
let listener = tokio::net::TcpListener::bind(&addr).await.map_err(|e| {
CamelError::EndpointCreationFailed(format!("Failed to bind {addr}: {e}"))
})?;
let dispatch: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
let task = tokio::spawn(run_axum_server(
listener,
Arc::clone(&dispatch),
max_request_body,
));
Ok::<ServerHandle, CamelError>(ServerHandle {
dispatch,
_task: task,
})
})
.await?;
Ok(Arc::clone(&handle.dispatch))
}
}
use axum::{
Router,
body::Body as AxumBody,
extract::{Request, State},
http::{Response, StatusCode},
response::IntoResponse,
};
/// State shared with the axum fallback handler.
#[derive(Clone)]
struct AppState {
/// Path-to-consumer routing table.
dispatch: DispatchTable,
/// Upper bound compared against the request's `Content-Length`.
max_request_body: usize,
}
/// Serves HTTP on `listener`, routing every request through `dispatch_handler`.
///
/// An error returned by `axum::serve` is logged and the function returns.
async fn run_axum_server(
    listener: tokio::net::TcpListener,
    dispatch: DispatchTable,
    max_request_body: usize,
) {
    // A single fallback route: all paths are resolved through the dispatch table.
    let router = Router::new()
        .fallback(dispatch_handler)
        .with_state(AppState {
            dispatch,
            max_request_body,
        });

    if let Err(e) = axum::serve(listener, router).await {
        tracing::error!(error = %e, "Axum server error");
    }
}
async fn dispatch_handler(State(state): State<AppState>, req: Request) -> impl IntoResponse {
let method = req.method().to_string();
let path = req.uri().path().to_string();
let query = req.uri().query().unwrap_or("").to_string();
let headers = req.headers().clone();
let content_length: Option<u64> = headers
.get(http::header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok());
if let Some(len) = content_length
&& len > state.max_request_body as u64
{
return Response::builder()
.status(StatusCode::PAYLOAD_TOO_LARGE)
.body(AxumBody::from("Request body exceeds configured limit"))
.expect("infallible");
}
let content_type = headers
.get(http::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let data_stream: BodyDataStream = req.into_body().into_data_stream();
let mapped_stream = data_stream.map_err(|e| CamelError::Io(e.to_string()));
let boxed: BoxStream<'static, Result<bytes::Bytes, CamelError>> = Box::pin(mapped_stream);
let stream_body = StreamBody {
stream: Arc::new(tokio::sync::Mutex::new(Some(boxed))),
metadata: StreamMetadata {
size_hint: content_length,
content_type,
origin: None,
},
};
let sender = {
let table = state.dispatch.read().await;
table.get(&path).cloned()
};
let Some(sender) = sender else {
return Response::builder()
.status(StatusCode::NOT_FOUND)
.body(AxumBody::from("No consumer registered for this path"))
.expect("infallible");
};
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<HttpReply>();
let envelope = RequestEnvelope {
method,
path,
query,
headers,
body: stream_body,
reply_tx,
};
if sender.send(envelope).await.is_err() {
return Response::builder()
.status(StatusCode::SERVICE_UNAVAILABLE)
.body(AxumBody::from("Consumer unavailable"))
.expect("infallible");
}
match reply_rx.await {
Ok(reply) => {
let status =
StatusCode::from_u16(reply.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let mut builder = Response::builder().status(status);
for (k, v) in &reply.headers {
builder = builder.header(k.as_str(), v.as_str());
}
match reply.body {
HttpReplyBody::Bytes(b) => builder.body(AxumBody::from(b)).unwrap_or_else(|_| {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(AxumBody::from("Invalid response headers from consumer"))
.expect("infallible")
}),
HttpReplyBody::Stream(stream) => builder
.body(AxumBody::from_stream(stream))
.unwrap_or_else(|_| {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(AxumBody::from("Invalid response headers from consumer"))
.expect("infallible")
}),
}
}
Err(_) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(AxumBody::from("Pipeline error"))
.expect("Response::builder() with a known-valid status code and body is infallible"),
}
}
/// Consumer that receives HTTP requests for one path and feeds them into a route.
pub struct HttpConsumer {
/// Host, port, path and body limits this consumer serves with.
config: HttpServerConfig,
}
impl HttpConsumer {
/// Creates a consumer for the given server configuration; nothing is bound until `start`.
pub fn new(config: HttpServerConfig) -> Self {
Self { config }
}
}
#[async_trait::async_trait]
impl Consumer for HttpConsumer {
    /// Registers this consumer's path with the server for its port and processes
    /// incoming requests until the context is cancelled or the request channel closes.
    ///
    /// Each request becomes an `Exchange` (method, path, query and request headers are
    /// copied into message headers; the body is passed as a stream) and is handled on
    /// its own spawned task. The reply status comes from the `CamelHttpResponseCode`
    /// header (default 200; a value that does not fit in `u16` is reported as 500).
    /// Pipeline errors are answered with 500. The path is removed from the dispatch
    /// table before returning.
    ///
    /// `max_response_body` from the configuration is not enforced here.
    ///
    /// # Errors
    ///
    /// Returns the error from `ServerRegistry::get_or_spawn` when the server cannot be
    /// obtained.
    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
        use camel_component_api::{Body, Exchange, Message};

        /// Lower-case header names that are never copied from the exchange to the reply.
        const SUPPRESSED_REPLY_HEADERS: &[&str] = &[
            "content-length",
            "content-type",
            "transfer-encoding",
            "connection",
            "cache-control",
            "date",
            "pragma",
            "trailer",
            "upgrade",
            "via",
            "warning",
            "host",
            "user-agent",
            "accept",
            "accept-encoding",
            "accept-language",
            "accept-charset",
            "authorization",
            "proxy-authorization",
            "cookie",
            "expect",
            "from",
            "if-match",
            "if-modified-since",
            "if-none-match",
            "if-range",
            "if-unmodified-since",
            "max-forwards",
            "proxy-connection",
            "range",
            "referer",
            "te",
        ];

        let dispatch = ServerRegistry::global()
            .get_or_spawn(
                &self.config.host,
                self.config.port,
                self.config.max_request_body,
            )
            .await?;

        let (env_tx, mut env_rx) = tokio::sync::mpsc::channel::<RequestEnvelope>(64);
        {
            let mut table = dispatch.write().await;
            table.insert(self.config.path.clone(), env_tx);
        }

        let path = self.config.path.clone();
        let cancel_token = ctx.cancel_token();

        loop {
            tokio::select! {
                _ = ctx.cancelled() => {
                    break;
                }
                envelope = env_rx.recv() => {
                    let Some(envelope) = envelope else { break; };

                    let mut msg = Message::default();
                    msg.set_header("CamelHttpMethod",
                        serde_json::Value::String(envelope.method.clone()));
                    msg.set_header("CamelHttpPath",
                        serde_json::Value::String(envelope.path.clone()));
                    msg.set_header("CamelHttpQuery",
                        serde_json::Value::String(envelope.query.clone()));
                    // Header values that are not valid visible ASCII are skipped.
                    for (k, v) in &envelope.headers {
                        if let Ok(val_str) = v.to_str() {
                            msg.set_header(
                                k.as_str(),
                                serde_json::Value::String(val_str.to_string()),
                            );
                        }
                    }
                    msg.body = Body::Stream(envelope.body);

                    // `mut` is only needed when the `otel` feature is enabled.
                    #[allow(unused_mut)]
                    let mut exchange = Exchange::new(msg);
                    #[cfg(feature = "otel")]
                    {
                        let headers: HashMap<String, String> = envelope
                            .headers
                            .iter()
                            .filter_map(|(k, v)| {
                                Some((k.as_str().to_lowercase(), v.to_str().ok()?.to_string()))
                            })
                            .collect();
                        camel_otel::extract_into_exchange(&mut exchange, &headers);
                    }

                    let reply_tx = envelope.reply_tx;
                    let sender = ctx.sender().clone();
                    let path_clone = path.clone();
                    let cancel = cancel_token.clone();

                    tokio::spawn(async move {
                        // Refuse work that arrives after shutdown has begun.
                        if cancel.is_cancelled() {
                            let _ = reply_tx.send(HttpReply {
                                status: 503,
                                headers: vec![],
                                body: HttpReplyBody::Bytes(bytes::Bytes::from("Service Unavailable")),
                            });
                            return;
                        }

                        let (tx, rx) = tokio::sync::oneshot::channel();
                        let envelope = camel_component_api::consumer::ExchangeEnvelope {
                            exchange,
                            reply_tx: Some(tx),
                        };
                        let result = match sender.send(envelope).await {
                            Ok(()) => rx.await.map_err(|_| camel_component_api::CamelError::ChannelClosed),
                            Err(_) => Err(camel_component_api::CamelError::ChannelClosed),
                        }
                        .and_then(|r| r);

                        let reply = match result {
                            Ok(out) => {
                                // Checked conversion: a value above `u16::MAX` must not wrap
                                // around into a valid-looking status code.
                                let status = out
                                    .input
                                    .header("CamelHttpResponseCode")
                                    .and_then(|v| v.as_u64())
                                    .map(|code| u16::try_from(code).unwrap_or(500))
                                    .unwrap_or(200);

                                let reply_body: HttpReplyBody = match out.input.body {
                                    Body::Empty => HttpReplyBody::Bytes(bytes::Bytes::new()),
                                    Body::Bytes(b) => HttpReplyBody::Bytes(b),
                                    Body::Text(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
                                    Body::Xml(s) => HttpReplyBody::Bytes(bytes::Bytes::from(s.into_bytes())),
                                    Body::Json(v) => HttpReplyBody::Bytes(bytes::Bytes::from(
                                        v.to_string().into_bytes(),
                                    )),
                                    Body::Stream(s) => {
                                        match s.stream.lock().await.take() {
                                            Some(stream) => HttpReplyBody::Stream(stream),
                                            None => {
                                                tracing::error!(
                                                    "Body::Stream already consumed before HTTP reply — returning 500"
                                                );
                                                let error_reply = HttpReply {
                                                    status: 500,
                                                    headers: vec![],
                                                    body: HttpReplyBody::Bytes(bytes::Bytes::new()),
                                                };
                                                if reply_tx.send(error_reply).is_err() {
                                                    debug!("reply_tx dropped before error reply could be sent");
                                                }
                                                return;
                                            }
                                        }
                                    }
                                };

                                // Copy string-valued headers, skipping `Camel*` headers and
                                // everything in the suppression list.
                                let resp_headers: Vec<(String, String)> = out
                                    .input
                                    .headers
                                    .iter()
                                    .filter(|(k, _)| !k.starts_with("Camel"))
                                    .filter(|(k, _)| {
                                        !SUPPRESSED_REPLY_HEADERS.contains(&k.to_lowercase().as_str())
                                    })
                                    .filter_map(|(k, v)| {
                                        v.as_str().map(|s| (k.clone(), s.to_string()))
                                    })
                                    .collect();

                                HttpReply {
                                    status,
                                    headers: resp_headers,
                                    body: reply_body,
                                }
                            }
                            Err(e) => {
                                tracing::error!(error = %e, path = %path_clone, "Pipeline error processing HTTP request");
                                HttpReply {
                                    status: 500,
                                    headers: vec![],
                                    body: HttpReplyBody::Bytes(bytes::Bytes::from("Internal Server Error")),
                                }
                            }
                        };

                        // The HTTP side may already have gone away; nothing to do then.
                        let _ = reply_tx.send(reply);
                    });
                }
            }
        }

        // Unregister so later requests for this path get a 404 from the handler.
        {
            let mut table = dispatch.write().await;
            table.remove(&path);
        }
        Ok(())
    }

    /// No-op: `start` performs its own cleanup when the context is cancelled.
    async fn stop(&mut self) -> Result<(), CamelError> {
        Ok(())
    }

    /// Requests are processed concurrently with no upper bound.
    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
        camel_component_api::ConcurrencyModel::Concurrent { max: None }
    }
}
/// Component registered for the `http` scheme.
pub struct HttpComponent {
/// HTTP client shared (cloned) by all endpoints created from this component.
client: reqwest::Client,
/// Component-level defaults applied to endpoint URIs.
config: HttpConfig,
}
/// Builds the `reqwest` client from component configuration: connect timeout,
/// idle-pool size and timeout, and a no-redirect policy when `follow_redirects` is off.
///
/// # Panics
///
/// Panics if `reqwest::Client::build()` fails.
fn build_client(config: &HttpConfig) -> reqwest::Client {
    let connect_timeout = Duration::from_millis(config.connect_timeout_ms);
    let idle_timeout = Duration::from_millis(config.pool_idle_timeout_ms);

    let base = reqwest::Client::builder()
        .connect_timeout(connect_timeout)
        .pool_max_idle_per_host(config.pool_max_idle_per_host)
        .pool_idle_timeout(idle_timeout);

    // Leave the client's default redirect policy untouched when redirects are wanted.
    let builder = if config.follow_redirects {
        base
    } else {
        base.redirect(reqwest::redirect::Policy::none())
    };

    builder
        .build()
        .expect("reqwest::Client::build() with valid config should not fail")
}
impl HttpComponent {
    /// Creates a component using `HttpConfig::default()`.
    pub fn new() -> Self {
        Self::with_config(HttpConfig::default())
    }

    /// Creates a component with the given configuration and a client built from it.
    pub fn with_config(config: HttpConfig) -> Self {
        Self {
            client: build_client(&config),
            config,
        }
    }

    /// Uses `config` when provided, otherwise the default configuration.
    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
        config.map_or_else(Self::new, Self::with_config)
    }
}
/// `Default` delegates to `HttpComponent::new`.
impl Default for HttpComponent {
fn default() -> Self {
Self::new()
}
}
impl Component for HttpComponent {
    /// Scheme handled by this component.
    fn scheme(&self) -> &str {
        "http"
    }

    /// Creates an endpoint for `uri`, parsing both the producer-side and the
    /// consumer-side configuration with this component's defaults.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while parsing either configuration.
    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
        let defaults = &self.config;
        let endpoint = HttpEndpoint {
            uri: uri.to_string(),
            config: HttpEndpointConfig::from_uri_with_defaults(uri, defaults)?,
            server_config: HttpServerConfig::from_uri_with_defaults(uri, defaults)?,
            client: self.client.clone(),
        };
        Ok(Box::new(endpoint))
    }
}
/// Component registered for the `https` scheme.
pub struct HttpsComponent {
/// HTTP client shared (cloned) by all endpoints created from this component.
client: reqwest::Client,
/// Component-level defaults applied to endpoint URIs.
config: HttpConfig,
}
impl HttpsComponent {
    /// Creates a component using `HttpConfig::default()`.
    pub fn new() -> Self {
        Self::with_config(HttpConfig::default())
    }

    /// Creates a component with the given configuration and a client built from it.
    pub fn with_config(config: HttpConfig) -> Self {
        Self {
            client: build_client(&config),
            config,
        }
    }

    /// Uses `config` when provided, otherwise the default configuration.
    pub fn with_optional_config(config: Option<HttpConfig>) -> Self {
        config.map_or_else(Self::new, Self::with_config)
    }
}
/// `Default` delegates to `HttpsComponent::new`.
impl Default for HttpsComponent {
fn default() -> Self {
Self::new()
}
}
impl Component for HttpsComponent {
    /// Scheme handled by this component.
    fn scheme(&self) -> &str {
        "https"
    }

    /// Creates an endpoint for `uri`, parsing both the producer-side and the
    /// consumer-side configuration with this component's defaults.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while parsing either configuration.
    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
        let defaults = &self.config;
        let endpoint = HttpEndpoint {
            uri: uri.to_string(),
            config: HttpEndpointConfig::from_uri_with_defaults(uri, defaults)?,
            server_config: HttpServerConfig::from_uri_with_defaults(uri, defaults)?,
            client: self.client.clone(),
        };
        Ok(Box::new(endpoint))
    }
}
/// Endpoint able to create both an HTTP consumer (server) and an HTTP producer (client).
struct HttpEndpoint {
/// The endpoint URI exactly as passed to `create_endpoint`.
uri: String,
/// Producer-side configuration.
config: HttpEndpointConfig,
/// Consumer-side configuration.
server_config: HttpServerConfig,
/// Client handed to producers created from this endpoint.
client: reqwest::Client,
}
impl Endpoint for HttpEndpoint {
    /// Returns the URI this endpoint was created from.
    fn uri(&self) -> &str {
        self.uri.as_str()
    }

    /// Creates a consumer that serves this endpoint's server configuration.
    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
        let consumer = HttpConsumer::new(self.server_config.clone());
        Ok(Box::new(consumer))
    }

    /// Creates a producer holding its own copy of the configuration and a client clone.
    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
        let producer = HttpProducer {
            config: Arc::new(self.config.clone()),
            client: self.client.clone(),
        };
        Ok(BoxProcessor::new(producer))
    }
}
/// Rejects request URLs that point at blocked or internal hosts.
///
/// Checks, in order:
/// 1. the host against `config.blocked_hosts` (ASCII case-insensitive);
/// 2. unless `config.allow_private_ips` is set, whether the host is an internal
///    address: private, loopback, link-local or unspecified IPv4; loopback,
///    unspecified, unique-local (`fc00::/7`), link-local (`fe80::/10`) or
///    IPv4-mapped-internal IPv6; or a local name (`localhost`, `*.localhost`,
///    `local`, with or without a trailing dot).
///
/// Only the literal host in the URL is inspected; no DNS resolution is performed, so
/// a public name that resolves to an internal address is not detected here.
///
/// # Errors
///
/// Returns `CamelError::ProcessorError` when the URL cannot be parsed or the host is
/// rejected.
fn validate_url_for_ssrf(url: &str, config: &HttpEndpointConfig) -> Result<(), CamelError> {
    /// True for IPv4 addresses that are only reachable when private IPs are allowed.
    fn is_internal_v4(ip: std::net::Ipv4Addr) -> bool {
        ip.is_private() || ip.is_loopback() || ip.is_link_local() || ip.is_unspecified()
    }

    /// True for non-loopback IPv6 addresses that are only reachable when private IPs
    /// are allowed.
    fn is_internal_v6(ip: std::net::Ipv6Addr) -> bool {
        // `::ffff:a.b.c.d` reaches the embedded IPv4 address.
        if let Some(mapped) = ip.to_ipv4_mapped() {
            return is_internal_v4(mapped);
        }
        let first = ip.segments()[0];
        let unique_local = (first & 0xfe00) == 0xfc00;
        let link_local = (first & 0xffc0) == 0xfe80;
        ip.is_unspecified() || unique_local || link_local
    }

    let parsed = url::Url::parse(url)
        .map_err(|e| CamelError::ProcessorError(format!("Invalid URL: {}", e)))?;

    if let Some(host) = parsed.host_str()
        && config
            .blocked_hosts
            .iter()
            .any(|blocked| host.eq_ignore_ascii_case(blocked))
    {
        return Err(CamelError::ProcessorError(format!(
            "Host '{}' is blocked",
            host
        )));
    }

    if !config.allow_private_ips
        && let Some(host) = parsed.host()
    {
        match host {
            url::Host::Ipv4(ip) => {
                if is_internal_v4(ip) {
                    return Err(CamelError::ProcessorError(format!(
                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
                        ip
                    )));
                }
            }
            url::Host::Ipv6(ip) => {
                if ip.is_loopback() {
                    return Err(CamelError::ProcessorError(format!(
                        "Loopback IP '{}' not allowed",
                        ip
                    )));
                }
                if is_internal_v6(ip) {
                    return Err(CamelError::ProcessorError(format!(
                        "Private IP '{}' not allowed (set allowPrivateIps=true to override)",
                        ip
                    )));
                }
            }
            url::Host::Domain(domain) => {
                // Compare without a trailing root dot and without regard to case.
                let name = domain.trim_end_matches('.').to_ascii_lowercase();
                let blocked_domains = ["localhost", "127.0.0.1", "0.0.0.0", "local"];
                if blocked_domains.contains(&name.as_str()) || name.ends_with(".localhost") {
                    return Err(CamelError::ProcessorError(format!(
                        "Domain '{}' is not allowed",
                        domain
                    )));
                }
            }
        }
    }
    Ok(())
}
/// Producer that sends an exchange as an HTTP request and stores the response in it.
#[derive(Clone)]
struct HttpProducer {
/// Endpoint configuration, shared between clones of the producer.
config: Arc<HttpEndpointConfig>,
/// Client used to send requests.
client: reqwest::Client,
}
impl HttpProducer {
fn resolve_method(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
if let Some(ref method) = config.http_method {
return method.to_uppercase();
}
if let Some(method) = exchange
.input
.header("CamelHttpMethod")
.and_then(|v| v.as_str())
{
return method.to_uppercase();
}
if !exchange.input.body.is_empty() {
return "POST".to_string();
}
"GET".to_string()
}
fn resolve_url(exchange: &Exchange, config: &HttpEndpointConfig) -> String {
if let Some(uri) = exchange
.input
.header("CamelHttpUri")
.and_then(|v| v.as_str())
{
let mut url = uri.to_string();
if let Some(path) = exchange
.input
.header("CamelHttpPath")
.and_then(|v| v.as_str())
{
if !url.ends_with('/') && !path.starts_with('/') {
url.push('/');
}
url.push_str(path);
}
if let Some(query) = exchange
.input
.header("CamelHttpQuery")
.and_then(|v| v.as_str())
{
url.push('?');
url.push_str(query);
}
return url;
}
let mut url = config.base_url.clone();
if let Some(path) = exchange
.input
.header("CamelHttpPath")
.and_then(|v| v.as_str())
{
if !url.ends_with('/') && !path.starts_with('/') {
url.push('/');
}
url.push_str(path);
}
if let Some(query) = exchange
.input
.header("CamelHttpQuery")
.and_then(|v| v.as_str())
{
url.push('?');
url.push_str(query);
} else if !config.query_params.is_empty() {
url.push('?');
let query_string: String = config
.query_params
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join("&");
url.push_str(&query_string);
}
url
}
fn is_ok_status(status: u16, range: (u16, u16)) -> bool {
status >= range.0 && status <= range.1
}
}
/// `tower::Service` implementation: one call sends one HTTP request for the exchange
/// and returns the exchange updated with the response.
impl Service<Exchange> for HttpProducer {
type Response = Exchange;
type Error = CamelError;
type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
/// Always ready; no back-pressure is applied by this service.
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
/// Sends the exchange as an HTTP request.
///
/// Steps: resolve method and URL, run the SSRF check, copy non-`Camel*` string headers
/// onto the request, attach the body, send, copy response headers plus
/// `CamelHttpResponseCode` / `CamelHttpResponseText` into the exchange, then either
/// fail with `CamelError::HttpOperationFailed` (status outside the OK range and
/// `throw_exception_on_failure` set) or store the response body in the exchange.
fn call(&mut self, mut exchange: Exchange) -> Self::Future {
let config = self.config.clone();
let client = self.client.clone();
Box::pin(async move {
let method_str = HttpProducer::resolve_method(&exchange, &config);
let url = HttpProducer::resolve_url(&exchange, &config);
// Validate the final URL, including any header-supplied override.
validate_url_for_ssrf(&url, &config)?;
debug!(
correlation_id = %exchange.correlation_id(),
method = %method_str,
url = %url,
"HTTP request"
);
let method = method_str.parse::<reqwest::Method>().map_err(|e| {
CamelError::ProcessorError(format!("Invalid HTTP method '{}': {}", method_str, e))
})?;
let mut request = client.request(method, &url);
if let Some(timeout) = config.response_timeout {
request = request.timeout(timeout);
}
// With the `otel` feature, headers produced by `camel_otel::inject_from_exchange`
// are added to the request; entries that are not valid header names/values are skipped.
#[cfg(feature = "otel")]
{
let mut otel_headers = HashMap::new();
camel_otel::inject_from_exchange(&exchange, &mut otel_headers);
for (k, v) in otel_headers {
if let (Ok(name), Ok(val)) = (
reqwest::header::HeaderName::from_bytes(k.as_bytes()),
reqwest::header::HeaderValue::from_str(&v),
) {
request = request.header(name, val);
}
}
}
// Forward message headers that are strings, are not `Camel*` headers, and form a
// valid header name/value pair; everything else is silently skipped.
for (key, value) in &exchange.input.headers {
if !key.starts_with("Camel")
&& let Some(val_str) = value.as_str()
&& let (Ok(name), Ok(val)) = (
reqwest::header::HeaderName::from_bytes(key.as_bytes()),
reqwest::header::HeaderValue::from_str(val_str),
)
{
request = request.header(name, val);
}
}
match exchange.input.body {
// A stream body is taken out of its shared slot and sent as-is; a stream that was
// already taken is an error.
Body::Stream(ref s) => {
let mut stream_lock = s.stream.lock().await;
if let Some(stream) = stream_lock.take() {
request = request.body(reqwest::Body::wrap_stream(stream));
} else {
return Err(CamelError::AlreadyConsumed);
}
}
// Any other body is moved out of the exchange and converted to bytes, with
// `max_body_size` passed as the limit; an empty result sends no body.
_ => {
let body = std::mem::take(&mut exchange.input.body);
let bytes = body.into_bytes(config.max_body_size).await?;
if !bytes.is_empty() {
request = request.body(bytes);
}
}
}
let response = request
.send()
.await
.map_err(|e| CamelError::ProcessorError(format!("HTTP request failed: {e}")))?;
let status_code = response.status().as_u16();
let status_text = response
.status()
.canonical_reason()
.unwrap_or("Unknown")
.to_string();
// Response headers overwrite same-named message headers; values that cannot be
// represented as a string are skipped.
for (key, value) in response.headers() {
if let Ok(val_str) = value.to_str() {
exchange
.input
.set_header(key.as_str(), serde_json::Value::String(val_str.to_string()));
}
}
exchange.input.set_header(
"CamelHttpResponseCode",
serde_json::Value::Number(status_code.into()),
);
exchange.input.set_header(
"CamelHttpResponseText",
serde_json::Value::String(status_text.clone()),
);
// NOTE(review): the whole response is buffered here and `max_body_size` is not
// applied to it — confirm whether a response size limit is intended.
let response_body = response.bytes().await.map_err(|e| {
CamelError::ProcessorError(format!("Failed to read response body: {e}"))
})?;
if config.throw_exception_on_failure
&& !HttpProducer::is_ok_status(status_code, config.ok_status_code_range)
{
return Err(CamelError::HttpOperationFailed {
method: method_str,
url,
status_code,
status_text,
response_body: Some(String::from_utf8_lossy(&response_body).to_string()),
});
}
// An empty response leaves the exchange body as it is at this point.
if !response_body.is_empty() {
exchange.input.body = Body::Bytes(bytes::Bytes::from(response_body.to_vec()));
}
debug!(
correlation_id = %exchange.correlation_id(),
status = status_code,
url = %url,
"HTTP response"
);
Ok(exchange)
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use camel_component_api::Message;
use std::sync::Arc;
use std::time::Duration;
// Producer context used by the tests that create a producer.
fn test_producer_ctx() -> ProducerContext {
ProducerContext::new()
}
// A URI without options yields the built-in defaults.
#[test]
fn test_http_config_defaults() {
let config = HttpEndpointConfig::from_uri("http://localhost:8080/api").unwrap();
assert_eq!(config.base_url, "http://localhost:8080/api");
assert!(config.http_method.is_none());
assert!(config.throw_exception_on_failure);
assert_eq!(config.ok_status_code_range, (200, 299));
assert!(config.response_timeout.is_none());
}
// The endpoint configuration reports the `http` scheme.
#[test]
fn test_http_config_scheme() {
assert_eq!(HttpEndpointConfig::scheme(), "http");
}
// Pre-parsed components produce the same base URL and method as a URI would.
#[test]
fn test_http_config_from_components() {
let components = camel_component_api::UriComponents {
scheme: "https".to_string(),
path: "//api.example.com/v1".to_string(),
params: std::collections::HashMap::from([(
"httpMethod".to_string(),
"POST".to_string(),
)]),
};
let config = HttpEndpointConfig::from_components(components).unwrap();
assert_eq!(config.base_url, "https://api.example.com/v1");
assert_eq!(config.http_method, Some("POST".to_string()));
}
// Options given in the URI are parsed into the matching fields.
#[test]
fn test_http_config_with_options() {
let config = HttpEndpointConfig::from_uri(
"https://api.example.com/v1?httpMethod=PUT&throwExceptionOnFailure=false&followRedirects=true&connectTimeout=5000&responseTimeout=10000"
).unwrap();
assert_eq!(config.base_url, "https://api.example.com/v1");
assert_eq!(config.http_method, Some("PUT".to_string()));
assert!(!config.throw_exception_on_failure);
assert_eq!(config.response_timeout, Some(Duration::from_millis(10000)));
}
// Component-level values are used for every option the URI leaves out.
#[test]
fn test_from_uri_with_defaults_applies_config_when_uri_param_absent() {
let config = HttpConfig::default()
.with_response_timeout_ms(999)
.with_allow_private_ips(true)
.with_blocked_hosts(vec!["evil.com".to_string()])
.with_max_body_size(12345);
let endpoint =
HttpEndpointConfig::from_uri_with_defaults("http://example.com/api", &config).unwrap();
assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(999)));
assert!(endpoint.allow_private_ips);
assert_eq!(endpoint.blocked_hosts, vec!["evil.com".to_string()]);
assert_eq!(endpoint.max_body_size, 12345);
}
// Options present in the URI take precedence over component-level values.
#[test]
fn test_from_uri_with_defaults_uri_overrides_config() {
let config = HttpConfig::default()
.with_response_timeout_ms(999)
.with_allow_private_ips(true)
.with_blocked_hosts(vec!["evil.com".to_string()])
.with_max_body_size(12345);
let endpoint = HttpEndpointConfig::from_uri_with_defaults(
"http://example.com/api?responseTimeout=500&allowPrivateIps=false&blockedHosts=bad.net&maxBodySize=99",
&config,
)
.unwrap();
assert_eq!(endpoint.response_timeout, Some(Duration::from_millis(500)));
assert!(!endpoint.allow_private_ips);
assert_eq!(endpoint.blocked_hosts, vec!["bad.net".to_string()]);
assert_eq!(endpoint.max_body_size, 99);
}
// `okStatusCodeRange=low-high` is parsed into a `(low, high)` pair.
#[test]
fn test_http_config_ok_status_range() {
let config =
HttpEndpointConfig::from_uri("http://localhost/api?okStatusCodeRange=200-204").unwrap();
assert_eq!(config.ok_status_code_range, (200, 204));
}
// A non-HTTP scheme is rejected.
#[test]
fn test_http_config_wrong_scheme() {
let result = HttpEndpointConfig::from_uri("file:/tmp");
assert!(result.is_err());
}
// `HttpComponent` registers under `http`.
#[test]
fn test_http_component_scheme() {
let component = HttpComponent::new();
assert_eq!(component.scheme(), "http");
}
// `HttpsComponent` registers under `https`.
#[test]
fn test_https_component_scheme() {
let component = HttpsComponent::new();
assert_eq!(component.scheme(), "https");
}
// Creating a consumer from an `http` endpoint succeeds (the consumer is not started).
#[test]
fn test_http_endpoint_creates_consumer() {
let component = HttpComponent::new();
let endpoint = component
.create_endpoint("http://0.0.0.0:19100/test")
.unwrap();
assert!(endpoint.create_consumer().is_ok());
}
// Creating a consumer from an `https` endpoint succeeds (the consumer is not started).
#[test]
fn test_https_endpoint_creates_consumer() {
let component = HttpsComponent::new();
let endpoint = component
.create_endpoint("https://0.0.0.0:8443/test")
.unwrap();
assert!(endpoint.create_consumer().is_ok());
}
// Creating a producer from an `http` endpoint succeeds.
#[test]
fn test_http_endpoint_creates_producer() {
let ctx = test_producer_ctx();
let component = HttpComponent::new();
let endpoint = component.create_endpoint("http://localhost/api").unwrap();
assert!(endpoint.create_producer(&ctx).is_ok());
}
// Starts a minimal TCP server on an OS-assigned loopback port that answers every
// connection with `200 OK` and a small JSON body echoing the request method.
// Returns the base URL and the handle of the accept-loop task.
async fn start_test_server() -> (String, tokio::task::JoinHandle<()>) {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let url = format!("http://127.0.0.1:{}", addr.port());
let handle = tokio::spawn(async move {
loop {
if let Ok((mut stream, _)) = listener.accept().await {
tokio::spawn(async move {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// A single read of up to 4096 bytes is enough to see the request line.
let mut buf = vec![0u8; 4096];
let n = stream.read(&mut buf).await.unwrap_or(0);
let request = String::from_utf8_lossy(&buf[..n]).to_string();
// The first whitespace-separated token of the request is the method.
let method = request.split_whitespace().next().unwrap_or("GET");
let body = format!(r#"{{"method":"{}","echo":"ok"}}"#, method);
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Custom: test-value\r\n\r\n{}",
body.len(),
body
);
let _ = stream.write_all(response.as_bytes()).await;
});
}
}
});
(url, handle)
}
// Starts a minimal TCP server on an OS-assigned loopback port that answers every
// connection with the given status code and the body `error body`.
// Returns the base URL and the handle of the accept-loop task.
async fn start_status_server(status: u16) -> (String, tokio::task::JoinHandle<()>) {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let url = format!("http://127.0.0.1:{}", addr.port());
let handle = tokio::spawn(async move {
loop {
if let Ok((mut stream, _)) = listener.accept().await {
let status = status;
tokio::spawn(async move {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// The request content is read and discarded.
let mut buf = vec![0u8; 4096];
let _ = stream.read(&mut buf).await;
let status_text = match status {
404 => "Not Found",
500 => "Internal Server Error",
_ => "Error",
};
let body = "error body";
let response = format!(
"HTTP/1.1 {} {}\r\nContent-Length: {}\r\n\r\n{}",
status,
status_text,
body.len(),
body
);
let _ = stream.write_all(response.as_bytes()).await;
});
}
}
});
(url, handle)
}
// An exchange with an empty body is sent successfully; the response code and body
// are stored in the exchange.
#[tokio::test]
async fn test_http_producer_get_request() {
use tower::ServiceExt;
let (url, _handle) = start_test_server().await;
let ctx = test_producer_ctx();
let component = HttpComponent::new();
// `allowPrivateIps=true` is required because the test server listens on 127.0.0.1.
let endpoint = component
.create_endpoint(&format!("{url}/api/test?allowPrivateIps=true"))
.unwrap();
let producer = endpoint.create_producer(&ctx).unwrap();
let exchange = Exchange::new(Message::default());
let result = producer.oneshot(exchange).await.unwrap();
let status = result
.input
.header("CamelHttpResponseCode")
.and_then(|v| v.as_u64())
.unwrap();
assert_eq!(status, 200);
assert!(!result.input.body.is_empty());
}
// An exchange carrying a body is sent successfully.
#[tokio::test]
async fn test_http_producer_post_with_body() {
use tower::ServiceExt;
let (url, _handle) = start_test_server().await;
let ctx = test_producer_ctx();
let component = HttpComponent::new();
let endpoint = component
.create_endpoint(&format!("{url}/api/data?allowPrivateIps=true"))
.unwrap();
let producer = endpoint.create_producer(&ctx).unwrap();
let exchange = Exchange::new(Message::new("request body"));
let result = producer.oneshot(exchange).await.unwrap();
let status = result
.input
.header("CamelHttpResponseCode")
.and_then(|v| v.as_u64())
.unwrap();
assert_eq!(status, 200);
}
// A request whose method is supplied through the `CamelHttpMethod` header succeeds.
#[tokio::test]
async fn test_http_producer_method_from_header() {
use tower::ServiceExt;
let (url, _handle) = start_test_server().await;
let ctx = test_producer_ctx();
let component = HttpComponent::new();
let endpoint = component
.create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
.unwrap();
let producer = endpoint.create_producer(&ctx).unwrap();
let mut exchange = Exchange::new(Message::default());
exchange.input.set_header(
"CamelHttpMethod",
serde_json::Value::String("DELETE".to_string()),
);
let result = producer.oneshot(exchange).await.unwrap();
let status = result
.input
.header("CamelHttpResponseCode")
.and_then(|v| v.as_u64())
.unwrap();
assert_eq!(status, 200);
}
/// Builds the endpoint with `httpMethod=PUT` in the URI and expects a default
/// exchange sent to the local test server to report status 200.
#[tokio::test]
async fn test_http_producer_forced_method() {
    use tower::ServiceExt;

    let (base, _server) = start_test_server().await;
    let producer_ctx = test_producer_ctx();
    let component = HttpComponent::new();
    let uri = format!("{base}/api?httpMethod=PUT&allowPrivateIps=true");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let reply = producer
        .oneshot(Exchange::new(Message::default()))
        .await
        .unwrap();

    let code_header = reply.input.header("CamelHttpResponseCode");
    let code = code_header.and_then(|v| v.as_u64()).unwrap();
    assert_eq!(code, 200);
}
/// Against a server started with `start_status_server(404)`, the producer
/// (with no `throwExceptionOnFailure` option in the URI) must fail with
/// `CamelError::HttpOperationFailed` carrying status code 404.
#[tokio::test]
async fn test_http_producer_throw_exception_on_failure() {
    use tower::ServiceExt;

    let (base, _server) = start_status_server(404).await;
    let producer_ctx = test_producer_ctx();
    let component = HttpComponent::new();
    let uri = format!("{base}/not-found?allowPrivateIps=true");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let outcome = producer.oneshot(Exchange::new(Message::default())).await;
    assert!(outcome.is_err());

    let failure = outcome.unwrap_err();
    if let CamelError::HttpOperationFailed { status_code, .. } = &failure {
        assert_eq!(*status_code, 404);
    } else {
        let e = failure;
        panic!("Expected HttpOperationFailed, got: {e}");
    }
}
/// With `throwExceptionOnFailure=false`, a 500 from the status server is
/// returned as a normal exchange whose `CamelHttpResponseCode` is 500.
#[tokio::test]
async fn test_http_producer_no_throw_on_failure() {
    use tower::ServiceExt;

    let (base, _server) = start_status_server(500).await;
    let producer_ctx = test_producer_ctx();
    let component = HttpComponent::new();
    let uri = format!("{base}/error?throwExceptionOnFailure=false&allowPrivateIps=true");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let reply = producer
        .oneshot(Exchange::new(Message::default()))
        .await
        .unwrap();

    let code_header = reply.input.header("CamelHttpResponseCode");
    let code = code_header.and_then(|v| v.as_u64()).unwrap();
    assert_eq!(code, 500);
}
/// The endpoint is created for `http://localhost:1/does-not-exist`, but the
/// exchange carries a `CamelHttpUri` header pointing at the live test server;
/// the call is expected to report status 200.
#[tokio::test]
async fn test_http_producer_uri_override() {
    use tower::ServiceExt;

    let (base, _server) = start_test_server().await;
    let producer_ctx = test_producer_ctx();
    let component = HttpComponent::new();
    let endpoint = component
        .create_endpoint("http://localhost:1/does-not-exist?allowPrivateIps=true")
        .unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let mut outgoing = Exchange::new(Message::default());
    let override_uri = serde_json::Value::String(format!("{base}/api"));
    outgoing.input.set_header("CamelHttpUri", override_uri);

    let reply = producer.oneshot(outgoing).await.unwrap();

    let code_header = reply.input.header("CamelHttpResponseCode");
    let code = code_header.and_then(|v| v.as_u64()).unwrap();
    assert_eq!(code, 200);
}
/// After a successful call the reply must expose a content-type header (in
/// either `content-type` or `Content-Type` spelling) and a
/// `CamelHttpResponseText` header.
#[tokio::test]
async fn test_http_producer_response_headers_mapped() {
    use tower::ServiceExt;

    let (base, _server) = start_test_server().await;
    let producer_ctx = test_producer_ctx();
    let component = HttpComponent::new();
    let uri = format!("{base}/api?allowPrivateIps=true");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let reply = producer
        .oneshot(Exchange::new(Message::default()))
        .await
        .unwrap();

    let has_lowercase = reply.input.header("content-type").is_some();
    let has_canonical = reply.input.header("Content-Type").is_some();
    assert!(has_lowercase || has_canonical);
    assert!(reply.input.header("CamelHttpResponseText").is_some());
}
/// Starts a minimal raw-TCP HTTP server on an ephemeral loopback port.
///
/// Each accepted connection gets a single read of up to 4096 bytes. If the
/// bytes read contain `GET /final`, the server answers `200 OK` with a small
/// JSON body; any other request is answered with `302 Found` and
/// `Location: /final`.
///
/// Returns the base URL (`http://127.0.0.1:<port>`) and the handle of the
/// accept-loop task.
async fn start_redirect_server() -> (String, tokio::task::JoinHandle<()>) {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();
    let url = format!("http://127.0.0.1:{}", port);

    let accept_loop = async move {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        loop {
            // Failed accepts are ignored and the loop simply tries again.
            let Ok((mut stream, _)) = listener.accept().await else {
                continue;
            };
            tokio::spawn(async move {
                let mut buf = vec![0u8; 4096];
                // A failed read is treated as an empty request.
                let n = stream.read(&mut buf).await.unwrap_or(0);
                let wants_final = String::from_utf8_lossy(&buf[..n]).contains("GET /final");
                let response = if wants_final {
                    let body = r#"{"status":"final"}"#;
                    format!(
                        "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
                        body.len(),
                        body
                    )
                } else {
                    "HTTP/1.1 302 Found\r\nLocation: /final\r\nContent-Length: 0\r\n\r\n"
                        .to_string()
                };
                let _ = stream.write_all(response.as_bytes()).await;
            });
        }
    };

    (url, tokio::spawn(accept_loop))
}
/// With the component configured via `with_follow_redirects(false)`, the 302
/// from the redirect server is surfaced as the response code.
#[tokio::test]
async fn test_follow_redirects_false_does_not_follow() {
    use tower::ServiceExt;

    let (base, _server) = start_redirect_server().await;
    let producer_ctx = test_producer_ctx();
    let config = HttpConfig::default().with_follow_redirects(false);
    let component = HttpComponent::with_config(config);
    // `throwExceptionOnFailure=false` is set so the status is read from the reply headers.
    let uri = format!("{base}?throwExceptionOnFailure=false&allowPrivateIps=true");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let reply = producer
        .oneshot(Exchange::new(Message::default()))
        .await
        .unwrap();

    let code_header = reply.input.header("CamelHttpResponseCode");
    let code = code_header.and_then(|v| v.as_u64()).unwrap();
    assert_eq!(
        code, 302,
        "Should NOT follow redirect when followRedirects=false"
    );
}
/// With the component configured via `with_follow_redirects(true)`, the
/// request against the redirect server ends with response code 200.
#[tokio::test]
async fn test_follow_redirects_true_follows_redirect() {
    use tower::ServiceExt;

    let (base, _server) = start_redirect_server().await;
    let producer_ctx = test_producer_ctx();
    let config = HttpConfig::default().with_follow_redirects(true);
    let component = HttpComponent::with_config(config);
    let uri = format!("{base}?allowPrivateIps=true");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let reply = producer
        .oneshot(Exchange::new(Message::default()))
        .await
        .unwrap();

    let code_header = reply.input.header("CamelHttpResponseCode");
    let code = code_header.and_then(|v| v.as_u64()).unwrap();
    assert_eq!(
        code, 200,
        "Should follow redirect when followRedirects=true"
    );
}
/// Creates an endpoint whose URI mixes a custom parameter (`apiKey`) with
/// Camel options and checks the request still completes with status 200.
/// Only the status is asserted; what the server received is not inspected.
#[tokio::test]
async fn test_query_params_forwarded_to_http_request() {
    use tower::ServiceExt;

    let (base, _server) = start_test_server().await;
    let producer_ctx = test_producer_ctx();
    let component = HttpComponent::new();
    let uri = format!("{base}/api?apiKey=secret123&httpMethod=GET&allowPrivateIps=true");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let reply = producer
        .oneshot(Exchange::new(Message::default()))
        .await
        .unwrap();

    let code_header = reply.input.header("CamelHttpResponseCode");
    let code = code_header.and_then(|v| v.as_u64()).unwrap();
    assert_eq!(code, 200);
}
/// Parsing an endpoint URI keeps non-Camel query parameters (`apiKey`,
/// `token`) in `query_params` with their values, while the Camel option
/// `httpMethod` is not kept there.
///
/// This is a plain synchronous test: URI parsing involves no `.await`, so no
/// async runtime is needed (matching the other config-parsing tests).
#[test]
fn test_non_camel_query_params_are_forwarded() {
    let config = HttpEndpointConfig::from_uri(
        "http://example.com/api?apiKey=secret123&httpMethod=GET&token=abc456",
    )
    .unwrap();

    // A single lookup per key checks both presence and value.
    assert_eq!(
        config.query_params.get("apiKey").map(String::as_str),
        Some("secret123"),
        "apiKey should be preserved"
    );
    assert_eq!(
        config.query_params.get("token").map(String::as_str),
        Some("abc456"),
        "token should be preserved"
    );
    assert!(
        !config.query_params.contains_key("httpMethod"),
        "httpMethod should not be forwarded"
    );
}
/// With `allowPrivateIps=false`, overriding the target through `CamelHttpUri`
/// to `169.254.169.254` must fail with an error mentioning "Private IP".
#[tokio::test]
async fn test_http_producer_blocks_metadata_endpoint() {
    use tower::ServiceExt;

    let producer_ctx = test_producer_ctx();
    let component = HttpComponent::new();
    let endpoint = component
        .create_endpoint("http://example.com/api?allowPrivateIps=false")
        .unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let mut outgoing = Exchange::new(Message::default());
    let metadata_uri =
        serde_json::Value::String("http://169.254.169.254/latest/meta-data/".to_string());
    outgoing.input.set_header("CamelHttpUri", metadata_uri);

    let outcome = producer.oneshot(outgoing).await;
    assert!(outcome.is_err(), "Should block AWS metadata endpoint");

    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("Private IP"),
        "Error should mention private IP blocking, got: {}",
        message
    );
}
/// A URI with no SSRF options parses to `allow_private_ips == false` and an
/// empty `blocked_hosts` list.
#[test]
fn test_ssrf_config_defaults() {
    let parsed = HttpEndpointConfig::from_uri("http://example.com/api").unwrap();

    let private_ips_allowed = parsed.allow_private_ips;
    assert!(
        !private_ips_allowed,
        "Private IPs should be blocked by default"
    );

    let no_blocked_hosts = parsed.blocked_hosts.is_empty();
    assert!(
        no_blocked_hosts,
        "Blocked hosts should be empty by default"
    );
}
/// `allowPrivateIps=true` in the URI sets `allow_private_ips` on the parsed config.
#[test]
fn test_ssrf_config_allow_private_ips() {
    let uri = "http://example.com/api?allowPrivateIps=true";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();
    assert!(
        parsed.allow_private_ips,
        "Private IPs should be allowed when explicitly set"
    );
}
/// A comma-separated `blockedHosts` value is parsed into individual host entries.
#[test]
fn test_ssrf_config_blocked_hosts() {
    let uri = "http://example.com/api?blockedHosts=evil.com,malware.net";
    let parsed = HttpEndpointConfig::from_uri(uri).unwrap();
    let expected = vec!["evil.com", "malware.net"];
    assert_eq!(parsed.blocked_hosts, expected);
}
/// With default endpoint options, a `CamelHttpUri` override pointing at
/// `localhost` must make the producer call fail.
#[tokio::test]
async fn test_http_producer_blocks_localhost() {
    use tower::ServiceExt;

    let producer_ctx = test_producer_ctx();
    let component = HttpComponent::new();
    let endpoint = component.create_endpoint("http://example.com/api").unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let mut outgoing = Exchange::new(Message::default());
    let target = serde_json::Value::String("http://localhost:8080/internal".to_string());
    outgoing.input.set_header("CamelHttpUri", target);

    let outcome = producer.oneshot(outgoing).await;
    assert!(outcome.is_err(), "Should block localhost");
}
/// With default endpoint options, a `CamelHttpUri` override pointing at
/// `127.0.0.1` must make the producer call fail.
#[tokio::test]
async fn test_http_producer_blocks_loopback_ip() {
    use tower::ServiceExt;

    let producer_ctx = test_producer_ctx();
    let component = HttpComponent::new();
    let endpoint = component.create_endpoint("http://example.com/api").unwrap();
    let producer = endpoint.create_producer(&producer_ctx).unwrap();

    let mut outgoing = Exchange::new(Message::default());
    let target = serde_json::Value::String("http://127.0.0.1:8080/internal".to_string());
    outgoing.input.set_header("CamelHttpUri", target);

    let outcome = producer.oneshot(outgoing).await;
    assert!(outcome.is_err(), "Should block loopback IP");
}
#[tokio::test]
async fn test_http_producer_allows_private_ip_when_enabled() {
use tower::ServiceExt;
let ctx = test_producer_ctx();
let component = HttpComponent::new();
let endpoint = component
.create_endpoint("http://192.168.1.1/api?allowPrivateIps=true")
.unwrap();
let producer = endpoint.create_producer(&ctx).unwrap();
let exchange = Exchange::new(Message::default());
let result = producer.oneshot(exchange).await;
if let Err(ref e) = result {
let err_str = e.to_string();
assert!(
!err_str.contains("Private IP") && !err_str.contains("not allowed"),
"Should not be SSRF error, got: {}",
err_str
);
}
}
/// Parses host, port and path out of a full server URI.
#[test]
fn test_http_server_config_parse() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:8080/orders").unwrap();
    assert_eq!(parsed.host, "0.0.0.0");
    assert_eq!(parsed.port, 8080);
    assert_eq!(parsed.path, "/orders");
}
/// `HttpServerConfig::scheme()` reports `"http"`.
#[test]
fn test_http_server_config_scheme() {
    let scheme = HttpServerConfig::scheme();
    assert_eq!(scheme, "http");
}
/// Builds `UriComponents` by hand (scheme `https`, authority-style path,
/// `maxRequestBody` parameter) and checks every parsed field.
#[test]
fn test_http_server_config_from_components() {
    let mut params = std::collections::HashMap::new();
    params.insert("maxRequestBody".to_string(), "5242880".to_string());

    let components = camel_component_api::UriComponents {
        scheme: "https".to_string(),
        path: "//0.0.0.0:8443/api".to_string(),
        params,
    };

    let parsed = HttpServerConfig::from_components(components).unwrap();
    assert_eq!(parsed.host, "0.0.0.0");
    assert_eq!(parsed.port, 8443);
    assert_eq!(parsed.path, "/api");
    assert_eq!(parsed.max_request_body, 5242880);
}
/// A URI without a path component parses to the path `/`.
#[test]
fn test_http_server_config_default_path() {
    let parsed = HttpServerConfig::from_uri("http://0.0.0.0:3000").unwrap();
    assert_eq!(parsed.path, "/");
}
/// A `file:` URI is rejected by `HttpServerConfig::from_uri`.
#[test]
fn test_http_server_config_wrong_scheme() {
    let outcome = HttpServerConfig::from_uri("file:/tmp");
    assert!(outcome.is_err());
}
/// A non-numeric port (`abc`) is rejected by `HttpServerConfig::from_uri`.
#[test]
fn test_http_server_config_invalid_port() {
    let outcome = HttpServerConfig::from_uri("http://localhost:abc/path");
    assert!(outcome.is_err());
}
/// When the URI has no explicit port, `http` parses to port 80 and `https`
/// to port 443.
#[test]
fn test_http_server_config_default_port_by_scheme() {
    let cases = [
        ("http://0.0.0.0/orders", 80),
        ("https://0.0.0.0/orders", 443),
    ];
    for (uri, expected_port) in cases {
        let parsed = HttpServerConfig::from_uri(uri).unwrap();
        assert_eq!(parsed.port, expected_port);
    }
}
/// Compile-time check that `RequestEnvelope` and `HttpReply` are `Send`.
/// The test body does nothing at runtime; it fails to compile if either
/// type loses the bound.
#[test]
fn test_request_envelope_and_reply_are_send() {
    fn require_send<T>()
    where
        T: Send,
    {
    }

    require_send::<RequestEnvelope>();
    require_send::<HttpReply>();
}
/// Two calls to `ServerRegistry::global()` must yield the same address.
#[test]
fn test_server_registry_global_is_singleton() {
    let first = ServerRegistry::global() as *const _;
    let second = ServerRegistry::global() as *const _;
    assert!(std::ptr::eq(first, second));
}
/// Four tasks call `ServerRegistry::global().get_or_spawn` for the same
/// host/port concurrently; all of them must receive the very same dispatch
/// table (`Arc::ptr_eq`).
///
/// Each task returns its table through its `JoinHandle`, so no shared
/// collection or lock is needed to gather the results.
#[tokio::test]
async fn test_concurrent_get_or_spawn_returns_same_dispatch() {
    // Reserve an ephemeral port number, then drop the listener so the port is free again.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();
    drop(listener);

    let mut handles = Vec::new();
    for _ in 0..4 {
        handles.push(tokio::spawn(async move {
            ServerRegistry::global()
                .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024)
                .await
                .unwrap()
        }));
    }

    let mut dispatches: Vec<DispatchTable> = Vec::with_capacity(handles.len());
    for handle in handles {
        // A panic inside a task surfaces here as a join error.
        dispatches.push(handle.await.unwrap());
    }

    assert_eq!(dispatches.len(), 4);
    let (first, rest) = dispatches.split_first().unwrap();
    for other in rest {
        assert!(
            Arc::ptr_eq(first, other),
            "all concurrent callers should get the same dispatch table"
        );
    }
}
/// Runs the axum server with an empty dispatch table and expects a 404 for
/// an arbitrary path.
#[tokio::test]
async fn test_dispatch_handler_returns_404_for_unknown_path() {
    let empty_table: DispatchTable = Arc::new(RwLock::new(HashMap::new()));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();

    let server = run_axum_server(listener, empty_table, 2 * 1024 * 1024);
    tokio::spawn(server);
    // Short pause before issuing the request.
    tokio::time::sleep(std::time::Duration::from_millis(20)).await;

    let target = format!("http://127.0.0.1:{port}/unknown");
    let resp = reqwest::get(target).await.unwrap();
    assert_eq!(resp.status().as_u16(), 404);
}
#[tokio::test]
async fn test_http_consumer_start_registers_path() {
use camel_component_api::ConsumerContext;
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let consumer_cfg = HttpServerConfig {
host: "127.0.0.1".to_string(),
port,
path: "/ping".to_string(),
max_request_body: 2 * 1024 * 1024,
max_response_body: 10 * 1024 * 1024,
};
let mut consumer = HttpConsumer::new(consumer_cfg);
let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
let token = tokio_util::sync::CancellationToken::new();
let ctx = ConsumerContext::new(tx, token.clone());
tokio::spawn(async move {
consumer.start(ctx).await.unwrap();
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let client = reqwest::Client::new();
let resp_future = client
.post(format!("http://127.0.0.1:{port}/ping"))
.body("hello world")
.send();
let (http_result, _) = tokio::join!(resp_future, async {
if let Some(mut envelope) = rx.recv().await {
envelope.exchange.input.set_header(
"CamelHttpResponseCode",
serde_json::Value::Number(201.into()),
);
if let Some(reply_tx) = envelope.reply_tx {
let _ = reply_tx.send(Ok(envelope.exchange));
}
}
});
let resp = http_result.unwrap();
assert_eq!(resp.status().as_u16(), 201);
token.cancel();
}
#[tokio::test]
async fn test_integration_single_consumer_round_trip() {
use camel_component_api::{ConsumerContext, ExchangeEnvelope};
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let component = HttpComponent::new();
let endpoint = component
.create_endpoint(&format!("http://127.0.0.1:{port}/echo"))
.unwrap();
let mut consumer = endpoint.create_consumer().unwrap();
let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
let token = tokio_util::sync::CancellationToken::new();
let ctx = ConsumerContext::new(tx, token.clone());
tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let client = reqwest::Client::new();
let send_fut = client
.post(format!("http://127.0.0.1:{port}/echo"))
.header("Content-Type", "text/plain")
.body("ping")
.send();
let (http_result, _) = tokio::join!(send_fut, async {
if let Some(mut envelope) = rx.recv().await {
assert_eq!(
envelope.exchange.input.header("CamelHttpMethod"),
Some(&serde_json::Value::String("POST".into()))
);
assert_eq!(
envelope.exchange.input.header("CamelHttpPath"),
Some(&serde_json::Value::String("/echo".into()))
);
envelope.exchange.input.body = camel_component_api::Body::Text("pong".to_string());
if let Some(reply_tx) = envelope.reply_tx {
let _ = reply_tx.send(Ok(envelope.exchange));
}
}
});
let resp = http_result.unwrap();
assert_eq!(resp.status().as_u16(), 200);
let body = resp.text().await.unwrap();
assert_eq!(body, "pong");
token.cancel();
}
#[tokio::test]
async fn test_integration_two_consumers_shared_port() {
use camel_component_api::{ConsumerContext, ExchangeEnvelope};
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let component = HttpComponent::new();
let endpoint_a = component
.create_endpoint(&format!("http://127.0.0.1:{port}/hello"))
.unwrap();
let mut consumer_a = endpoint_a.create_consumer().unwrap();
let endpoint_b = component
.create_endpoint(&format!("http://127.0.0.1:{port}/world"))
.unwrap();
let mut consumer_b = endpoint_b.create_consumer().unwrap();
let (tx_a, mut rx_a) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
let token_a = tokio_util::sync::CancellationToken::new();
let ctx_a = ConsumerContext::new(tx_a, token_a.clone());
let (tx_b, mut rx_b) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
let token_b = tokio_util::sync::CancellationToken::new();
let ctx_b = ConsumerContext::new(tx_b, token_b.clone());
tokio::spawn(async move { consumer_a.start(ctx_a).await.unwrap() });
tokio::spawn(async move { consumer_b.start(ctx_b).await.unwrap() });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let client = reqwest::Client::new();
let fut_hello = client.get(format!("http://127.0.0.1:{port}/hello")).send();
let (resp_hello, _) = tokio::join!(fut_hello, async {
if let Some(mut envelope) = rx_a.recv().await {
envelope.exchange.input.body =
camel_component_api::Body::Text("hello-response".to_string());
if let Some(reply_tx) = envelope.reply_tx {
let _ = reply_tx.send(Ok(envelope.exchange));
}
}
});
let fut_world = client.get(format!("http://127.0.0.1:{port}/world")).send();
let (resp_world, _) = tokio::join!(fut_world, async {
if let Some(mut envelope) = rx_b.recv().await {
envelope.exchange.input.body =
camel_component_api::Body::Text("world-response".to_string());
if let Some(reply_tx) = envelope.reply_tx {
let _ = reply_tx.send(Ok(envelope.exchange));
}
}
});
let body_a = resp_hello.unwrap().text().await.unwrap();
let body_b = resp_world.unwrap().text().await.unwrap();
assert_eq!(body_a, "hello-response");
assert_eq!(body_b, "world-response");
token_a.cancel();
token_b.cancel();
}
/// With a consumer started for `/registered`, a GET to a different path on
/// the same port must return 404.
#[tokio::test]
async fn test_integration_unregistered_path_returns_404() {
    use camel_component_api::{ConsumerContext, ExchangeEnvelope};

    // Reserve an ephemeral port number, then drop the listener so the port is free again.
    let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = probe.local_addr().unwrap().port();
    drop(probe);

    let component = HttpComponent::new();
    let uri = format!("http://127.0.0.1:{port}/registered");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let mut consumer = endpoint.create_consumer().unwrap();

    // The receiver is kept alive for the duration of the test but never read.
    let (tx, _rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
    let cancel = tokio_util::sync::CancellationToken::new();
    let consumer_ctx = ConsumerContext::new(tx, cancel.clone());
    tokio::spawn(async move { consumer.start(consumer_ctx).await.unwrap() });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let missing = format!("http://127.0.0.1:{port}/not-there");
    let resp = reqwest::Client::new().get(missing).send().await.unwrap();
    assert_eq!(resp.status().as_u16(), 404);
    cancel.cancel();
}
/// `HttpConsumer::concurrency_model()` reports `Concurrent { max: None }`.
#[test]
fn test_http_consumer_declares_concurrent() {
    use camel_component_api::ConcurrencyModel;

    let consumer = HttpConsumer::new(HttpServerConfig {
        host: "127.0.0.1".to_string(),
        port: 19999,
        path: "/test".to_string(),
        max_request_body: 2 * 1024 * 1024,
        max_response_body: 10 * 1024 * 1024,
    });

    let expected = ConcurrencyModel::Concurrent { max: None };
    assert_eq!(consumer.concurrency_model(), expected);
}
/// `HttpReplyBody::Stream` can be constructed from a pinned, boxed stream of
/// `Result<Bytes, CamelError>` chunks.
///
/// Nothing here is awaited, so this is a plain synchronous test; the stream
/// is only built, never polled.
#[test]
fn test_http_reply_body_stream_variant_exists() {
    use bytes::Bytes;
    use camel_component_api::CamelError;
    use futures::stream;

    let chunks: Vec<Result<Bytes, CamelError>> =
        vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
    let stream = Box::pin(stream::iter(chunks));
    let reply_body = HttpReplyBody::Stream(stream);

    assert!(
        matches!(reply_body, HttpReplyBody::Stream(_)),
        "expected Stream variant"
    );
}
// Tests for `traceparent` trace-context propagation through the HTTP
// producer and consumer. Compiled only when the `otel` feature is enabled.
#[cfg(feature = "otel")]
mod otel_tests {
use super::*;
use camel_component_api::Message;
use tower::ServiceExt;
/// Seeds an exchange with a `traceparent` via `camel_otel::extract_into_exchange`,
/// sends it through the producer to the header-capturing test server, and
/// checks the value the server echoed back in `X-Received-Traceparent`.
#[tokio::test]
async fn test_producer_injects_traceparent_header() {
let (url, _handle) = start_test_server_with_header_capture().await;
let ctx = test_producer_ctx();
let component = HttpComponent::new();
let endpoint = component
.create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
.unwrap();
let producer = endpoint.create_producer(&ctx).unwrap();
let mut exchange = Exchange::new(Message::default());
let mut headers = std::collections::HashMap::new();
headers.insert(
"traceparent".to_string(),
"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01".to_string(),
);
camel_otel::extract_into_exchange(&mut exchange, &headers);
let result = producer.oneshot(exchange).await.unwrap();
let status = result
.input
.header("CamelHttpResponseCode")
.and_then(|v| v.as_u64())
.unwrap();
assert_eq!(status, 200);
// The test server copies the traceparent it received into this response header.
let traceparent = result.input.header("x-received-traceparent");
assert!(
traceparent.is_some(),
"traceparent header should have been sent"
);
let traceparent_str = traceparent.unwrap().as_str().unwrap();
// Dash-separated fields: version, trace-id, span-id, flags.
let parts: Vec<&str> = traceparent_str.split('-').collect();
assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
assert_eq!(parts[0], "00", "version should be 00");
assert_eq!(
parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
"trace-id should match"
);
assert_eq!(parts[2], "00f067aa0ba902b7", "span-id should match");
assert_eq!(parts[3], "01", "flags should be 01 (sampled)");
}
/// POSTs a request carrying a lowercase `traceparent` header to a consumer
/// and checks that `camel_otel::inject_from_exchange` on the received
/// exchange yields a `traceparent` with the same trace-id.
#[tokio::test]
async fn test_consumer_extracts_traceparent_header() {
use camel_component_api::{ConsumerContext, ExchangeEnvelope};
// Reserve an ephemeral port number, then drop the listener so the port is free again.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let component = HttpComponent::new();
let endpoint = component
.create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
.unwrap();
let mut consumer = endpoint.create_consumer().unwrap();
let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
let token = tokio_util::sync::CancellationToken::new();
let ctx = ConsumerContext::new(tx, token.clone());
tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let client = reqwest::Client::new();
let send_fut = client
.post(format!("http://127.0.0.1:{port}/trace"))
.header(
"traceparent",
"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
)
.body("test")
.send();
// Drive the HTTP request and the exchange handling concurrently.
let (http_result, _) = tokio::join!(send_fut, async {
if let Some(envelope) = rx.recv().await {
let mut injected_headers = std::collections::HashMap::new();
camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
assert!(
injected_headers.contains_key("traceparent"),
"Exchange should have traceparent after extraction"
);
let traceparent = injected_headers.get("traceparent").unwrap();
let parts: Vec<&str> = traceparent.split('-').collect();
assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
// Only the trace-id is compared against the header that was sent.
assert_eq!(
parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
"Trace ID should match the original traceparent header"
);
if let Some(reply_tx) = envelope.reply_tx {
let _ = reply_tx.send(Ok(envelope.exchange));
}
}
});
let resp = http_result.unwrap();
assert_eq!(resp.status().as_u16(), 200);
token.cancel();
}
/// Same as `test_consumer_extracts_traceparent_header`, but the request
/// header is spelled `TraceParent` to check that extraction does not depend
/// on header-name casing.
#[tokio::test]
async fn test_consumer_extracts_mixed_case_traceparent_header() {
use camel_component_api::{ConsumerContext, ExchangeEnvelope};
// Reserve an ephemeral port number, then drop the listener so the port is free again.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let component = HttpComponent::new();
let endpoint = component
.create_endpoint(&format!("http://127.0.0.1:{port}/trace"))
.unwrap();
let mut consumer = endpoint.create_consumer().unwrap();
let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
let token = tokio_util::sync::CancellationToken::new();
let ctx = ConsumerContext::new(tx, token.clone());
tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let client = reqwest::Client::new();
let send_fut = client
.post(format!("http://127.0.0.1:{port}/trace"))
.header(
"TraceParent",
"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
)
.body("test")
.send();
// Drive the HTTP request and the exchange handling concurrently.
let (http_result, _) = tokio::join!(send_fut, async {
if let Some(envelope) = rx.recv().await {
let mut injected_headers = HashMap::new();
camel_otel::inject_from_exchange(&envelope.exchange, &mut injected_headers);
assert!(
injected_headers.contains_key("traceparent"),
"Exchange should have traceparent after extraction from mixed-case header"
);
let traceparent = injected_headers.get("traceparent").unwrap();
let parts: Vec<&str> = traceparent.split('-').collect();
assert_eq!(parts.len(), 4, "traceparent should have 4 parts");
assert_eq!(
parts[1], "4bf92f3577b34da6a3ce929d0e0e4736",
"Trace ID should match the original mixed-case TraceParent header"
);
if let Some(reply_tx) = envelope.reply_tx {
let _ = reply_tx.send(Ok(envelope.exchange));
}
}
});
let resp = http_result.unwrap();
assert_eq!(resp.status().as_u16(), 200);
token.cancel();
}
/// An exchange with no trace context must still go through the producer
/// and come back with status 200.
#[tokio::test]
async fn test_producer_no_trace_context_no_crash() {
let (url, _handle) = start_test_server().await;
let ctx = test_producer_ctx();
let component = HttpComponent::new();
let endpoint = component
.create_endpoint(&format!("{url}/api?allowPrivateIps=true"))
.unwrap();
let producer = endpoint.create_producer(&ctx).unwrap();
let exchange = Exchange::new(Message::default());
let result = producer.oneshot(exchange).await.unwrap();
let status = result
.input
.header("CamelHttpResponseCode")
.and_then(|v| v.as_u64())
.unwrap();
assert_eq!(status, 200);
}
/// Starts a raw-TCP HTTP server on an ephemeral loopback port that answers
/// every request with `200 OK` and echoes the request's `traceparent` header
/// value (empty if absent) both in the JSON body and in an
/// `X-Received-Traceparent` response header.
///
/// Returns the base URL and the handle of the accept-loop task.
async fn start_test_server_with_header_capture() -> (String, tokio::task::JoinHandle<()>) {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let url = format!("http://127.0.0.1:{}", addr.port());
let handle = tokio::spawn(async move {
loop {
if let Ok((mut stream, _)) = listener.accept().await {
tokio::spawn(async move {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let mut buf = vec![0u8; 8192];
// Single read; a failed read is treated as an empty request.
let n = stream.read(&mut buf).await.unwrap_or(0);
let request = String::from_utf8_lossy(&buf[..n]).to_string();
// Case-insensitive match on the header name; the value is the text
// between the first and second ':' on that line, trimmed.
let traceparent = request
.lines()
.find(|line| line.to_lowercase().starts_with("traceparent:"))
.map(|line| {
line.split(':')
.nth(1)
.map(|s| s.trim().to_string())
.unwrap_or_default()
})
.unwrap_or_default();
let body =
format!(r#"{{"echo":"ok","traceparent":"{}"}}"#, traceparent);
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nX-Received-Traceparent: {}\r\n\r\n{}",
body.len(),
traceparent,
body
);
let _ = stream.write_all(response.as_bytes()).await;
});
}
}
});
(url, handle)
}
}
/// A POSTed request body must reach the consumer as `Body::Stream`, and
/// draining it (limit 1 MiB) must yield exactly the bytes that were sent.
#[tokio::test]
async fn test_request_body_arrives_as_stream() {
    use camel_component_api::Body;
    use camel_component_api::{ConsumerContext, ExchangeEnvelope};

    // Reserve an ephemeral port number, then drop the listener so the port is free again.
    let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = probe.local_addr().unwrap().port();
    drop(probe);

    let component = HttpComponent::new();
    let uri = format!("http://127.0.0.1:{port}/upload");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let mut consumer = endpoint.create_consumer().unwrap();

    let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
    let cancel = tokio_util::sync::CancellationToken::new();
    let consumer_ctx = ConsumerContext::new(tx, cancel.clone());
    tokio::spawn(async move { consumer.start(consumer_ctx).await.unwrap() });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let client = reqwest::Client::new();
    let request = client
        .post(format!("http://127.0.0.1:{port}/upload"))
        .body("hello streaming world")
        .send();

    // Plays the role of the route: verify the body, then reply with an empty body.
    let responder = async {
        let Some(mut envelope) = rx.recv().await else {
            return;
        };
        assert!(
            matches!(envelope.exchange.input.body, Body::Stream(_)),
            "expected Body::Stream, got discriminant {:?}",
            std::mem::discriminant(&envelope.exchange.input.body)
        );
        let received = envelope
            .exchange
            .input
            .body
            .into_bytes(1024 * 1024)
            .await
            .unwrap();
        assert_eq!(&received[..], b"hello streaming world");
        // The body was moved out above; put an empty one back before replying.
        envelope.exchange.input.body = Body::Empty;
        if let Some(reply_tx) = envelope.reply_tx {
            let _ = reply_tx.send(Ok(envelope.exchange));
        }
    };

    let (http_result, _) = tokio::join!(request, responder);
    let resp = http_result.unwrap();
    assert_eq!(resp.status().as_u16(), 200);
    cancel.cancel();
}
#[tokio::test]
async fn test_streaming_response_chunked() {
use bytes::Bytes;
use camel_component_api::Body;
use camel_component_api::CamelError;
use camel_component_api::{ConsumerContext, ExchangeEnvelope};
use camel_component_api::{StreamBody, StreamMetadata};
use futures::stream;
use std::sync::Arc;
use tokio::sync::Mutex;
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
drop(listener);
let component = HttpComponent::new();
let endpoint = component
.create_endpoint(&format!("http://127.0.0.1:{port}/stream"))
.unwrap();
let mut consumer = endpoint.create_consumer().unwrap();
let (tx, mut rx) = tokio::sync::mpsc::channel::<ExchangeEnvelope>(16);
let token = tokio_util::sync::CancellationToken::new();
let ctx = ConsumerContext::new(tx, token.clone());
tokio::spawn(async move { consumer.start(ctx).await.unwrap() });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let client = reqwest::Client::new();
let send_fut = client.get(format!("http://127.0.0.1:{port}/stream")).send();
let (http_result, _) = tokio::join!(send_fut, async {
if let Some(mut envelope) = rx.recv().await {
let chunks: Vec<Result<Bytes, CamelError>> =
vec![Ok(Bytes::from("chunk1")), Ok(Bytes::from("chunk2"))];
let stream = Box::pin(stream::iter(chunks));
envelope.exchange.input.body = Body::Stream(StreamBody {
stream: Arc::new(Mutex::new(Some(stream))),
metadata: StreamMetadata::default(),
});
if let Some(reply_tx) = envelope.reply_tx {
let _ = reply_tx.send(Ok(envelope.exchange));
}
}
});
let resp = http_result.unwrap();
assert_eq!(resp.status().as_u16(), 200);
let body = resp.text().await.unwrap();
assert_eq!(body, "chunk1chunk2");
token.cancel();
}
/// With `maxRequestBody=100`, a POST declaring `Content-Length: 1000` must
/// be answered with 413.
#[tokio::test]
async fn test_413_when_content_length_exceeds_limit() {
    use camel_component_api::ConsumerContext;

    // Reserve an ephemeral port number, then drop the listener so the port is free again.
    let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = probe.local_addr().unwrap().port();
    drop(probe);

    let component = HttpComponent::new();
    let uri = format!("http://127.0.0.1:{port}/upload?maxRequestBody=100");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let mut consumer = endpoint.create_consumer().unwrap();

    // The receiver is kept alive for the duration of the test but never read.
    let (tx, _rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
    let cancel = tokio_util::sync::CancellationToken::new();
    let consumer_ctx = ConsumerContext::new(tx, cancel.clone());
    tokio::spawn(async move { consumer.start(consumer_ctx).await.unwrap() });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let oversized = "x".repeat(1000);
    let client = reqwest::Client::new();
    let resp = client
        .post(format!("http://127.0.0.1:{port}/upload"))
        .header("Content-Length", "1000")
        .body(oversized)
        .send()
        .await
        .unwrap();

    assert_eq!(resp.status().as_u16(), 413);
    cancel.cancel();
}
/// With `maxRequestBody=10`, a 100-byte upload sent as a wrapped stream
/// (two 50-byte chunks) must not be rejected with 413: the exchange is
/// expected to arrive with a `Body::Stream` and the client to see status 200.
#[tokio::test]
async fn test_chunked_upload_without_content_length_bypasses_limit() {
    use bytes::Bytes;
    use camel_component_api::Body;
    use camel_component_api::ConsumerContext;
    use futures::stream;

    // Reserve an ephemeral port number, then drop the listener so the port is free again.
    let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = probe.local_addr().unwrap().port();
    drop(probe);

    let component = HttpComponent::new();
    let uri = format!("http://127.0.0.1:{port}/upload?maxRequestBody=10");
    let endpoint = component.create_endpoint(&uri).unwrap();
    let mut consumer = endpoint.create_consumer().unwrap();

    let (tx, mut rx) = tokio::sync::mpsc::channel::<camel_component_api::ExchangeEnvelope>(16);
    let cancel = tokio_util::sync::CancellationToken::new();
    let consumer_ctx = ConsumerContext::new(tx, cancel.clone());
    tokio::spawn(async move { consumer.start(consumer_ctx).await.unwrap() });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let client = reqwest::Client::new();
    let first_half: Result<Bytes, std::io::Error> = Ok(Bytes::from("y".repeat(50)));
    let second_half: Result<Bytes, std::io::Error> = Ok(Bytes::from("y".repeat(50)));
    let chunks: Vec<Result<Bytes, std::io::Error>> = vec![first_half, second_half];
    let upload = reqwest::Body::wrap_stream(stream::iter(chunks));
    let request = client
        .post(format!("http://127.0.0.1:{port}/upload"))
        .body(upload)
        .send();

    // Plays the role of the route, waiting at most 500 ms for the exchange.
    let responder = async {
        let wait = std::time::Duration::from_millis(500);
        match tokio::time::timeout(wait, rx.recv()).await {
            Ok(Some(mut envelope)) => {
                assert!(
                    matches!(envelope.exchange.input.body, Body::Stream(_)),
                    "expected Body::Stream"
                );
                envelope.exchange.input.body = Body::Empty;
                if let Some(reply_tx) = envelope.reply_tx {
                    let _ = reply_tx.send(Ok(envelope.exchange));
                }
            }
            Ok(None) => panic!("consumer channel closed unexpectedly"),
            // No exchange arrived in time: nothing to reply to, so the outcome
            // is left to the status assertions below.
            Err(_) => {}
        }
    };

    let (http_result, _) = tokio::join!(request, responder);
    let resp = http_result.unwrap();
    let status = resp.status().as_u16();
    assert_ne!(
        status,
        413,
        "chunked upload must not be rejected by maxRequestBody"
    );
    assert_eq!(status, 200);
    cancel.cancel();
}
}