# camel-dsl
>
> DSL support for rust-camel (YAML, JSON)
## Overview
`camel-dsl` provides Domain Specific Language support for defining routes in rust-camel declaratively. Routes can be defined in YAML or JSON files and loaded at runtime, enabling external configuration without recompiling the application.
This crate is useful when you want to:
- Externalize route configuration
- Define routes without writing Rust code
- Enable non-developers to modify routes via configuration files
## Features
- **YAML and JSON route definitions**: Define routes using YAML or JSON syntax
- **Declarative integration flows**: Use all available EIPs and DSL
- **External configuration**: Load routes from files at runtime
- **Language expressions**: Use `simple:` and `rhai:` syntax for dynamic values
- **Route-level configuration**: Auto-startup, startup ordering, concurrency, error handling, circuit breaker, unit-of-work hooks
- **Environment variable interpolation**: Inject env vars in route files using `${env:VAR_NAME}` syntax with optional defaults `${env:VAR_NAME:-default}`
- **All step types**: to, log, set_header, set_body, transform, filter, choice, split, aggregate, delay, wire_tap, multicast, recipient_list, stop, script, bean, throttle, load_balance, dynamic_router, routing_slip
## Supported YAML Steps
### Core Steps
to, log, set_header, set_body, transform, filter, choice, split, aggregate, delay, wire_tap, multicast, recipient_list, stop, script, bean
### Delay
Delay exchange processing:
```yaml
steps:
- delay: 500 # shorthand: 500ms fixed
- delay: # full form:
delay_ms: 1000
dynamic_header: CamelDelayMs
```
### Throttle
Rate-limit message processing:
```yaml
steps:
- throttle:
max_requests: 10
period_secs: 1
strategy: "delay"
steps:
- to: "mock:result"
```
### Load Balance
Distribute across endpoints:
```yaml
steps:
- load_balance:
strategy: "round_robin"
parallel: false
steps:
- to: "mock:a"
- to: "mock:b"
```
Weighted distribution (follows Apache Camel's `distributionRatio` pattern):
```yaml
steps:
- load_balance:
strategy: "weighted"
distribution_ratio: "4,2,1"
steps:
- to: "seda:x"
- to: "seda:y"
- to: "seda:z"
```
The `distribution_ratio` string maps weights to steps by position. `"4,2,1"` means for every 7 messages: 4 to `seda:x`, 2 to `seda:y`, 1 to `seda:z`.
### Dynamic Router
Route to endpoints determined at runtime:
```yaml
steps:
- dynamic_router:
simple: "${header.dest}"
```
> **Note:** The `dynamic_router` uses a 60s timeout by default. This is a rust-camel extension — Apache Camel's `dynamicRouter` does not expose a timeout option.
### Routing Slip
Dynamic routing slip pattern:
```yaml
steps:
- routing_slip:
simple: "${header.slip}"
```
### Recipient List
Dynamically resolve endpoints from an expression at runtime:
```yaml
steps:
- recipient_list:
simple: "${header.destinations}"
```
The expression should evaluate to a comma-separated list of endpoint URIs. Supports parallel execution and aggregation strategies.
### Bean
Invoke a registered bean:
```yaml
steps:
- bean:
name: "myProcessor"
method: "handle"
```
## Installation
Add to your `Cargo.toml`:
```toml
[dependencies]
camel-dsl = "0.8"
```
## Quick Start
### Basic Route
Create a YAML file `routes.yaml`:
```yaml
routes:
- id: "hello-timer"
from: "timer:tick?period=2000"
steps:
- log: "Timer fired!"
- to: "log:info"
```
Load and add to context:
```rust
use camel_dsl::load_from_file;
use camel_core::context::CamelContext;
use camel_component_timer::TimerComponent;
use camel_component_log::LogComponent;
let mut ctx = CamelContext::builder().build().await?;
ctx.register_component(TimerComponent::new());
ctx.register_component(LogComponent::new());
let routes = load_from_file("routes.yaml")?;
for route in routes {
ctx.add_route_definition(route).await?;
}
ctx.start().await?;
```
### With Language Expressions
```yaml
routes:
- id: "filter-demo"
from: "timer:tick?period=1000"
steps:
- set_header:
key: "type"
value: "allowed"
- filter:
simple: "${header.type} == 'allowed'"
steps:
- log: "Passed filter!"
- to: "log:filtered"
```
## JSON Route Definitions
JSON route definitions use the same shape as YAML — the only difference is the serialization format. This means all step types, route-level configuration, and language expressions work identically in both formats.
### Example
Create a JSON file `routes.json`:
```json
{
"routes": [
{
"id": "hello-timer",
"from": "timer:tick?period=2000",
"steps": [
{ "log": "Timer fired!" },
{ "to": "log:info" }
]
}
]
}
```
### Programmatic Loading
```rust
// Parse a JSON string directly
use camel_dsl::parse_json;
let routes = parse_json(&json_string)?;
// Load from a file
use camel_dsl::json::load_json_from_file;
use std::path::Path;
let routes = load_json_from_file(Path::new("routes.json"))?;
```
The type aliases `JsonRoutes`, `JsonRoute`, and `JsonStep` are convenience wrappers around the YAML AST types and are **not a stable SDK contract**. SDKs and external consumers that need forward compatibility should target `CanonicalRouteSpec` instead:
```rust
use camel_dsl::parse_json_to_canonical;
let specs = parse_json_to_canonical(&json_string)?;
```
### Discovery Behavior
When using [`discover_routes`] to load route files via glob patterns, **JSON files require an explicit `.json` glob pattern**. Broad patterns like `routes/*` will intentionally not load `.json` files — this prevents accidental loading from mixed-format directories.
```rust
use camel_dsl::discover_routes;
// ✅ Explicit .json pattern — loads JSON files
let routes = discover_routes(&["routes/*.json".to_string()])?;
// ❌ Broad pattern — will NOT load .json files (returns an error if any are matched)
let routes = discover_routes(&["routes/*".to_string()])?;
// Mixing both formats in one call is fine:
let routes = discover_routes(&[
"routes/*.yaml".to_string(),
"routes/*.json".to_string(),
])?;
```
## Canonical Route Spec
The `canonical` module provides direct parsing of the cross-language route IR, bypassing the YAML/JSON DSL input formats:
```rust
use camel_dsl::canonical::{parse_canonical_json, parse_canonical_route};
// Parse a batch of canonical routes from JSON
let routes = parse_canonical_json(r#"{
"routes": [{
"route_id": "my-route",
"from": "timer:tick?period=1000",
"steps": [
{ "step": "log", "config": { "message": "Hello" } },
{ "step": "to", "config": { "uri": "log:info" } }
],
"version": 1
}]
}"#)?;
// Or compile a single CanonicalRouteSpec
let spec = CanonicalRouteSpec::new("my-route", "timer:tick");
let route = parse_canonical_route(spec)?;
```
### Generate Type Artifacts
```bash
cargo run -p xtask schema
```
Produces JSON Schema and TypeScript types in `schemas/`.
## Available Step Types
| `to` | Send to endpoint | `- to: "log:info"` |
| `log` | Log message | `- log: "Processing"` |
| `set_header` | Set header | `- set_header: { key: "x", value: "y" }` |
| `set_body` | Set body | `- set_body: { value: "content" }` |
| `transform` | Transform body | `- transform: { simple: "${body}" }` |
| `marshal` | Serialize body using a data format (e.g., Json → Text) | `- marshal: json` |
| `unmarshal` | Deserialize body using a data format (e.g., Text → Json) | `- unmarshal: xml` |
| `filter` | Filter messages | `- filter: { simple: "${header.type} == 'allowed'", steps: [...] }` |
| `choice` | Content-based router | `- choice: { when: [...], otherwise: [...] }` |
| `split` | Split message | `- split: { expression: "body_lines", steps: [...] }` |
| `aggregate` | Aggregate messages with size/timeout completion | `- aggregate: { header: "id", completion_size: 5, completion_timeout_ms: 5000 }` |
| `delay` | Delay exchange processing | `- delay: 500` or `- delay: { delay_ms: 1000, dynamic_header: "CamelDelayMs" }` |
| `wire_tap` | Fire-and-forget tap | `- wire_tap: "direct:audit"` |
| `multicast` | Fan-out to multiple | `- multicast: { steps: [...] }` |
| `recipient_list` | Dynamic recipient list | `- recipient_list: { simple: "${header.destinations}" }` |
| `stop` | Stop pipeline | `- stop: true` |
| `script` | Execute script | `- script: { language: "simple", source: "${body}" }` |
| `bean` | Invoke bean method | `- bean: { name: "orderService", method: "process" }` |
### Marshal/Unmarshal Example
```yaml
routes:
- id: "convert-format"
from: "direct:input"
steps:
- unmarshal: json
- marshal: xml
- to: "direct:output"
```
This converts the message body from JSON to XML using the built-in data formats.
### Bean Step
The `bean` step allows you to invoke business logic registered in the BeanRegistry:
```yaml
routes:
- id: "process-order"
from: "direct:orders"
steps:
- bean:
name: "orderService"
method: "validate"
- bean:
name: "orderService"
method: "process"
```
**Prerequisites:**
- Register beans in your Rust code using `BeanRegistry`
- Pass the registry to `DefaultRouteController::with_beans()`
```rust
use camel_bean::BeanRegistry;
use camel_core::DefaultRouteController;
let mut bean_registry = BeanRegistry::new();
bean_registry.register("orderService", OrderService);
let controller = DefaultRouteController::with_beans(bean_registry);
```
See `examples/bean-demo` for a complete example.
### Aggregate Step
The `aggregate` step supports size-based, timeout-based, or combined completion:
```yaml
routes:
- id: "aggregate-demo"
from: "timer:orders?period=200"
steps:
- aggregate:
header: "orderId"
completion_size: 3
completion_timeout_ms: 5000
force_completion_on_stop: true
discard_on_timeout: false
- log: "Batch completed"
- to: "log:info"
```
| Field | Type | Description |
|-------|------|-------------|
| `header` | string | Correlation header name |
| `completion_size` | integer | Complete when N exchanges aggregated |
| `completion_timeout_ms` | integer | Inactivity timeout in ms (resets per exchange) |
| `force_completion_on_stop` | boolean | Force-complete all buckets on route stop |
| `discard_on_timeout` | boolean | Discard (instead of emit) incomplete exchanges on timeout |
## Environment Variable Interpolation
Use `${env:VAR_NAME}` anywhere in a route file to inject an environment variable at load time. This works for both YAML and JSON formats when loaded via [`discover_routes`]:
```yaml
routes:
- id: "env-demo"
from: "timer:tick?period=1000"
steps:
- log: "Broker: ${env:BROKER_URL}"
- to: "${env:OUTPUT_ENDPOINT}"
```
The substitution happens before parsing, so it works in any position — URIs, log messages, header values, etc. If an environment variable is not set and no default is provided via the `:-` syntax, discovery returns a `DiscoveryError::Env` error — the literal `${env:VAR_NAME}` string is **not** left in place.
> **Caution:** Because interpolation is textual (performed before parsing), env values injected into JSON string positions must already be valid for their JSON context. For example, an env var containing an unescaped double quote will produce invalid JSON, causing a `DiscoveryError::Json` parse error. YAML is more forgiving of unquoted values but the same principle applies to structured contexts.
You can specify a default value when the variable is not set:
```yaml
routes:
- id: "env-defaults"
from: "timer:tick?period=${env:POLL_MS:-1000}"
steps:
- log: "Target: ${env:OUTPUT_URI:-log:info}"
- to: "${env:OUTPUT_URI:-log:info}"
```
If `POLL_MS` is not set, the default `1000` is used. The `:-` syntax follows the same convention as shell parameter expansion.
> **Note:** Direct file loaders (`load_from_file` for YAML, `json::load_json_from_file` for JSON) read and parse files without interpolating environment variables. Use [`discover_routes`] if you need env interpolation.
## Language Expressions
Many steps support language expressions for dynamic values:
predicates:
### Syntax
```yaml
# Simple language shortcut
- filter:
simple: "${header.type} == 'allowed'"
steps:
- log: "Match!"
# Explicit language + source
- set_header:
key: "computed"
language: "simple"
source: "${header.base} + '-suffix'"
# Rhai script
- script:
language: "rhai"
source: |
let body = exchange.body();
body.to_upper()
```
### Available Languages
- `simple` - Built-in Simple language (supports header/body interpolation)
- `rhai` - Rhai scripting language (requires `camel-language-rhai` feature)
## Route-Level Configuration
```yaml
routes:
- id: "my-route"
from: "timer:tick?period=1000"
auto_startup: true # Default: true
startup_order: 100 # Default: 1000, lower = earlier
concurrency: concurrent # or "sequential"
error_handler: # Optional error handling
dead_letter_channel: "log:errors"
# Legacy single catch-all retry (still supported)
retry:
max_attempts: 3
initial_delay_ms: 100
# New ordered exception clauses (first-match-wins)
on_exceptions:
- kind: "Io"
retry:
max_attempts: 3
initial_delay_ms: 100
handled_by: "log:io-errors"
- kind: "ProcessorError"
message_contains: "validation"
retry:
max_attempts: 1
circuit_breaker: # Optional circuit breaker
failure_threshold: 5
open_duration_ms: 30000
on_complete: "direct:on-complete" # Optional completion hook URI
on_failure: "direct:on-failure" # Optional failure hook URI
```
### Unit of Work Hooks (YAML)
```yaml
routes:
- id: my-route
from: "timer:tick"
on_complete: "log:done"
on_failure: "log:failed"
steps:
- log: "processing"
```
## Documentation
- [API Documentation](https://docs.rs/camel-dsl)
- [Examples](https://github.com/kennycallado/rust-camel/tree/main/examples)
- [Repository](https://github.com/kennycallado/rust-camel)