camel-core 0.6.3

Core engine for rust-camel

camel-core

Core routing engine with DDD/CQRS/Hexagonal architecture for rust-camel

Overview

camel-core is the heart of the rust-camel framework. It combines Domain-Driven Design, CQRS, and Hexagonal Architecture, organized as vertical bounded contexts.

Bounded Contexts

crates/camel-core/src/
  lifecycle/          ← Route lifecycle management
    domain/           │  RouteRuntimeAggregate, RouteRuntimeState, RuntimeEvent
    application/      │  RuntimeBus, Command/Query handlers, RouteDefinition
    ports/            │  RouteRepositoryPort, ProjectionStorePort, …
    adapters/         │  InMemory*, RedbRuntimeEventJournal, DefaultRouteController
  hot_reload/         ← Live route updates
    domain/           │  ReloadAction
    application/      │  compute_reload_actions, execute_reload_actions
    adapters/         │  ReloadWatcher
  shared/             ← Cross-cutting concerns
    observability/    │  TracerConfig, TracingProcessor (OTel adapter)
    components/       │  Registry (component lookup by URI scheme)
  context.rs          ← CamelContext (composition root)

Features

  • CamelContext: Central context for managing routes and components
  • DDD Aggregate: RouteRuntimeAggregate with state machine and optimistic locking
  • CQRS Runtime Bus: Separate command/query paths with projection-backed reads
  • Event Sourcing: Optional durable journal (redb v2) for crash recovery
  • Hexagonal Architecture: Clean separation via ports and adapters
  • Hot-reload: Live route updates with zero downtime, graceful in-flight exchange draining, and Skip optimization for unchanged routes
  • Supervision: Auto-recovery with configurable exponential backoff
  • Exchange UoW layer: ExchangeUoWLayer for per-route in-flight tracking and completion/failure hooks
  • Tracer EIP: Automatic message-flow tracing with configurable detail levels
  • Metrics: Pluggable MetricsCollector integration
  • Optional languages: lang-js and lang-rhai feature flags

Installation

Add to your Cargo.toml:

[dependencies]
camel-core = "0.6"

Optional Features

Feature     Description
lang-js     JavaScript scripting via camel-language-js
lang-rhai   Rhai scripting via camel-language-rhai

[dependencies]
camel-core = { version = "0.6", features = ["lang-rhai"] }

Usage

Creating a Camel Context

use camel_core::CamelContext;
use camel_builder::RouteBuilder;
use camel_component_timer::TimerComponent;
use camel_component_log::LogComponent;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut ctx = CamelContext::builder().build().await?;

    // Register components
    ctx.register_component(TimerComponent::new());
    ctx.register_component(LogComponent::new());

    // Build and add routes
    let route = RouteBuilder::from("timer:hello?period=1000")
        .route_id("hello-route")
        .to("log:info")
        .build()?;

    ctx.add_route_definition(route).await?;

    // Start all routes
    ctx.start().await?;

    // Graceful shutdown
    ctx.stop().await?;

    Ok(())
}

Component Infrastructure

CamelContext implements both ComponentContext and ComponentRegistrar from camel-component-api.

ComponentContext is passed to create_endpoint, giving components read-only access to:

  • Component registry (resolve components by scheme)
  • Language registry (resolve expression languages)
  • Metrics collector

ComponentRegistrar enables trait-based dynamic registration:

use camel_component_api::ComponentRegistrar;
use std::sync::Arc;

ctx.register_component_dyn(Arc::new(MyComponent::new()));
ctx.register_component(MyComponent::new()); // convenience wrapper

ComponentBundle groups related schemes and registers them from TOML config:

use camel_component_api::ComponentBundle;

if let Some(raw) = config.components.raw.get("http").cloned() {
    camel_component_http::HttpBundle::from_toml(raw)?.register_all(&mut ctx);
}
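
As a rough illustration of lookup by URI scheme, here is a self-contained sketch that uses only the standard library. The Component trait, the Registry struct, and their methods are hypothetical stand-ins, not the actual camel-component-api types. The sketch assumes that the scheme is the part of the endpoint URI before the first colon.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for a component trait; not the real API.
trait Component {
    fn scheme(&self) -> &str;
}

struct Timer;
impl Component for Timer {
    fn scheme(&self) -> &str { "timer" }
}

struct Log;
impl Component for Log {
    fn scheme(&self) -> &str { "log" }
}

#[derive(Default)]
struct Registry {
    by_scheme: HashMap<String, Arc<dyn Component>>,
}

impl Registry {
    // Trait-object registration, keyed by the component's own scheme.
    fn register_dyn(&mut self, component: Arc<dyn Component>) {
        self.by_scheme.insert(component.scheme().to_string(), component);
    }

    // Convenience wrapper that boxes a concrete component.
    fn register<C: Component + 'static>(&mut self, component: C) {
        self.register_dyn(Arc::new(component));
    }

    // Resolve "timer:hello?period=1000" to the component registered for "timer".
    fn resolve(&self, uri: &str) -> Option<Arc<dyn Component>> {
        let (scheme, _rest) = uri.split_once(':')?;
        self.by_scheme.get(scheme).cloned()
    }
}
```

A URI with no colon, or with a scheme nobody registered, resolves to None in this sketch.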

Runtime Bus (CQRS)

Control routes via the runtime bus:

use camel_api::RuntimeCommand;

// Start a route
ctx.runtime().execute(RuntimeCommand::StartRoute {
    route_id: "my-route".into(),
    command_id: "cmd-start-1".into(),
    causation_id: None,
}).await?;

// Query route status (reads from projection)
let status = ctx.runtime_route_status("my-route").await?;
println!("Status: {:?}", status);
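
The split between the command path and the projection-backed query path can be pictured with a small standalone model. Everything below (Bus, Command, Status, and the duplicate check keyed on command_id) is a hypothetical sketch using only the standard library. It does not reproduce the real RuntimeBus or RuntimeCommand types.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, PartialEq, Eq)]
enum Status {
    Started,
    Stopped,
}

// Hypothetical commands; each carries an id used for deduplication.
enum Command {
    StartRoute { route_id: String, command_id: String },
    StopRoute { route_id: String, command_id: String },
}

#[derive(Default)]
struct Bus {
    seen_commands: HashSet<String>,
    projection: HashMap<String, Status>,
    events: Vec<String>,
}

impl Bus {
    // Write path: returns false when the command id was already handled.
    fn execute(&mut self, cmd: Command) -> bool {
        let (route_id, command_id, status) = match cmd {
            Command::StartRoute { route_id, command_id } => (route_id, command_id, Status::Started),
            Command::StopRoute { route_id, command_id } => (route_id, command_id, Status::Stopped),
        };
        if !self.seen_commands.insert(command_id) {
            return false; // duplicate command: no event, no projection change
        }
        self.events.push(format!("{}:{:?}", route_id, status));
        self.projection.insert(route_id, status);
        true
    }

    // Read path: answers from the projection only, never from the event list.
    fn route_status(&self, route_id: &str) -> Option<&Status> {
        self.projection.get(route_id)
    }
}
```

In this model, replaying a command with the same command_id is a no-op, which is one simple way to get idempotent command handling.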

Exchange Unit of Work (UoW)

Attach per-route exchange lifecycle hooks and in-flight tracking via UnitOfWorkConfig:

use camel_api::UnitOfWorkConfig;
use camel_builder::{RouteBuilder, StepAccumulator};

let route = RouteBuilder::from("direct:orders")
    .route_id("orders-uow")
    .to("log:orders")
    .build()?
    .with_unit_of_work(UnitOfWorkConfig {
        on_complete: Some("direct:on-complete".to_string()),
        on_failure: Some("direct:on-failure".to_string()),
    });

ctx.add_route_definition(route).await?;
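
To make the idea of in-flight tracking with completion and failure hooks concrete, the following standard-library sketch counts exchanges while they run and picks a hook URI from the outcome. HookConfig, InFlight, and run_exchange are hypothetical names chosen for illustration. How ExchangeUoWLayer actually does this is not shown in this README.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical mirror of the two hook fields shown above.
struct HookConfig {
    on_complete: Option<String>,
    on_failure: Option<String>,
}

#[derive(Default)]
struct InFlight {
    count: AtomicUsize,
}

impl InFlight {
    fn current(&self) -> usize {
        self.count.load(Ordering::SeqCst)
    }
}

// Runs one exchange through `pipeline`, tracking it as in-flight for the
// duration, and returns the hook URI that should receive the exchange next.
// Note: this sketch does not decrement the counter if `pipeline` panics.
fn run_exchange<F>(in_flight: &InFlight, hooks: &HookConfig, pipeline: F) -> Option<String>
where
    F: FnOnce() -> Result<(), String>,
{
    in_flight.count.fetch_add(1, Ordering::SeqCst);
    let outcome = pipeline();
    in_flight.count.fetch_sub(1, Ordering::SeqCst);
    match outcome {
        Ok(()) => hooks.on_complete.clone(),
        Err(_) => hooks.on_failure.clone(),
    }
}
```

A graceful shutdown or reload could wait until current() reaches zero before tearing a route down.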

Optional Durability

Enable a redb-backed event journal for runtime state recovery across restarts:

use std::sync::Arc;
use camel_core::{
    CamelContext, InMemoryRuntimeStore, JournalDurability, RedbJournalOptions,
    RedbRuntimeEventJournal,
};

// Default options (Immediate durability, compaction at 10_000 events)
let ctx = CamelContext::builder()
    .runtime_store(InMemoryRuntimeStore::default().with_journal(Arc::new(
        RedbRuntimeEventJournal::new(".camel/runtime.redb", RedbJournalOptions::default())
            .await?,
    )))
    .build()
    .await?;

// Eventual durability (dev/test — no fsync)
let ctx = CamelContext::builder()
    .runtime_store(InMemoryRuntimeStore::default().with_journal(Arc::new(
        RedbRuntimeEventJournal::new(
            ".camel/runtime.redb",
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 1_000,
            },
        )
        .await?,
    )))
    .build()
    .await?;

// With supervision and metrics
let ctx = CamelContext::with_supervision_and_metrics_and_redb_journal(
    supervision_config,
    metrics,
    ".camel/runtime.redb",
    RedbJournalOptions::default(),
).await?;

Events are persisted and replayed on startup for crash recovery.
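
Replay on startup amounts to folding the journal back into current state. The sketch below models that with plain standard-library types. The Event and Status enums are simplified stand-ins, and the compaction strategy (keep only the latest event per route) is an assumption made for illustration, not a description of how RedbRuntimeEventJournal compacts.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq)]
enum Event {
    RouteStarted(String),
    RouteStopped(String),
    RouteFailed(String),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Started,
    Stopped,
    Failed,
}

fn route_id(event: &Event) -> &str {
    match event {
        Event::RouteStarted(id) | Event::RouteStopped(id) | Event::RouteFailed(id) => id.as_str(),
    }
}

// Rebuild per-route status by applying events in journal order.
fn replay(journal: &[Event]) -> HashMap<String, Status> {
    let mut state = HashMap::new();
    for event in journal {
        let status = match event {
            Event::RouteStarted(_) => Status::Started,
            Event::RouteStopped(_) => Status::Stopped,
            Event::RouteFailed(_) => Status::Failed,
        };
        state.insert(route_id(event).to_string(), status);
    }
    state
}

// Assumed compaction: drop every event that a later event for the same
// route supersedes, keeping the survivors in their original order.
fn compact(journal: &[Event]) -> Vec<Event> {
    let mut last_index: HashMap<&str, usize> = HashMap::new();
    for (i, event) in journal.iter().enumerate() {
        last_index.insert(route_id(event), i);
    }
    journal
        .iter()
        .enumerate()
        .filter(|(i, event)| last_index.get(route_id(event)) == Some(i))
        .map(|(_, event)| event.clone())
        .collect()
}
```

Under this assumption, replaying the compacted journal yields the same state as replaying the full one.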

Core Types

Domain Layer

Type                    Description
RouteRuntimeAggregate   DDD aggregate with lifecycle state and version
RouteRuntimeState       Enum: Registered, Starting, Started, Suspended, Stopping, Stopped, Failed
RouteLifecycleCommand   Domain commands: Start, Stop, Suspend, Resume, Reload, Fail
RuntimeEvent            Domain events: RouteStarted, RouteStopped, RouteFailed, etc.
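
An aggregate that pairs a state machine with a version number for optimistic locking might look roughly like the sketch below. It is a simplified, hypothetical model: it omits the Starting and Stopping states and the Reload command, and the transition rules are assumptions rather than the rules RouteRuntimeAggregate enforces.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Registered,
    Started,
    Suspended,
    Stopped,
    Failed,
}

#[derive(Debug, Clone, Copy)]
enum Command {
    Start,
    Stop,
    Suspend,
    Resume,
    Fail,
}

#[derive(Debug, PartialEq, Eq)]
enum ApplyError {
    VersionConflict { expected: u64, actual: u64 },
    InvalidTransition,
}

struct Aggregate {
    state: State,
    version: u64,
}

impl Aggregate {
    fn new() -> Self {
        Aggregate { state: State::Registered, version: 0 }
    }

    // Optimistic locking: the caller states which version it last saw.
    // A stale version is rejected before any transition is attempted.
    fn apply(&mut self, cmd: Command, expected_version: u64) -> Result<State, ApplyError> {
        if expected_version != self.version {
            return Err(ApplyError::VersionConflict {
                expected: expected_version,
                actual: self.version,
            });
        }
        let next = match (self.state, cmd) {
            (State::Registered | State::Stopped, Command::Start) => State::Started,
            (State::Started, Command::Suspend) => State::Suspended,
            (State::Suspended, Command::Resume) => State::Started,
            (State::Started | State::Suspended, Command::Stop) => State::Stopped,
            (_, Command::Fail) => State::Failed,
            _ => return Err(ApplyError::InvalidTransition),
        };
        self.state = next;
        self.version += 1; // every accepted command bumps the version
        Ok(next)
    }
}
```

Two writers that both read version 1 cannot both succeed: the second apply call sees version 2 and gets a VersionConflict.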

Ports Layer

Port                      Purpose
RouteRepositoryPort       Load/save aggregates
ProjectionStorePort       Read/write route status projections
RuntimeExecutionPort      Execute side effects on route controller
EventPublisherPort        Publish domain events
RuntimeEventJournalPort   Durable event persistence
CommandDedupPort          Idempotent command handling
RuntimeUnitOfWorkPort     Atomic aggregate + projection + event persistence

Adapters Layer

Adapter                   Implements
InMemoryRouteRepository   RouteRepositoryPort
InMemoryProjectionStore   ProjectionStorePort
InMemoryRuntimeStore      Combined in-memory implementation
RedbRuntimeEventJournal   RuntimeEventJournalPort (redb v2)
RuntimeExecutionAdapter   RuntimeExecutionPort
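
The port and adapter relationship in the tables above follows the usual trait-plus-implementation shape in Rust. The sketch below shows that shape with a hypothetical StatusStore port and an in-memory adapter. The real port traits are likely async and have different signatures, which this README does not list.

```rust
use std::collections::HashMap;

// Port: what the application layer needs, expressed as a trait.
trait StatusStore {
    fn save(&mut self, route_id: &str, status: &str);
    fn load(&self, route_id: &str) -> Option<String>;
}

// Adapter: one concrete, in-memory implementation of the port.
#[derive(Default)]
struct InMemoryStatusStore {
    rows: HashMap<String, String>,
}

impl StatusStore for InMemoryStatusStore {
    fn save(&mut self, route_id: &str, status: &str) {
        self.rows.insert(route_id.to_string(), status.to_string());
    }

    fn load(&self, route_id: &str) -> Option<String> {
        self.rows.get(route_id).cloned()
    }
}

// Application code depends only on the port, so a durable adapter could be
// swapped in without touching this function.
fn mark_started(store: &mut dyn StatusStore, route_id: &str) {
    store.save(route_id, "Started");
}
```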

Tracer EIP

Automatic message-flow tracing across all route steps:

use camel_core::{CamelContext, TracerConfig, DetailLevel, TracerOutputs, StdoutOutput, OutputFormat};

// Simple toggle
// Inside an async function
let mut ctx = CamelContext::builder().build().await?;
ctx.set_tracing(true);

// Full configuration
let config = TracerConfig {
    enabled: true,
    detail_level: DetailLevel::Medium,
    outputs: TracerOutputs {
        stdout: Some(StdoutOutput { enabled: true, format: OutputFormat::Json }),
        file: None,
    },
};
ctx.set_tracer_config(config);

Or via Camel.toml:

[observability.tracer]
enabled = true
detail_level = "minimal"  # minimal | medium | full

[observability.tracer.outputs.stdout]
enabled = true
format = "json"

Detail levels:

Level     Fields
minimal   correlation_id, route_id, step_id, step_index, timestamp, duration_ms, status
medium    + headers_count, body_type, has_error, output_body_type
full      + up to 3 message headers (header_0 to header_2)
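
The levels are cumulative: each one adds fields to the level before it. A small standalone sketch of that rule follows. The Level enum and fields_for function are hypothetical, and for simplicity the full level always lists all three header slots even though the tracer emits up to three.

```rust
// Declaration order gives Minimal < Medium < Full via the derived ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Level {
    Minimal,
    Medium,
    Full,
}

fn fields_for(level: Level) -> Vec<&'static str> {
    // Fields present at every level.
    let mut fields = vec![
        "correlation_id",
        "route_id",
        "step_id",
        "step_index",
        "timestamp",
        "duration_ms",
        "status",
    ];
    if level >= Level::Medium {
        fields.extend_from_slice(&["headers_count", "body_type", "has_error", "output_body_type"]);
    }
    if level >= Level::Full {
        fields.extend_from_slice(&["header_0", "header_1", "header_2"]);
    }
    fields
}
```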

Health Monitoring

use camel_api::{HealthStatus, ServiceStatus};

let report = ctx.health_check();

match report.status {
    HealthStatus::Healthy => println!("All services healthy"),
    HealthStatus::Unhealthy => {
        for service in &report.services {
            println!("{}: {:?}", service.name, service.status);
        }
    }
}

Metrics

Plug in a custom MetricsCollector at construction time:

use camel_core::CamelContext;
use std::sync::Arc;

let ctx = CamelContext::with_metrics(Arc::new(my_metrics_collector));

// Access the collector later
let metrics = ctx.metrics();

Hot-Reload System

Live route updates without service restart:

use camel_core::reload_watcher::{watch_and_reload, resolve_watch_dirs};
use std::time::Duration;

// Assumes `ctx`, `cancel_token`, and `CamelError` are already in scope.
let handle = ctx.runtime_execution_handle();
let patterns = vec!["routes/*.yaml".to_string()];
let watch_dirs = resolve_watch_dirs(&patterns);

watch_and_reload(
    watch_dirs,
    handle,
    || camel_dsl::discover_routes(&patterns)
        .map_err(|e| CamelError::RouteError(e.to_string())),
    Some(cancel_token),
    Duration::from_secs(10),
    Duration::from_millis(300),
).await?;
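
Conceptually, a reload compares the routes currently running with the routes discovered on disk and derives one action per route. The sketch below models that diff with the standard library only. The Action enum and diff_routes function are hypothetical. Only the Skip case for unchanged routes is named in this README, and the other variant names are assumptions. Route definitions are represented as plain strings to keep the example short.

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq, Eq)]
enum Action {
    Add(String),
    Restart(String),
    Remove(String),
    Skip(String),
}

// `current` and `desired` map route id to a serialized route definition.
fn diff_routes(
    current: &BTreeMap<String, String>,
    desired: &BTreeMap<String, String>,
) -> Vec<Action> {
    let mut actions = Vec::new();
    for (id, new_def) in desired {
        match current.get(id) {
            None => actions.push(Action::Add(id.clone())),
            // Unchanged definition: leave the running route alone.
            Some(old_def) if old_def == new_def => actions.push(Action::Skip(id.clone())),
            Some(_) => actions.push(Action::Restart(id.clone())),
        }
    }
    // Routes that disappeared from the desired set are removed last.
    for id in current.keys() {
        if !desired.contains_key(id) {
            actions.push(Action::Remove(id.clone()));
        }
    }
    actions
}
```

Skipping unchanged routes means that editing one file does not interrupt exchanges on unrelated routes.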

Supervision

SupervisingRouteController wraps any controller with automatic crash recovery:

use camel_core::SupervisingRouteController;
use camel_api::supervision::SupervisionConfig;

let config = SupervisionConfig {
    initial_delay_ms: 1000,
    backoff_multiplier: 2.0,
    max_delay_ms: 60000,
    max_attempts: 5,
};

let ctx = CamelContext::with_supervision(config);
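
The four fields above describe a standard capped exponential backoff. As a sketch of the arithmetic, assuming the delay before restart attempt n is initial_delay_ms * backoff_multiplier^(n-1), capped at max_delay_ms, and that no further attempts are made after max_attempts: the BackoffConfig struct and delay_for_attempt function below are hypothetical, and the field types are guesses.

```rust
// Hypothetical mirror of the supervision fields; types are assumptions.
struct BackoffConfig {
    initial_delay_ms: u64,
    backoff_multiplier: f64,
    max_delay_ms: u64,
    max_attempts: u32,
}

// Delay before the given restart attempt (1-based), or None once the
// attempt budget is exhausted.
fn delay_for_attempt(cfg: &BackoffConfig, attempt: u32) -> Option<u64> {
    if attempt == 0 || attempt > cfg.max_attempts {
        return None;
    }
    let factor = cfg.backoff_multiplier.powi((attempt - 1) as i32);
    let raw = cfg.initial_delay_ms as f64 * factor;
    Some(raw.min(cfg.max_delay_ms as f64) as u64)
}
```

With the values shown above (1000 ms initial delay, multiplier 2.0, five attempts), this formula gives delays of 1000, 2000, 4000, 8000, and 16000 ms, so the 60000 ms cap only matters when more attempts are allowed.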

Architecture Tests

The crate includes hexagonal architecture boundary tests to ensure clean separation:

cargo test -p camel-core --test hexagonal_architecture_boundaries_test

Tests verify:

  • Domain layer has no infrastructure dependencies
  • Application layer depends only on ports and domain
  • Ports layer has no adapter dependencies
  • Bounded contexts do not bypass each other's layers
  • shared/ cross-cutting types are only accessed via canonical paths
  • Runtime side effects flow through RuntimeExecutionPort

License

Apache-2.0