camel-processor
Message processors for rust-camel (EIP implementations)
Overview
camel-processor provides implementations of Enterprise Integration Patterns (EIP) as processors that transform, filter, and route messages in rust-camel pipelines. These are the building blocks you use when defining routes.
Features
- Filter: Conditional message routing based on predicates
- Choice: Content-based routing with when/otherwise branches
- Log: Logging processor for debugging and monitoring
- SetHeader / DynamicSetHeader: Set message headers
- SetBody / MapBody: Transform message bodies
- Splitter: Split messages into multiple fragments
- Aggregator: Aggregate multiple messages into one
- Multicast: Send to multiple destinations in parallel
- RecipientList: Resolve endpoints dynamically from an expression at runtime
- WireTap: Fire-and-forget message copying
- Circuit Breaker: Resilience pattern implementation
- Error Handler: Centralized error handling
- Stop: Stop processing immediately
- Script: Execute mutating expressions via ScriptMutator; changes to headers, properties, and body propagate back with atomic rollback on error
- Delayer: Delay message processing with fixed or dynamic (header-based) duration
- Stream Handling: Processors that consume streams replace the body with a JSON placeholder {"placeholder": true}
- Marshal / Unmarshal: Serialize/deserialize message bodies using pluggable data formats (JSON, XML)
Installation
Add to your Cargo.toml:
[dependencies]
camel-processor = "0.2"
Usage
Using Processors Directly
use ;
use ;
// Filter exchanges
let filter = new;
// Log messages
let logger = new;
// Set a header
let set_header = new;
// Map body
let upper = new;
// Delay with fixed duration
let delayer = new;
// Delay with dynamic header
let dynamic_delayer = new;
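To make the difference between the fixed and the header-based delay concrete, here is a small standalone sketch of how a per-message delay might be resolved. It does not use the crate's API: `resolve_delay` is a hypothetical helper, and two details are assumptions made for illustration, namely that the header carries a whole number of milliseconds and that a missing or unparsable header falls back to the fixed duration.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Resolve the delay for one message.
/// Assumption: the header value is a whole number of milliseconds.
/// Assumption: a missing or unparsable header falls back to `fixed`.
pub fn resolve_delay(
    headers: &HashMap<String, String>,
    header_name: Option<&str>,
    fixed: Duration,
) -> Duration {
    header_name
        .and_then(|name| headers.get(name))
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .map(Duration::from_millis)
        .unwrap_or(fixed)
}
```

With a header map containing `delayMs = "250"`, calling `resolve_delay(&headers, Some("delayMs"), Duration::from_millis(100))` yields 250 ms, while passing `None` as the header name yields the fixed 100 ms.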
With RouteBuilder (Recommended)
use RouteBuilder;
let route = from
.set_header
.filter
.log
.map_body
.end_filter
.to
.build
.unwrap;
Available Processors
| Processor | Purpose |
|---|---|
| FilterService | Route based on conditions |
| ChoiceService | Content-based routing (when/otherwise) |
| LogProcessor | Log exchange information |
| SetHeader | Set static header values |
| DynamicSetHeader | Set headers from expressions |
| SetBody | Replace message body |
| MapBody | Transform message body |
| SplitterService | Split messages |
| AggregatorService | Combine messages |
| MulticastService | Parallel routing |
| WireTapService | Side-channel routing |
| CircuitBreakerLayer | Fault tolerance |
| ErrorHandlerLayer | Error handling |
| StopService | Stop processing |
| ScriptMutator | Execute mutating scripts that modify Exchange headers, properties, or body |
| MarshalService | Marshal body using a DataFormat (e.g., Json → Text) |
| UnmarshalService | Unmarshal body using a DataFormat (e.g., Text → Json) |
| DelayerService | Delay message processing by a fixed or dynamic duration |
| RecipientListService | Dynamic recipient list: resolve endpoints from an expression at runtime |
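As an illustration of the when/otherwise semantics behind content-based routing, the following standalone sketch evaluates predicates in order and picks the first match, falling back to an optional default. It is not ChoiceService itself: the `Message` and `Choice` types, their methods, and the use of string targets are all hypothetical and chosen only to keep the example self-contained.

```rust
/// Hypothetical message type for illustration only.
pub struct Message {
    pub body: String,
}

type Predicate = Box<dyn Fn(&Message) -> bool>;

/// A content-based router: the first matching `when` wins, else `otherwise`.
pub struct Choice {
    whens: Vec<(Predicate, &'static str)>,
    otherwise: Option<&'static str>,
}

impl Choice {
    pub fn new() -> Self {
        Choice { whens: Vec::new(), otherwise: None }
    }

    /// Add a branch; branches are evaluated in the order they were added.
    pub fn when(
        mut self,
        predicate: impl Fn(&Message) -> bool + 'static,
        target: &'static str,
    ) -> Self {
        self.whens.push((Box::new(predicate), target));
        self
    }

    /// Set the fallback target used when no branch matches.
    pub fn otherwise(mut self, target: &'static str) -> Self {
        self.otherwise = Some(target);
        self
    }

    /// Returns the chosen target, or `None` if nothing matched and no
    /// fallback was configured.
    pub fn route(&self, message: &Message) -> Option<&'static str> {
        self.whens
            .iter()
            .find(|(predicate, _)| predicate(message))
            .map(|(_, target)| *target)
            .or(self.otherwise)
    }
}
```

A filter can be seen as the special case with a single `when` branch and no `otherwise`: messages that fail the predicate get `None` and skip the nested steps.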
Data Formats
The DataFormat trait defines serialization/deserialization for message bodies:
| Format | Marshal (structured → wire) | Unmarshal (wire → structured) |
|---|---|---|
| json | Body::Json → Body::Text | Body::Text/Body::Bytes → Body::Json |
| xml | Body::Xml → Body::Text | Body::Text/Body::Bytes → Body::Xml |
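The direction rules in the table above (marshal accepts only the structured variant, unmarshal accepts text or bytes) can be sketched with a toy format. Everything here is an assumption made for illustration: the trait's method signatures, the simplified `Body` enum, and the `ToyJson` format, which understands only JSON arrays of integers so that the example needs no external crates. The crate's real `DataFormat` trait and `Body` type may differ.

```rust
/// Hypothetical, simplified body type with only the variants needed here.
#[derive(Debug, Clone, PartialEq)]
pub enum Body {
    Text(String),
    Bytes(Vec<u8>),
    /// Stand-in for a parsed JSON value; this toy holds a list of integers.
    Json(Vec<i64>),
}

/// Assumed shape of a data format: two fallible conversions.
pub trait DataFormat {
    fn marshal(&self, body: Body) -> Result<Body, String>;
    fn unmarshal(&self, body: Body) -> Result<Body, String>;
}

/// Toy format that only understands arrays of integers such as `[1,2,3]`.
pub struct ToyJson;

impl DataFormat for ToyJson {
    /// Structured → wire: only `Body::Json` is accepted.
    fn marshal(&self, body: Body) -> Result<Body, String> {
        match body {
            Body::Json(items) => {
                let parts: Vec<String> = items.iter().map(|n| n.to_string()).collect();
                Ok(Body::Text(format!("[{}]", parts.join(","))))
            }
            other => Err(format!("cannot marshal {:?}", other)),
        }
    }

    /// Wire → structured: `Body::Text` and UTF-8 `Body::Bytes` are accepted.
    fn unmarshal(&self, body: Body) -> Result<Body, String> {
        let text = match body {
            Body::Text(t) => t,
            Body::Bytes(b) => String::from_utf8(b).map_err(|e| e.to_string())?,
            other => return Err(format!("cannot unmarshal {:?}", other)),
        };
        let inner = text
            .trim()
            .strip_prefix('[')
            .and_then(|s| s.strip_suffix(']'))
            .ok_or_else(|| "expected a JSON array".to_string())?;
        if inner.trim().is_empty() {
            return Ok(Body::Json(Vec::new()));
        }
        inner
            .split(',')
            .map(|p| p.trim().parse::<i64>().map_err(|e| e.to_string()))
            .collect::<Result<Vec<i64>, String>>()
            .map(Body::Json)
    }
}
```

Rejecting the wrong input variant with an error, rather than passing it through, keeps a misordered route (for example, marshal applied to a body that is already text) from failing silently.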
Usage with RouteBuilder
use RouteBuilder;
let route = from
.unmarshal
.marshal
.to
.build
.unwrap;
YAML DSL
steps:
- unmarshal: json
- marshal: xml
Documentation
License
Apache-2.0
Contributing
Contributions are welcome! Please see the main repository for details.