rust-camel
A Rust integration framework inspired by Apache Camel, built on Tower.
Status: Pre-release (0.1.0). APIs will change.
Overview
rust-camel lets you define message routes between components using a fluent builder API. The data plane (exchange processing, EIP patterns, middleware) is Tower-native — every processor and producer is a Service<Exchange>. The control plane (components, endpoints, consumers, lifecycle) uses its own trait hierarchy.
Current components: timer, log, direct, mock, file.
Quick Example
use ;
use RouteBuilder;
use CamelContext;
use LogComponent;
use TimerComponent;
async
Crate Map
| Crate | Description |
|---|---|
| camel-api | Core types: Exchange, Message, CamelError, BoxProcessor, ProcessorFn |
| camel-core | Runtime: CamelContext, Route, RouteDefinition, SequentialPipeline |
| camel-config | Configuration: CamelConfig, route discovery from YAML files with glob patterns |
| camel-builder | Fluent RouteBuilder API |
| camel-component | Component, Endpoint, Consumer traits |
| camel-processor | EIP processors: Filter, Choice, Splitter, Aggregator, WireTap, Multicast, SetHeader, MapBody + Tower Layer types |
| camel-endpoint | Endpoint URI parsing utilities |
| camel-timer | Timer source component |
| camel-log | Log sink component |
| camel-direct | In-memory synchronous component |
| camel-mock | Test component with assertions on received exchanges |
| camel-test | Integration test harness |
| camel-controlbus | Control routes dynamically from within routes |
| camel-language-api | Language trait API: Language, Expression, Predicate |
| camel-language-simple | Simple Language: ${header.x}, ${body}, operators |
| camel-language-rhai | Rhai scripting language for full expression power |
| camel-bean | Bean/Registry system for dependency injection and business logic integration |
| camel-prometheus | Prometheus metrics exporter with /metrics endpoint |
Building & Testing
Implemented EIP Patterns
| Pattern | Builder Method | Description |
|---|---|---|
| Filter | .filter(predicate) | Forward exchange only when predicate is true |
| Splitter | .split(config) | Split one exchange into multiple fragments |
| Aggregator | .aggregate(config) | Correlate and aggregate multiple exchanges |
| WireTap | .wire_tap(uri) | Fire-and-forget copy to a tap endpoint |
| Multicast | .multicast() | Send the same exchange to multiple endpoints |
| Content-Based Router | .choice() / .when() | Route based on exchange content |
Run an example:
Security Features
rust-camel includes production-ready security features:
SSRF Protection (HTTP Component)
// Block private IPs by default
from
.to
.build?
// Custom blocked hosts
from
.to
.build?
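To make "block private IPs by default" more concrete, here is a minimal standard-library sketch of the kind of address check an SSRF guard might perform. The function names `is_internal_address` and `is_blocked_host` and the exact set of blocked ranges are assumptions for illustration, not the HTTP component's actual API.

```rust
use std::net::IpAddr;

/// Returns true when `ip` is in a range an SSRF guard would typically refuse.
/// The choice of ranges here is an assumption, not the component's real list.
pub fn is_internal_address(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => {
            v4.is_private()            // 10/8, 172.16/12, 192.168/16
                || v4.is_loopback()    // 127/8
                || v4.is_link_local()  // 169.254/16
                || v4.is_unspecified() // 0.0.0.0
        }
        IpAddr::V6(v6) => {
            let first = v6.segments()[0];
            v6.is_loopback()
                || v6.is_unspecified()
                || (first & 0xfe00) == 0xfc00 // unique local, fc00::/7
                || (first & 0xffc0) == 0xfe80 // link-local, fe80::/10
        }
    }
}

/// Hypothetical host check: explicit blocklist first, then literal IP addresses.
pub fn is_blocked_host(host: &str, blocked_hosts: &[&str]) -> bool {
    if blocked_hosts.iter().any(|b| b.eq_ignore_ascii_case(host)) {
        return true;
    }
    match host.parse::<IpAddr>() {
        Ok(ip) => is_internal_address(ip),
        // A hostname would have to be resolved first; this sketch does not do DNS.
        Err(_) => false,
    }
}
```

A real guard would also need to check the addresses a hostname resolves to, and handle cases such as IPv4-mapped IPv6 addresses, which this sketch leaves out.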
Path Traversal Protection (File Component)
All file operations validate that resolved paths remain within the configured base directory. Attempts to escape it, via ../ segments or absolute paths outside the base directory, are rejected.
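The containment check described above can be sketched with the standard library alone. This is an illustrative, purely lexical version: `resolve_within` is a hypothetical name, it assumes the base directory is absolute and already normalized, and it does not resolve symlinks, which a real implementation would likely need to consider.

```rust
use std::path::{Component, Path, PathBuf};

/// Lexically resolves `requested` against `base` and returns the result only
/// if it stays inside `base`. Assumes `base` is absolute and normalized.
pub fn resolve_within(base: &Path, requested: &str) -> Option<PathBuf> {
    // `join` discards `base` when `requested` is absolute, so absolute
    // paths are covered by the containment check at the end.
    let joined = base.join(requested);

    let mut normalized = PathBuf::new();
    for component in joined.components() {
        match component {
            Component::CurDir => {}
            Component::ParentDir => {
                // `pop` does nothing at the root, so `..` cannot climb above it.
                normalized.pop();
            }
            other => normalized.push(other.as_os_str()),
        }
    }

    // `starts_with` compares whole components: `/data-other` is not inside `/data`.
    normalized.starts_with(base).then_some(normalized)
}
```

With a base of `/data`, a request for `in/../a.txt` resolves inside the base, while `../etc/passwd` and `/etc/passwd` both yield `None`.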
Timeouts
All I/O operations have configurable timeouts:
- File: readTimeout, writeTimeout (default: 30s)
- HTTP: connectTimeout, responseTimeout
Memory Limits
Aggregator supports max_buckets and bucket_ttl to prevent memory leaks.
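As a rough illustration of how two such limits keep memory bounded, the sketch below expires old buckets and refuses new correlation keys once a cap is reached. Only the names `max_buckets` and `bucket_ttl` come from the text above; the `Buckets` type, its methods, and the "reject when full" policy are assumptions, and the real Aggregator may behave differently.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical bounded bucket store keyed by correlation key.
pub struct Buckets {
    max_buckets: usize,
    bucket_ttl: Duration,
    buckets: HashMap<String, (Instant, Vec<String>)>, // key -> (created_at, bodies)
}

impl Buckets {
    pub fn new(max_buckets: usize, bucket_ttl: Duration) -> Self {
        Self { max_buckets, bucket_ttl, buckets: HashMap::new() }
    }

    /// Adds `body` under `key` at time `now`. Returns false when the store is full.
    pub fn add(&mut self, key: &str, body: &str, now: Instant) -> bool {
        // Drop buckets older than the TTL so abandoned keys cannot accumulate.
        let ttl = self.bucket_ttl;
        self.buckets
            .retain(|_, (created, _)| now.duration_since(*created) < ttl);

        // Refuse new keys at the cap instead of growing without bound.
        if !self.buckets.contains_key(key) && self.buckets.len() >= self.max_buckets {
            return false;
        }
        self.buckets
            .entry(key.to_string())
            .or_insert_with(|| (now, Vec::new()))
            .1
            .push(body.to_string());
        true
    }

    /// Number of live buckets.
    pub fn len(&self) -> usize {
        self.buckets.len()
    }
}
```

Passing `now` in explicitly keeps the sketch deterministic; a production version would more likely read the clock itself and evict on a timer.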
Observability
Correlation IDs
Every exchange has a unique correlation_id for distributed tracing.
Metrics
Implement the MetricsCollector trait to integrate with Prometheus, OpenTelemetry, or another metrics backend.
Prometheus Metrics
Export metrics to Prometheus with automatic lifecycle management:
use PrometheusService;
let ctx = new
.with_lifecycle
.with_tracing;
ctx.start.await?;
// Prometheus server starts automatically
Available metrics:
- camel_exchanges_total{route} - Total exchanges processed
- camel_errors_total{route, error_type} - Total errors
- camel_exchange_duration_seconds{route} - Exchange processing duration (histogram)
- camel_queue_depth{route} - Current queue depth
- camel_circuit_breaker_state{route} - Circuit breaker state
Architecture: PrometheusService implements the Lifecycle trait (following Apache Camel's Service pattern, renamed to avoid confusion with tower::Service).
Health Monitoring
Built-in health endpoints for Kubernetes:
- /healthz - Liveness probe
- /readyz - Readiness probe
- /health - Detailed health report
livenessProbe:
httpGet:
path: /healthz
port: 9090
Route Lifecycle Management
rust-camel supports controlling when and how routes start:
Auto Startup
By default, all routes start automatically when ctx.start() is called. You can disable this:
let route = from
.route_id
.auto_startup // Won't start automatically
.to
.build?;
Startup Order
Control the order in which routes start (useful for dependencies):
let route_a = from
.route_id
.startup_order // Starts first
.to
.build?;
let route_b = from
.route_id
.startup_order // Starts after route-a
.to
.build?;
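The interaction between auto startup and startup order can be pictured as a filter followed by a sort. The sketch below is an assumption about the semantics, not the framework's code: `RouteSettings` and `startup_sequence` are hypothetical names, and it assumes that lower `startup_order` values start first, as in Apache Camel.

```rust
/// Hypothetical per-route settings; field names mirror the builder methods above.
#[derive(Debug, Clone)]
pub struct RouteSettings {
    pub route_id: String,
    pub auto_startup: bool,
    pub startup_order: i32,
}

/// Returns the ids of the routes to start at context startup, lowest order first.
pub fn startup_sequence(routes: &[RouteSettings]) -> Vec<String> {
    // Routes with auto startup disabled are skipped; they must be started explicitly.
    let mut selected: Vec<&RouteSettings> =
        routes.iter().filter(|r| r.auto_startup).collect();
    // `sort_by_key` is stable, so routes with equal order keep registration order.
    selected.sort_by_key(|r| r.startup_order);
    selected.into_iter().map(|r| r.route_id.clone()).collect()
}
```

Under these assumptions, a route with order 10 starts before one with order 20 regardless of registration order, and a route with auto startup disabled never appears in the sequence.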
Runtime Control
Control routes dynamically from code or from other routes:
// From code:
ctx.route_controller.lock.await.start_route.await?;
ctx.route_controller.lock.await.stop_route.await?;
// From another route (using controlbus):
from
.set_header
.to
.build?
See examples/lazy-route for a complete example.
Error Handling
rust-camel provides sophisticated error handling with retry policies and dead letter channels.
RedeliveryPolicy with Jitter
Configure retry behavior with exponential backoff and jitter:
use ;
use Duration;
let error_handler = dead_letter_channel
.on_exception
.retry // Max 3 retry attempts
.with_backoff
.with_jitter // ±20% randomization (recommended: 0.1-0.3)
.build;
Jitter Benefits:
- Prevents thundering herd in distributed systems
- Recommended values: 0.1-0.3 (10-30%)
- Adds randomization: delay ± (delay * jitter_factor)
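The arithmetic behind the formula above can be sketched as two small functions. This is an illustration only: `backoff_delay` and `jitter_bounds` are hypothetical names, and the sketch assumes that the first retry waits the initial delay, that the cap is applied before jitter, and that `multiplier` and `jitter_factor` are finite and non-negative.

```rust
use std::time::Duration;

/// Delay before retry `attempt` (1-indexed), before any jitter is applied.
pub fn backoff_delay(initial: Duration, multiplier: f64, max: Duration, attempt: u32) -> Duration {
    // attempt 1 -> initial, attempt 2 -> initial * multiplier, and so on.
    let factor = multiplier.powi(attempt.saturating_sub(1) as i32);
    // Clamp in f64 first so a huge factor cannot overflow the Duration conversion.
    let secs = (initial.as_secs_f64() * factor).min(max.as_secs_f64());
    Duration::from_secs_f64(secs)
}

/// Lower and upper bound of `delay ± (delay * jitter_factor)`.
pub fn jitter_bounds(delay: Duration, jitter_factor: f64) -> (Duration, Duration) {
    let spread = delay.mul_f64(jitter_factor);
    // `saturating_sub` keeps the lower bound at zero for factors above 1.0.
    (delay.saturating_sub(spread), delay + spread)
}
```

With an initial delay of 100 ms, a multiplier of 2.0, and a jitter factor of 0.2, the third attempt has a base delay of 400 ms, so the randomized delay would fall between roughly 320 ms and 480 ms. An actual implementation would draw a random value within those bounds.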
Camel-Compatible Headers
During retries, these headers are automatically set:
- CamelRedelivered - true when the exchange is being retried
- CamelRedeliveryCounter - Current retry attempt (1-indexed)
- CamelRedeliveryMaxCounter - Maximum retry attempts
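For a sense of what a downstream processor would see on a retried exchange, the sketch below builds those three headers as a plain map. The header names come from the list above; the `redelivery_headers` function and the use of string values are assumptions, since the framework may store typed header values.

```rust
use std::collections::HashMap;

/// Builds the redelivery headers for retry `attempt` (1-indexed) out of `max_attempts`.
/// String values are an assumption made for this sketch.
pub fn redelivery_headers(attempt: u32, max_attempts: u32) -> HashMap<&'static str, String> {
    let mut headers = HashMap::new();
    headers.insert("CamelRedelivered", "true".to_string());
    headers.insert("CamelRedeliveryCounter", attempt.to_string());
    headers.insert("CamelRedeliveryMaxCounter", max_attempts.to_string());
    headers
}
```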
YAML Configuration
routes:
- id: "retry-example"
from: "timer:tick"
error_handler:
dead_letter_uri: "log:dlc"
retry:
max_attempts: 3
initial_delay_ms: 100
multiplier: 2.0
max_delay_ms: 10000
jitter_factor: 0.2
steps:
- to: "direct:processor"
See examples/error-handling for complete examples.
Configuration
rust-camel supports external configuration via Camel.toml files using the camel-config crate:
Configuration File
Create a Camel.toml file:
[]
= ["routes/**/*.yaml"]
= "INFO"
[]
= "ERROR"
Loading Configuration
use CamelConfig;
use CamelContext;
use TimerComponent;
use LogComponent;
// Load configuration
let config = from_file?;
// Create context and register components
let mut ctx = new;
ctx.register_component;
ctx.register_component;
// Load routes from discovered files
let routes = config.load_routes?;
for route in routes
ctx.start.await?;
Route Files
Create YAML route files:
# routes/hello.yaml
routes:
- id: "hello-timer"
from: "timer:tick?period=1000"
steps:
- to: "log:info"
Environment Variables
Override configuration with environment variables:
# Select profile
# Override specific values
Features
- Profile support: Multiple environments in one file
- Route discovery: Auto-load routes from glob patterns
- Environment overrides: Override any value with the CAMEL_* prefix
- Deep merging: Nested configs merge properly
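"Deep merging" here presumably means that an override replaces only the keys it names and leaves sibling keys intact. A minimal sketch of that idea, using a hypothetical `Value` type and `deep_merge` function rather than camel-config's real types:

```rust
use std::collections::BTreeMap;

/// Minimal config value for this sketch: a leaf string or a nested table.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Leaf(String),
    Table(BTreeMap<String, Value>),
}

/// Merges `overlay` into `base`: tables merge key by key, anything else is replaced.
pub fn deep_merge(base: &mut Value, overlay: Value) {
    match (base, overlay) {
        (Value::Table(base_map), Value::Table(overlay_map)) => {
            for (key, overlay_value) in overlay_map {
                match base_map.get_mut(&key) {
                    // Key exists on both sides: recurse so siblings survive.
                    Some(existing) => deep_merge(existing, overlay_value),
                    // Key only in the overlay: add it.
                    None => {
                        base_map.insert(key, overlay_value);
                    }
                }
            }
        }
        // Leaf over anything, or table over leaf: the overlay wins outright.
        (slot, replacement) => *slot = replacement,
    }
}
```

A shallow merge would replace a whole nested table when a profile overrides a single key in it; the recursive version keeps the keys that the override does not mention.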
See docs/configuration.md for full details.
License
Apache-2.0