camel-processor
Message processors for rust-camel (EIP implementations)
Overview
camel-processor provides implementations of Enterprise Integration Patterns (EIP) as processors that transform, filter, and route messages in rust-camel pipelines. These are the building blocks you use when defining routes.
Features
- Filter: Conditional message routing based on predicates
- Choice: Content-based routing with
when/otherwisebranches - Log: Logging processor for debugging and monitoring
- SetHeader / DynamicSetHeader: Set message headers
- SetBody / MapBody: Transform message bodies
- Splitter: Split messages into multiple fragments
- Aggregator: Aggregate multiple messages into one
- Multicast: Send to multiple destinations in parallel
- RecipientList: Dynamically resolve endpoints from expression at runtime
- Dynamic Router: Route to endpoints determined at runtime by a closure; includes same-destination loop detection
- WireTap: Fire-and-forget message copying
- Circuit Breaker: Resilience pattern implementation
- Error Handler: Centralized error handling
- Stop: Stop processing immediately
- Script: Execute mutating expressions via
ScriptMutator; changes to headers, properties, and body propagate back with atomic rollback on error - Delayer: Delay message processing with fixed or dynamic (header-based) duration
- Stream Handling: Processors that consume streams replace the body with a JSON placeholder
{"placeholder": true} - Security Policy Layer: Tower middleware that enforces authorization before forwarding to the inner service
- Marshal / Unmarshal: Serialize/deserialize message bodies using pluggable data formats (JSON, XML)
Installation
Add to your Cargo.toml:
[]
= "*"
Usage
Using Processors Directly
use ;
use ;
// Filter exchanges
let filter = new;
// Log messages
let logger = new;
// Set a header
let set_header = new;
// Map body
let upper = new;
// Delay with fixed duration
let delayer = new;
// Delay with dynamic header
let dynamic_delayer = new;
With RouteBuilder (Recommended)
use RouteBuilder;
let route = from
.set_header
.filter
.log
.map_body
.end_filter
.to
.build
.unwrap;
Available Processors
| Processor | Purpose |
|---|---|
FilterService |
Route based on conditions |
ChoiceService |
Content-based routing (when/otherwise) |
LogProcessor |
Log exchange information |
SetHeader |
Set static header values |
DynamicSetHeader |
Set headers from expressions |
SetBody |
Replace message body |
MapBody |
Transform message body |
SplitterService |
Split messages |
AggregatorService |
Combine messages |
MulticastService |
Parallel routing |
WireTapService |
Side-channel routing |
CircuitBreakerLayer |
Fault tolerance |
ErrorHandlerLayer |
Error handling |
StopService |
Stop processing |
ScriptMutator |
Execute mutating scripts that modify Exchange headers, properties, or body |
MarshalService |
Marshal body using a DataFormat (e.g., Json → Text) |
UnmarshalService |
Unmarshal body using a DataFormat (e.g., Text → Json) |
DelayerService |
Delay message processing by a fixed or dynamic duration |
RecipientListService |
Dynamic recipient list — resolve endpoints from expression at runtime |
DynamicRouterService |
Dynamic router — resolve destination at runtime via closure; detects same-destination loops |
SecurityPolicyLayer |
Tower Layer that wraps a SecurityPolicy for authorization checks |
SecurityPolicyService |
Tower Service produced by SecurityPolicyLayer; evaluates the policy per exchange |
Security Policy Layer
SecurityPolicyLayer is a Tower Layer that intercepts every Exchange and evaluates a SecurityPolicy before forwarding to the inner service. It is the standard way to add authorization to any Camel pipeline.
How it works
The layer produces a SecurityPolicyService that calls policy.evaluate(&mut exchange) for each exchange and branches on the AuthorizationDecision:
| Decision | Behavior |
|---|---|
Granted { principal } |
Stores principal properties on the exchange (subject, issuer, audience, scopes, roles, claims) and forwards to the inner service |
Denied { reason, required, actual } |
Returns CamelError::Unauthorized with a message containing reason, required, and actual roles/scopes |
Err(e) |
Propagates the error (e.g., CamelError::Unauthenticated) |
Usage
use Arc;
use SecurityPolicyLayer;
use SecurityPolicy;
// Suppose you have a SecurityPolicy implementation:
let policy: = new;
// Wrap it as a Tower layer:
let layer = new;
// Compose in a Tower pipeline:
let service = layer.layer;
When composed with other processors, the security check runs before the inner service sees the exchange:
use ServiceBuilder;
use ;
let service = new
.layer
.service;
Exchanges that are denied or encounter an evaluation error never reach the inner service.
Data Formats
The DataFormat trait defines serialization/deserialization for message bodies:
| Format | Marshal (structured → wire) | Unmarshal (wire → structured) |
|---|---|---|
json |
Body::Json → Body::Text |
Body::Text/Body::Bytes → Body::Json |
xml |
Body::Json → Body::Text |
Body::Text/Body::Bytes/Body::Xml → Body::Json |
Usage with RouteBuilder
use RouteBuilder;
let route = from
.unmarshal
.marshal
.to
.build
.unwrap;
YAML DSL
steps:
- unmarshal: json
- marshal: xml
Documentation
License
Apache-2.0
Contributing
Contributions are welcome! Please see the main repository for details.