camel-core
Core routing engine with DDD/CQRS/Hexagonal architecture for rust-camel
Overview
camel-core is the heart of the rust-camel framework. It implements a Domain-Driven Design architecture with CQRS and Hexagonal Architecture organized as vertical bounded contexts.
Bounded Contexts
crates/camel-core/src/
lifecycle/ ← Route lifecycle management
domain/ │ RouteRuntimeAggregate, RouteRuntimeState, RuntimeEvent
application/ │ RuntimeBus, Command/Query handlers, RouteDefinition
ports/ │ RouteRepositoryPort, ProjectionStorePort, …
adapters/ │ InMemory*, RedbRuntimeEventJournal, DefaultRouteController
hot_reload/ ← Live route updates
domain/ │ ReloadAction
application/ │ compute_reload_actions, execute_reload_actions
adapters/ │ ReloadWatcher
shared/ ← Cross-cutting concerns
observability/ │ TracerConfig, TracingProcessor (OTel adapter)
components/ │ Registry (component lookup by URI scheme)
context.rs ← CamelContext (composition root)
Features
- CamelContext: Central context for managing routes and components
- DDD Aggregate:
RouteRuntimeAggregatewith state machine and optimistic locking - CQRS Runtime Bus: Separate command/query paths with projection-backed reads
- Event Sourcing: Optional durable journal (redb v2) for crash recovery
- Hexagonal Architecture: Clean separation via ports and adapters
- Hot-reload: Live route updates with zero downtime and graceful in-flight exchange draining
- Supervision: Auto-recovery with configurable exponential backoff
- Exchange UoW layer:
ExchangeUoWLayerfor per-route in-flight tracking and completion/failure hooks - Tracer EIP: Automatic message-flow tracing with configurable detail levels
- Metrics: Pluggable
MetricsCollectorintegration - Optional languages:
lang-jsandlang-rhaifeature flags
Installation
Add to your Cargo.toml:
[]
= "0.5"
Optional Features
| Feature | Description |
|---|---|
lang-js |
JavaScript scripting via camel-language-js |
lang-rhai |
Rhai scripting via camel-language-rhai |
[]
= { = "0.5", = ["lang-rhai"] }
Usage
Creating a Camel Context
use CamelContext;
use RouteBuilder;
use TimerComponent;
use LogComponent;
async
Runtime Bus (CQRS)
Control routes via the runtime bus:
use RuntimeCommand;
// Start a route
ctx.runtime.execute.await?;
// Query route status (reads from projection)
let status = ctx.runtime_route_status.await?;
println!;
Exchange Unit of Work (UoW)
Attach per-route exchange lifecycle hooks and in-flight tracking via UnitOfWorkConfig:
use UnitOfWorkConfig;
use ;
let route = from
.route_id
.to
.build?
.with_unit_of_work;
ctx.add_route_definition.await?;
Optional Durability
Enable redb-backed event journal for runtime state recovery across restarts:
use ;
// Default options (Immediate durability, compaction at 10_000 events)
let ctx = new_with_redb_journal.await?;
// Eventual durability (dev/test — no fsync)
let ctx = new_with_redb_journal.await?;
// With supervision and metrics
let ctx = with_supervision_and_metrics_and_redb_journal.await?;
Events are persisted and replayed on startup for crash recovery.
Core Types
Domain Layer
| Type | Description |
|---|---|
RouteRuntimeAggregate |
DDD aggregate with lifecycle state and version |
RouteRuntimeState |
Enum: Registered, Starting, Started, Suspended, Stopping, Stopped, Failed |
RouteLifecycleCommand |
Domain commands: Start, Stop, Suspend, Resume, Reload, Fail |
RuntimeEvent |
Domain events: RouteStarted, RouteStopped, RouteFailed, etc. |
Ports Layer
| Port | Purpose |
|---|---|
RouteRepositoryPort |
Load/save aggregates |
ProjectionStorePort |
Read/write route status projections |
RuntimeExecutionPort |
Execute side effects on route controller |
EventPublisherPort |
Publish domain events |
RuntimeEventJournalPort |
Durable event persistence |
CommandDedupPort |
Idempotent command handling |
RuntimeUnitOfWorkPort |
Atomic aggregate + projection + event persistence |
Adapters Layer
| Adapter | Implements |
|---|---|
InMemoryRouteRepository |
RouteRepositoryPort |
InMemoryProjectionStore |
ProjectionStorePort |
InMemoryRuntimeStore |
Combined in-memory implementation |
RedbRuntimeEventJournal |
RuntimeEventJournalPort (redb v2) |
RuntimeExecutionAdapter |
RuntimeExecutionPort |
Tracer EIP
Automatic message-flow tracing across all route steps:
use ;
// Simple toggle
let mut ctx = new;
ctx.set_tracing;
// Full configuration
let config = TracerConfig ;
ctx.set_tracer_config;
Or via Camel.toml:
[]
= true
= "minimal" # minimal | medium | full
[]
= true
= "json"
Detail levels:
| Level | Fields |
|---|---|
minimal |
correlation_id, route_id, step_id, step_index, timestamp, duration_ms, status |
medium |
+ headers_count, body_type, has_error, output_body_type |
full |
+ up to 3 message headers (header_0…header_2) |
Health Monitoring
use ;
let report = ctx.health_check;
match report.status
Metrics
Plug in a custom MetricsCollector at construction time:
use CamelContext;
use Arc;
let ctx = with_metrics;
// Access the collector later
let metrics = ctx.metrics;
Hot-Reload System
Live route updates without service restart:
use ;
let handle = ctx.runtime_execution_handle;
let patterns = vec!;
let watch_dirs = resolve_watch_dirs;
watch_and_reload.await?;
Supervision
SupervisingRouteController wraps any controller with automatic crash recovery:
use SupervisingRouteController;
use SupervisionConfig;
let config = SupervisionConfig ;
let ctx = with_supervision;
Architecture Tests
The crate includes hexagonal architecture boundary tests to ensure clean separation:
Tests verify:
- Domain layer has no infrastructure dependencies
- Application layer depends only on ports and domain
- Ports layer has no adapter dependencies
- Bounded contexts do not bypass each other's layers
shared/cross-cutting types are only accessed via canonical paths- Runtime side effects flow through
RuntimeExecutionPort
Documentation
License
Apache-2.0