camel-component-http 0.5.0

# camel-component-http

> HTTP client and server component for rust-camel

## Overview

The HTTP component provides HTTP client (producer) and HTTP server (consumer) capabilities for rust-camel. Built on `reqwest` for clients and `axum` for servers, it enables REST API integration, webhook handling, and HTTP-based messaging.

## Features

- **HTTP Server (Consumer)**: Listen for incoming HTTP requests
- **HTTP Client (Producer)**: Make outgoing HTTP requests
- **HTTPS Support**: Secure connections with `https` scheme
- **Configurable Timeouts**: Connect and response timeouts
- **SSRF Protection**: Optional private IP blocking
- **Streaming**: Direct stream-to-HTTP piping without materialization
- **Native Request Streaming**: Incoming bodies arrive as `Body::Stream`, no RAM materialization
- **Native Response Streaming**: `Body::Stream` responses use chunked transfer encoding automatically
- **Header Mapping**: Automatic header forwarding
- **Status Code Handling**: Configurable success ranges

## Installation

Add to your `Cargo.toml`:

```toml
[dependencies]
camel-component-http = "0.5"
```

## URI Format

```
http://host:port/path[?options]
https://host:port/path[?options]
```
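
To make the format concrete, here is a minimal, standard-library-only sketch of splitting such a URI into scheme, authority, path, and options. `EndpointUri` and `parse_endpoint_uri` are hypothetical names for illustration; the component's real parser is not shown in this README and may behave differently (for example around percent-encoding).

```rust
use std::collections::HashMap;

/// Parsed pieces of an endpoint URI (illustrative type, not part of the crate).
#[derive(Debug, PartialEq)]
struct EndpointUri {
    scheme: String,
    authority: String, // host[:port]
    path: String,
    options: HashMap<String, String>,
}

/// Split `scheme://host:port/path?k=v&k2=v2` into its parts.
/// Returns `None` if the scheme is neither `http` nor `https`.
fn parse_endpoint_uri(uri: &str) -> Option<EndpointUri> {
    let (scheme, rest) = uri.split_once("://")?;
    if scheme != "http" && scheme != "https" {
        return None;
    }
    // Everything after the first `?` is the option string.
    let (location, query) = match rest.split_once('?') {
        Some((loc, q)) => (loc, q),
        None => (rest, ""),
    };
    // The authority ends at the first `/`; a missing path defaults to `/`.
    let (authority, path) = match location.find('/') {
        Some(i) => (&location[..i], &location[i..]),
        None => (location, "/"),
    };
    // Options are `key=value` pairs separated by `&`.
    let options = query
        .split('&')
        .filter(|pair| !pair.is_empty())
        .map(|pair| match pair.split_once('=') {
            Some((k, v)) => (k.to_string(), v.to_string()),
            None => (pair.to_string(), String::new()),
        })
        .collect();
    Some(EndpointUri {
        scheme: scheme.to_string(),
        authority: authority.to_string(),
        path: path.to_string(),
        options,
    })
}
```

With this sketch, `http://0.0.0.0:8080/api?maxRequestBody=1024` yields the authority `0.0.0.0:8080`, the path `/api`, and a single option.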

## Consumer Options (Server)

| Option | Default | Description |
|--------|---------|-------------|
| Host/path from URI | - | e.g., `http://0.0.0.0:8080/api` |
| `maxRequestBody` | `2097152` (2 MB) | If request `Content-Length` exceeds this value, responds 413 before opening the stream. Chunked uploads without `Content-Length` are not limited at the consumer level. |

## Producer Options (Client)

| Option | Default | Description |
|--------|---------|-------------|
| `httpMethod` | Auto | HTTP method (GET, POST, etc.) |
| `throwExceptionOnFailure` | `true` | Return an error when the response status is outside the success range |
| `okStatusCodeRange` | `200-299` | Success status code range |
| `followRedirects` | `false` | Follow HTTP redirects |
| `connectTimeout` | `30000` | Connection timeout (ms) |
| `responseTimeout` | none | Response timeout (ms); no timeout unless set |
| `allowPrivateIps` | `false` | Allow requests to private IPs |
| `blockedHosts` | - | Comma-separated blocked hosts |
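
The interaction between `okStatusCodeRange` and `throwExceptionOnFailure` can be sketched as follows. This is a standard-library-only illustration under the assumption that the range is a single `low-high` pair; `parse_status_range` and `is_failure` are hypothetical helpers, not functions exported by the crate.

```rust
use std::ops::RangeInclusive;

/// Parse a range such as `"200-299"` into an inclusive range of status codes.
/// Hypothetical helper; the component's own parser may accept other forms.
fn parse_status_range(spec: &str) -> Option<RangeInclusive<u16>> {
    let (low, high) = spec.split_once('-')?;
    let low: u16 = low.trim().parse().ok()?;
    let high: u16 = high.trim().parse().ok()?;
    if low > high {
        return None;
    }
    Some(low..=high)
}

/// Decide whether a response should be turned into an error.
/// With `throw_on_failure == false` every response is passed through.
fn is_failure(status: u16, ok_range: &RangeInclusive<u16>, throw_on_failure: bool) -> bool {
    throw_on_failure && !ok_range.contains(&status)
}
```

Under these assumptions a 404 is an error with the defaults, and is passed through to the next processor when `throwExceptionOnFailure=false`.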

## Usage

### HTTP Server (Consumer)

```rust
use camel_builder::RouteBuilder;
use camel_component_http::HttpComponent;
use camel_core::CamelContext;

let mut ctx = CamelContext::new();
ctx.register_component("http", Box::new(HttpComponent::new()));

// Simple API endpoint
let route = RouteBuilder::from("http://0.0.0.0:8080/hello")
    .process(|ex| async move {
        let mut ex = ex;
        ex.input.body = camel_api::Body::Text("Hello, World!".to_string());
        Ok(ex)
    })
    .build()?;
```

### HTTP Client (Producer)

```rust
// GET request
let route = RouteBuilder::from("timer:tick?period=60000")
    .to("http://api.example.com/data?allowPrivateIps=false")
    .log("Response received", camel_processor::LogLevel::Info)
    .build()?;

// POST request (body becomes request body)
let route = RouteBuilder::from("direct:submit")
    .to("http://api.example.com/submit?httpMethod=POST")
    .build()?;
```

### Dynamic HTTP Method

```rust
// Method from header
let route = RouteBuilder::from("direct:api")
    .set_header("CamelHttpMethod", Value::String("DELETE".into()))
    .to("http://api.example.com/resource")
    .build()?;
```

### Dynamic URL

```rust
// URL from header
let route = RouteBuilder::from("direct:proxy")
    .set_header("CamelHttpUri", Value::String("http://backend.service/api".into()))
    .to("http://localhost/dummy")  // Base URL, overridden by header
    .build()?;
```

### HTTPS

```rust
let route = RouteBuilder::from("timer:secure")
    .to("https://secure.api.com/endpoint")
    .build()?;
```

## Exchange Headers

### Request Headers (Consumer)

| Header | Description |
|--------|-------------|
| `CamelHttpMethod` | HTTP method (GET, POST, etc.) |
| `CamelHttpPath` | Request path |
| `CamelHttpQuery` | Query string |
| All HTTP headers | Forwarded from request |

### Response Headers (Producer)

| Header | Description |
|--------|-------------|
| `CamelHttpResponseCode` | HTTP status code |
| `CamelHttpResponseText` | Status text |
| All response headers | Forwarded from response |

### Request Control Headers (Producer)

| Header | Description |
|--------|-------------|
| `CamelHttpUri` | Override target URL |
| `CamelHttpPath` | Append to base URL path |
| `CamelHttpQuery` | Append query string |
| `CamelHttpMethod` | Override HTTP method |
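
One plausible way these control headers combine into a final URL is sketched below. The precedence (URI first, then path, then query) and the slash handling are assumptions made for illustration; `resolve_target_url` is a hypothetical function and the headers are modeled as a plain string map rather than the exchange's real header type.

```rust
use std::collections::HashMap;

/// Resolve the URL a producer would call, given the endpoint's base URL and
/// the exchange headers. Precedence and slash handling here are assumptions.
fn resolve_target_url(base: &str, headers: &HashMap<&str, &str>) -> String {
    // `CamelHttpUri` replaces the base URL entirely.
    let mut url = headers
        .get("CamelHttpUri")
        .map(|u| u.to_string())
        .unwrap_or_else(|| base.to_string());

    // `CamelHttpPath` is appended to the path, avoiding a doubled slash.
    // Simplification: the URL is assumed to carry no query string yet.
    if let Some(path) = headers.get("CamelHttpPath") {
        url = format!(
            "{}/{}",
            url.trim_end_matches('/'),
            path.trim_start_matches('/')
        );
    }

    // `CamelHttpQuery` is appended with `?` or `&`, depending on what is there.
    if let Some(query) = headers.get("CamelHttpQuery") {
        let sep = if url.contains('?') { '&' } else { '?' };
        url.push(sep);
        url.push_str(query);
    }
    url
}
```

For example, a base of `http://api.example.com/v1/` with `CamelHttpPath=/users/1` and `CamelHttpQuery=verbose=true` resolves to `http://api.example.com/v1/users/1?verbose=true` in this sketch.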

## Example: REST API Server

```rust
use camel_builder::RouteBuilder;
use camel_component_http::{HttpComponent, HttpsComponent};
use camel_core::CamelContext;
use camel_api::Body;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut ctx = CamelContext::new();
    ctx.register_component("http", Box::new(HttpComponent::new()));

    // GET /api/users
    ctx.add_route(
        RouteBuilder::from("http://0.0.0.0:8080/api/users")
            .process(|ex| async move {
                let mut ex = ex;
                ex.input.body = Body::Text(r#"[{"id":1,"name":"Alice"}]"#.to_string());
                ex.input.set_header("Content-Type", serde_json::Value::String("application/json".into()));
                Ok(ex)
            })
            .build()?
    ).await?;

    // POST /api/users
    ctx.add_route(
        RouteBuilder::from("http://0.0.0.0:8080/api/users")
            .filter(|ex| ex.input.header("CamelHttpMethod").and_then(|v| v.as_str()) == Some("POST"))
                .process(|ex| async move {
                    // Create user from request body
                    Ok(ex)
                })
                .set_body(Body::Text(r#"{"status":"created"}"#.to_string()))
            .end_filter()
            .build()?
    ).await?;

    ctx.start().await?;
    tokio::signal::ctrl_c().await?;
    ctx.stop().await?;

    Ok(())
}
```

## Example: HTTP Client with Error Handling

```rust
let route = RouteBuilder::from("direct:api-call")
    .to("http://api.service.com/endpoint?throwExceptionOnFailure=true&connectTimeout=5000")
    .build()?;

// With custom error handling
let route = RouteBuilder::from("direct:resilient")
    .error_handler(ErrorHandlerConfig::log_only())
    .to("http://api.service.com/endpoint?throwExceptionOnFailure=false")
    .process(|ex| async move {
        let status = ex.input.header("CamelHttpResponseCode")
            .and_then(|v| v.as_u64())
            .unwrap_or(0);
        if status >= 400 {
            // Handle error response
        }
        Ok(ex)
    })
    .build()?;
```

## Global Configuration

Configure default HTTP settings in `Camel.toml` that apply to all HTTP endpoints:

```toml
[default.components.http]
connect_timeout_ms = 5000        # Connection timeout (default: 30000)
response_timeout_ms = 30000      # Response timeout (default: none)
max_connections = 100            # Max concurrent connections (default: 100)
max_body_size = 10485760         # Max response body size, 10MB (default: 10MB)
max_request_body = 2097152       # Max request body for server, 2MB (default: 2MB)
allow_private_ips = false        # Allow requests to private IPs (default: false)
```

URI parameters always override global defaults:

```rust
// Uses global connect_timeout_ms (5000) but overrides allowPrivateIps
.to("http://api.example.com?allowPrivateIps=true")

// Overrides both global settings
.to("http://api.example.com?connectTimeout=10000&allowPrivateIps=true")
```
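
The override rule can be pictured as a two-layer merge: start from the global settings, then apply whatever the URI specifies. The following standard-library-only sketch assumes a simplified `HttpSettings` struct and an `apply_uri_options` helper, both hypothetical; only the option names and defaults come from the tables above.

```rust
use std::collections::HashMap;

/// Subset of the HTTP settings (illustrative type, not the crate's config struct).
#[derive(Debug, Clone, PartialEq)]
struct HttpSettings {
    connect_timeout_ms: u64,
    response_timeout_ms: Option<u64>,
    allow_private_ips: bool,
}

/// Start from the global settings and apply any URI options on top.
/// Unparseable values are ignored here; a real implementation might reject them.
fn apply_uri_options(global: &HttpSettings, options: &HashMap<&str, &str>) -> HttpSettings {
    let mut s = global.clone();
    if let Some(v) = options.get("connectTimeout").and_then(|v| v.parse::<u64>().ok()) {
        s.connect_timeout_ms = v;
    }
    if let Some(v) = options.get("responseTimeout").and_then(|v| v.parse::<u64>().ok()) {
        s.response_timeout_ms = Some(v);
    }
    if let Some(v) = options.get("allowPrivateIps").and_then(|v| v.parse::<bool>().ok()) {
        s.allow_private_ips = v;
    }
    s
}
```

Options absent from the URI keep their global value, which matches the first example above: `connect_timeout_ms` stays at 5000 while `allowPrivateIps` is overridden.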

### Profile-Specific Configuration

```toml
[default.components.http]
connect_timeout_ms = 30000

[production.components.http]
connect_timeout_ms = 5000   # Faster fail in production
allow_private_ips = false

[development.components.http]
connect_timeout_ms = 60000  # More lenient in dev
allow_private_ips = true    # Allow internal services in dev
```

## SSRF Protection

By default, the HTTP client blocks requests to private IP addresses to guard against server-side request forgery (SSRF). To allow them for a specific endpoint:

```rust
.to("http://internal.service/api?allowPrivateIps=true")
```

To block specific hosts:

```rust
.to("http://api.example.com?blockedHosts=localhost,127.0.0.1,internal.local")
```
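
As a rough illustration of what such a check involves, the sketch below classifies an already-resolved address and matches a host against a comma-separated block list. Which address ranges the component actually treats as private is not stated in this README, so the set used here (loopback, RFC 1918, link-local, unspecified, IPv6 unique-local) is an assumption, and all three function names are hypothetical.

```rust
use std::net::IpAddr;

/// Returns true for addresses a default-deny SSRF filter would typically refuse.
/// The exact set of ranges is an assumption, not the component's documented list.
fn is_private_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified()
        }
        IpAddr::V6(v6) => {
            let first = v6.segments()[0];
            v6.is_loopback()
                || v6.is_unspecified()
                || (first & 0xfe00) == 0xfc00 // fc00::/7, unique local
                || (first & 0xffc0) == 0xfe80 // fe80::/10, link-local
        }
    }
}

/// Check a host against a comma-separated `blockedHosts` value.
fn is_blocked_host(host: &str, blocked_hosts: &str) -> bool {
    blocked_hosts
        .split(',')
        .map(str::trim)
        .any(|blocked| !blocked.is_empty() && blocked.eq_ignore_ascii_case(host))
}

/// Combine both checks for a host that has already been resolved to `ip`.
fn is_request_allowed(host: &str, ip: IpAddr, allow_private_ips: bool, blocked_hosts: &str) -> bool {
    if is_blocked_host(host, blocked_hosts) {
        return false;
    }
    allow_private_ips || !is_private_ip(ip)
}
```

In this sketch the block list is checked first, so a blocked host is refused even when `allowPrivateIps=true`; whether the component orders the checks the same way is not confirmed here.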

## Streaming & Memory Management

The HTTP component supports native streaming for both producer and consumer.

### Producer (Client) Streaming

The HTTP producer supports streaming request bodies directly without materializing them in memory. Stream bodies are piped to reqwest using `wrap_stream()`.

When a body does have to be materialized in memory, a size limit applies (default: 10 MB).
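
The idea of a bounded materialization can be shown without any async machinery. The sketch below is a synchronous stand-in that collects chunks into a buffer and fails as soon as the running total would pass the limit; `materialize` is a hypothetical function, and the real component works on async streams rather than iterators.

```rust
/// Collect chunks into one buffer, failing once the total would exceed `limit` bytes.
/// A synchronous stand-in for materializing an async body stream.
fn materialize<I>(chunks: I, limit: usize) -> Result<Vec<u8>, String>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut buf = Vec::new();
    for chunk in chunks {
        // Check before copying so the buffer never grows past the limit.
        if buf.len() + chunk.len() > limit {
            return Err(format!("body exceeds limit of {limit} bytes"));
        }
        buf.extend_from_slice(&chunk);
    }
    Ok(buf)
}
```

Checking the size before each copy means an oversized body is rejected after reading at most one chunk beyond the limit, instead of after buffering the whole payload.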

### Consumer (Server) Streaming

**Request Bodies:** Incoming HTTP request bodies arrive as `Body::Stream` in the Exchange, with no RAM materialization by default. The `Content-Length` header (if present) populates `StreamMetadata.size_hint`, and `Content-Type` populates `StreamMetadata.content_type`.

**413 Protection:** If the `Content-Length` header exceeds `maxRequestBody`, the server responds with HTTP 413 before opening the stream. Chunked uploads without a `Content-Length` header are not limited at the consumer level.
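
The admission rule described above reduces to a small decision on the declared length. The sketch below models it with a hypothetical `Admission` enum and `admit_request` function; it only illustrates the rule and is not the consumer's actual code.

```rust
/// Outcome of the pre-stream size check (illustrative type).
#[derive(Debug, PartialEq)]
enum Admission {
    Accept,
    /// Reject with this HTTP status before reading any of the body.
    Reject(u16),
}

/// Decide based only on the declared `Content-Length`.
/// `None` models a chunked upload that carries no `Content-Length` header.
fn admit_request(content_length: Option<u64>, max_request_body: u64) -> Admission {
    match content_length {
        Some(len) if len > max_request_body => Admission::Reject(413),
        _ => Admission::Accept,
    }
}
```

Note the `None` branch: a chunked upload is accepted regardless of its eventual size, so a route that materializes such a body needs its own limit.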

**Response Bodies:** 
- `Body::Stream` responses use `Transfer-Encoding: chunked` automatically (no buffering)
- `Body::Bytes` / `Body::Text` responses use standard `Content-Length`
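
The choice of framing follows directly from whether the body length is known up front. The sketch below uses a simplified `SketchBody` enum as a stand-in for the real `Body` type (whose variants carry different payloads) and a hypothetical `framing_header` function.

```rust
/// Simplified stand-in for the exchange body; the real `Body` type differs.
enum SketchBody {
    Text(String),
    Bytes(Vec<u8>),
    Stream, // payload omitted: its length is unknown up front
}

/// Pick the framing header for a response body.
fn framing_header(body: &SketchBody) -> (&'static str, String) {
    match body {
        // Known length: announce it so the client can read exactly that much.
        SketchBody::Text(s) => ("Content-Length", s.len().to_string()),
        SketchBody::Bytes(b) => ("Content-Length", b.len().to_string()),
        // Unknown length: send chunks as they become available.
        SketchBody::Stream => ("Transfer-Encoding", "chunked".to_string()),
    }
}
```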

### Streaming Response Example

```rust
// Streaming response example (server-sent data)
let route = RouteBuilder::from("http://0.0.0.0:8080/stream")
    .process(|exchange| async move {
        // Rebind as mutable so the body can be replaced
        let mut exchange = exchange;
        let chunks = vec![
            Ok(Bytes::from("chunk1\n")),
            Ok(Bytes::from("chunk2\n")),
        ];
        let stream = Box::pin(futures::stream::iter(chunks));
        exchange.input.body = Body::Stream(StreamBody {
            stream: Arc::new(tokio::sync::Mutex::new(Some(stream))),
            metadata: StreamMetadata::default(),
        });
        // Hand the modified exchange back to the route
        Ok(exchange)
    })
    .build()?;
```

### Request Body Access

```rust
// Access request body as stream (default) or materialize it
let route = RouteBuilder::from("http://0.0.0.0:8080/upload")
    .process(|exchange| async move {
        // Rebind as mutable so the body can be replaced
        let mut exchange = exchange;

        // Option A: keep as stream (zero-copy)
        // exchange.input.body is already Body::Stream

        // Option B: materialize when you need the bytes (10 MB limit)
        let bytes = exchange.input.body.into_bytes(10 * 1024 * 1024).await?;
        exchange.input.body = Body::Bytes(bytes);
        Ok(exchange)
    })
    .build()?;
```

## Documentation

- [API Documentation](https://docs.rs/camel-component-http)
- [Repository](https://github.com/kennycallado/rust-camel)

## License

Apache-2.0

## Contributing

Contributions are welcome! Please see the [main repository](https://github.com/kennycallado/rust-camel) for details.