camel-processor 0.13.0

Message processors for rust-camel
Documentation

camel-processor

Message processors for rust-camel (EIP implementations)

Overview

camel-processor provides implementations of Enterprise Integration Patterns (EIP) as processors that transform, filter, and route messages in rust-camel pipelines. These are the building blocks you use when defining routes.

Features

  • Filter: Conditional message routing based on predicates
  • Choice: Content-based routing with when/otherwise branches
  • Log: Logging processor for debugging and monitoring
  • SetHeader / DynamicSetHeader: Set message headers
  • SetBody / MapBody: Transform message bodies
  • Splitter: Split messages into multiple fragments
  • Aggregator: Aggregate multiple messages into one
  • Multicast: Send to multiple destinations in parallel
  • RecipientList: Dynamically resolve endpoints from expression at runtime
  • Dynamic Router: Route to endpoints determined at runtime by a closure; includes same-destination loop detection
  • WireTap: Fire-and-forget message copying
  • Circuit Breaker: Resilience pattern implementation
  • Error Handler: Centralized error handling
  • Stop: Stop processing immediately
  • Script: Execute mutating expressions via ScriptMutator; changes to headers, properties, and body propagate back with atomic rollback on error
  • Delayer: Delay message processing with fixed or dynamic (header-based) duration
  • Stream Handling: Processors that consume streams replace the body with a JSON placeholder {"placeholder": true}
  • Security Policy Layer: Tower middleware that enforces authorization before forwarding to the inner service
  • Marshal / Unmarshal: Serialize/deserialize message bodies using pluggable data formats (JSON, XML)

Installation

Add to your Cargo.toml:

[dependencies]
camel-processor = "*"

Usage

Using Processors Directly

use camel_processor::{
    FilterService, LogProcessor, LogLevel, SetHeader, SetBody,
    MapBody, SplitterService, MulticastService, DelayerService
};
use camel_api::{Exchange, Message, Body, Value, BoxProcessor, DelayConfig};

// Filter exchanges
let filter = FilterService::new(
    |ex: &Exchange| ex.input.body.as_text().map(|t| t.len() > 5).unwrap_or(false),
    my_processor
);

// Log messages
let logger = LogProcessor::new(LogLevel::Info, "Processing message".to_string());

// Set a header
let set_header = SetHeader::new(identity, "source", Value::String("api".into()));

// Map body
let upper = MapBody::new(identity, |body: Body| {
    body.as_text().map(|t| Body::Text(t.to_uppercase())).unwrap_or(body)
});

// Delay with fixed duration
let delayer = DelayerService::new(DelayConfig::new(500));

// Delay with dynamic header
let dynamic_delayer = DelayerService::new(
    DelayConfig::new(1000).with_dynamic_header("CamelDelayMs")
);

With RouteBuilder (Recommended)

use camel_builder::RouteBuilder;

let route = RouteBuilder::from("timer:tick")
    .set_header("processed", Value::Bool(true))
    .filter(|ex| ex.input.body.as_text().is_some())
        .log("Processing text message", LogLevel::Info)
        .map_body(|b| Body::Text(b.as_text().unwrap_or("").to_uppercase()))
    .end_filter()
    .to("mock:result")
    .build()
    .unwrap();

Available Processors

Processor Purpose
FilterService Route based on conditions
ChoiceService Content-based routing (when/otherwise)
LogProcessor Log exchange information
SetHeader Set static header values
DynamicSetHeader Set headers from expressions
SetBody Replace message body
MapBody Transform message body
SplitterService Split messages
AggregatorService Combine messages
MulticastService Parallel routing
WireTapService Side-channel routing
CircuitBreakerLayer Fault tolerance
ErrorHandlerLayer Error handling
StopService Stop processing
ScriptMutator Execute mutating scripts that modify Exchange headers, properties, or body
MarshalService Marshal body using a DataFormat (e.g., Json → Text)
UnmarshalService Unmarshal body using a DataFormat (e.g., Text → Json)
DelayerService Delay message processing by a fixed or dynamic duration
RecipientListService Dynamic recipient list — resolve endpoints from expression at runtime
DynamicRouterService Dynamic router — resolve destination at runtime via closure; detects same-destination loops
SecurityPolicyLayer Tower Layer that wraps a SecurityPolicy for authorization checks
SecurityPolicyService Tower Service produced by SecurityPolicyLayer; evaluates the policy per exchange

Security Policy Layer

SecurityPolicyLayer is a Tower Layer that intercepts every Exchange and evaluates a SecurityPolicy before forwarding to the inner service. It is the standard way to add authorization to any Camel pipeline.

How it works

The layer produces a SecurityPolicyService that calls policy.evaluate(&mut exchange) for each exchange and branches on the AuthorizationDecision:

Decision Behavior
Granted { principal } Stores principal properties on the exchange (subject, issuer, audience, scopes, roles, claims) and forwards to the inner service
Denied { reason, required, actual } Returns CamelError::Unauthorized with a message containing reason, required, and actual roles/scopes
Err(e) Propagates the error (e.g., CamelError::Unauthenticated)

Usage

use std::sync::Arc;
use camel_processor::SecurityPolicyLayer;
use camel_api::security_policy::SecurityPolicy;

// Suppose you have a SecurityPolicy implementation:
let policy: Arc<dyn SecurityPolicy> = Arc::new(MyPolicy);

// Wrap it as a Tower layer:
let layer = SecurityPolicyLayer::new(policy);

// Compose in a Tower pipeline:
let service = layer.layer(my_inner_service);

When composed with other processors, the security check runs before the inner service sees the exchange:

use tower::ServiceBuilder;
use camel_processor::{SecurityPolicyLayer, LogProcessor};

let service = ServiceBuilder::new()
    .layer(SecurityPolicyLayer::new(my_policy))
    .service(LogProcessor::new(LogLevel::Info, "Authorized request".into()));

Exchanges that are denied or encounter an evaluation error never reach the inner service.

Data Formats

The DataFormat trait defines serialization/deserialization for message bodies:

Format Marshal (structured → wire) Unmarshal (wire → structured)
json Body::JsonBody::Text Body::Text/Body::BytesBody::Json
xml Body::JsonBody::Text Body::Text/Body::Bytes/Body::XmlBody::Json

Usage with RouteBuilder

use camel_builder::RouteBuilder;

let route = RouteBuilder::from("direct:in")
    .unmarshal("json")
    .marshal("xml")
    .to("mock:out")
    .build()
    .unwrap();

YAML DSL

steps:
  - unmarshal: json
  - marshal: xml

Documentation

License

Apache-2.0

Contributing

Contributions are welcome! Please see the main repository for details.