# camel-processor
> Message processors for rust-camel (EIP implementations)
## Overview
`camel-processor` provides implementations of Enterprise Integration Patterns (EIP) as processors that transform, filter, and route messages in rust-camel pipelines. These are the building blocks you use when defining routes.
## Features
- **Filter**: Conditional message routing based on predicates
- **Choice**: Content-based routing with `when`/`otherwise` branches
- **Log**: Logging processor for debugging and monitoring
- **SetHeader / DynamicSetHeader**: Set message headers
- **SetBody / MapBody**: Transform message bodies
- **Splitter**: Split messages into multiple fragments
- **Aggregator**: Aggregate multiple messages into one
- **Multicast**: Send to multiple destinations in parallel
- **RecipientList**: Dynamically resolve endpoints from expression at runtime
- **Dynamic Router**: Route to endpoints determined at runtime by a closure; includes same-destination loop detection
- **WireTap**: Fire-and-forget message copying
- **Circuit Breaker**: Resilience pattern implementation
- **Error Handler**: Centralized error handling
- **Stop**: Stop processing immediately
- **Script**: Execute mutating expressions via `ScriptMutator`; changes to headers, properties, and body propagate back with atomic rollback on error
- **Delayer**: Delay message processing with fixed or dynamic (header-based) duration
- **Stream Handling**: Processors that consume streams replace the body with a JSON placeholder `{"placeholder": true}`
- **Security Policy Layer**: Tower middleware that enforces authorization before forwarding to the inner service
- **Marshal / Unmarshal**: Serialize/deserialize message bodies using pluggable data formats (JSON, XML)
## Installation
Add to your `Cargo.toml`:
```toml
[dependencies]
camel-processor = "*"
```
## Usage
### Using Processors Directly
```rust
use camel_processor::{
FilterService, LogProcessor, LogLevel, SetHeader, SetBody,
MapBody, SplitterService, MulticastService, DelayerService
};
use camel_api::{Exchange, Message, Body, Value, BoxProcessor, DelayConfig};
// Filter exchanges
let filter = FilterService::new(
|ex: &Exchange| ex.input.body.as_text().map(|t| t.len() > 5).unwrap_or(false),
my_processor
);
// Log messages
let logger = LogProcessor::new(LogLevel::Info, "Processing message".to_string());
// Set a header
let set_header = SetHeader::new(identity, "source", Value::String("api".into()));
// Map body
});
// Delay with fixed duration
let delayer = DelayerService::new(DelayConfig::new(500));
// Delay with dynamic header
let dynamic_delayer = DelayerService::new(
DelayConfig::new(1000).with_dynamic_header("CamelDelayMs")
);
```
### With RouteBuilder (Recommended)
```rust
use camel_builder::RouteBuilder;
let route = RouteBuilder::from("timer:tick")
.set_header("processed", Value::Bool(true))
.filter(|ex| ex.input.body.as_text().is_some())
.log("Processing text message", LogLevel::Info)
.map_body(|b| Body::Text(b.as_text().unwrap_or("").to_uppercase()))
.end_filter()
.to("mock:result")
.build()
.unwrap();
```
## Available Processors
| `FilterService` | Route based on conditions |
| `ChoiceService` | Content-based routing (when/otherwise) |
| `LogProcessor` | Log exchange information |
| `SetHeader` | Set static header values |
| `DynamicSetHeader` | Set headers from expressions |
| `SetBody` | Replace message body |
| `MapBody` | Transform message body |
| `SplitterService` | Split messages |
| `AggregatorService` | Combine messages |
| `MulticastService` | Parallel routing |
| `WireTapService` | Side-channel routing |
| `CircuitBreakerLayer` | Fault tolerance |
| `ErrorHandlerLayer` | Error handling |
| `StopService` | Stop processing |
| `ScriptMutator` | Execute mutating scripts that modify Exchange headers, properties, or body |
| `MarshalService` | Marshal body using a DataFormat (e.g., Json → Text) |
| `UnmarshalService` | Unmarshal body using a DataFormat (e.g., Text → Json) |
| `DelayerService` | Delay message processing by a fixed or dynamic duration |
| `RecipientListService` | Dynamic recipient list — resolve endpoints from expression at runtime |
| `DynamicRouterService` | Dynamic router — resolve destination at runtime via closure; detects same-destination loops |
| `SecurityPolicyLayer` | Tower Layer that wraps a `SecurityPolicy` for authorization checks |
| `SecurityPolicyService` | Tower Service produced by `SecurityPolicyLayer`; evaluates the policy per exchange |
## Security Policy Layer
`SecurityPolicyLayer` is a Tower `Layer` that intercepts every `Exchange` and evaluates a `SecurityPolicy` before forwarding to the inner service. It is the standard way to add authorization to any Camel pipeline.
### How it works
The layer produces a `SecurityPolicyService` that calls `policy.evaluate(&mut exchange)` for each exchange and branches on the `AuthorizationDecision`:
| `Granted { principal }` | Stores principal properties on the exchange (subject, issuer, audience, scopes, roles, claims) and forwards to the inner service |
| `Denied { reason, required, actual }` | Returns `CamelError::Unauthorized` with a message containing reason, required, and actual roles/scopes |
| `Err(e)` | Propagates the error (e.g., `CamelError::Unauthenticated`) |
### Usage
```rust
use std::sync::Arc;
use camel_processor::SecurityPolicyLayer;
use camel_api::security_policy::SecurityPolicy;
// Suppose you have a SecurityPolicy implementation:
let policy: Arc<dyn SecurityPolicy> = Arc::new(MyPolicy);
// Wrap it as a Tower layer:
let layer = SecurityPolicyLayer::new(policy);
// Compose in a Tower pipeline:
let service = layer.layer(my_inner_service);
```
When composed with other processors, the security check runs before the inner service sees the exchange:
```rust
use tower::ServiceBuilder;
use camel_processor::{SecurityPolicyLayer, LogProcessor};
let service = ServiceBuilder::new()
.layer(SecurityPolicyLayer::new(my_policy))
.service(LogProcessor::new(LogLevel::Info, "Authorized request".into()));
```
Exchanges that are denied or encounter an evaluation error never reach the inner service.
## Data Formats
The `DataFormat` trait defines serialization/deserialization for message bodies:
| `json` | `Body::Json` → `Body::Text` | `Body::Text`/`Body::Bytes` → `Body::Json` |
| `xml` | `Body::Json` → `Body::Text` | `Body::Text`/`Body::Bytes`/`Body::Xml` → `Body::Json` |
### Usage with RouteBuilder
```rust
use camel_builder::RouteBuilder;
let route = RouteBuilder::from("direct:in")
.unmarshal("json")
.marshal("xml")
.to("mock:out")
.build()
.unwrap();
```
### YAML DSL
```yaml
steps:
- unmarshal: json
- marshal: xml
```
## Documentation
- [API Documentation](https://docs.rs/camel-processor)
- [Repository](https://github.com/kennycallado/rust-camel)
## License
Apache-2.0
## Contributing
Contributions are welcome! Please see the [main repository](https://github.com/kennycallado/rust-camel) for details.