camel-core 0.18.0

Core engine for rust-camel

camel-core

Core routing engine with DDD/CQRS/Hexagonal architecture for rust-camel

Overview

camel-core is the heart of the rust-camel framework. It combines Domain-Driven Design, CQRS, and Hexagonal Architecture, with the code organized as vertical bounded contexts.

Bounded Contexts

crates/camel-core/src/
  lifecycle/          ← Route lifecycle management
    domain/           │  RouteRuntimeAggregate, RouteRuntimeState, RuntimeEvent
    application/      │  RuntimeBus, Command/Query handlers, RouteDefinition
    ports/            │  RouteRepositoryPort, ProjectionStorePort, …
    adapters/         │  InMemory*, RedbRuntimeEventJournal, DefaultRouteController
  hot_reload/         ← Live route updates
    domain/           │  ReloadAction
    application/      │  compute_reload_actions, execute_reload_actions
    adapters/         │  ReloadWatcher
  shared/             ← Cross-cutting concerns
    observability/    │  TracerConfig, TracingProcessor (OTel adapter)
    components/       │  Registry (component lookup by URI scheme)
  context.rs          ← CamelContext (composition root)

Features

  • CamelContext: Central context for managing routes and components
  • CamelContextBuilder: Fluent builder with beans() method for WASM bean plugin registration
  • DDD Aggregate: RouteRuntimeAggregate with state machine and optimistic locking
  • CQRS Runtime Bus: Separate command/query paths with projection-backed reads
  • Event Sourcing: Optional durable journal (redb v2) for crash recovery and replay-consistent lifecycle state
  • Two-phase lifecycle: A start command first projects the Starting state; the runtime side effects then either confirm Started or compensate to Failed
  • Hexagonal Architecture: Clean separation via ports and adapters
  • Hot-reload: Live route updates with zero downtime, graceful in-flight exchange draining, and Skip optimization for unchanged routes
  • Supervision: Auto-recovery with configurable exponential backoff
  • Exchange UoW layer: ExchangeUoWLayer for per-route in-flight tracking and completion/failure hooks
  • Tracer EIP: Automatic message-flow tracing with configurable detail levels
  • Metrics: Pluggable MetricsCollector integration
  • Datasource Catalog: Named database connection management with shared pools, factory-based health checks, and centralized configuration
  • Optional languages: lang-js and lang-rhai feature flags

Installation

Add to your Cargo.toml:

[dependencies]
camel-core = "0.18"

Optional Features

Feature    Description
lang-js    JavaScript scripting via camel-language-js
lang-rhai  Rhai scripting via camel-language-rhai

Enable a feature in Cargo.toml:

[dependencies]
camel-core = { version = "0.18", features = ["lang-rhai"] }

Usage

Creating a Camel Context

use camel_core::CamelContext;
use camel_builder::{RouteBuilder, StepAccumulator};
use camel_component_timer::TimerComponent;
use camel_component_log::LogComponent;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut ctx = CamelContext::builder().build().await?;

    // Register components
    ctx.register_component(TimerComponent::new());
    ctx.register_component(LogComponent::new());

    // Build and add routes
    let route = RouteBuilder::from("timer:hello?period=1000")
        .route_id("hello-route")
        .to("log:info")
        .build()?;

    ctx.add_route_definition(route).await?;

    // Start all routes
    ctx.start().await?;

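    // Keep running until Ctrl-C (needs tokio's "signal" feature);
    // otherwise the timer route is stopped before it can fire
    tokio::signal::ctrl_c().await?;

    // Graceful shutdown
    ctx.stop().await?;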

    Ok(())
}

Component Infrastructure

CamelContext implements both ComponentContext and ComponentRegistrar from camel-component-api.

ComponentContext is passed to create_endpoint, giving components read-only access to:

  • Component registry (resolve components by scheme)
  • Language registry (resolve expression languages)
  • Metrics collector
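
Resolving a component by scheme can be pictured as a map lookup keyed on the text before the first colon of an endpoint URI. The sketch below is a standalone illustration using only the standard library; `SchemeRegistry`, `DemoComponent`, and `scheme_of` are hypothetical names and not the camel-core API.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a component; the real trait lives in camel-component-api.
#[derive(Debug, Clone, PartialEq)]
struct DemoComponent {
    scheme: String,
}

/// Returns the scheme part of an endpoint URI such as "timer:hello?period=1000".
fn scheme_of(uri: &str) -> Option<&str> {
    let (scheme, _rest) = uri.split_once(':')?;
    if scheme.is_empty() {
        None
    } else {
        Some(scheme)
    }
}

/// Hypothetical registry that maps a URI scheme to the component handling it.
#[derive(Default)]
struct SchemeRegistry {
    by_scheme: HashMap<String, DemoComponent>,
}

impl SchemeRegistry {
    /// Registers a component under its own scheme, replacing any previous entry.
    fn register(&mut self, component: DemoComponent) {
        self.by_scheme.insert(component.scheme.clone(), component);
    }

    /// Looks up the component responsible for the given endpoint URI.
    fn resolve(&self, uri: &str) -> Option<&DemoComponent> {
        self.by_scheme.get(scheme_of(uri)?)
    }
}
```

With a `timer` component registered, `resolve("timer:hello?period=1000")` finds it, while `resolve("log:info")` returns `None` until a `log` component is added.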

ComponentRegistrar enables trait-based dynamic registration:

use camel_component_api::ComponentRegistrar;
use std::sync::Arc;

ctx.register_component_dyn(Arc::new(MyComponent::new()));
ctx.register_component(MyComponent::new()); // convenience wrapper

ComponentBundle groups related schemes and registers them from TOML config:

use camel_component_api::ComponentBundle;

if let Some(raw) = config.components.raw.get("http").cloned() {
    camel_component_http::HttpBundle::from_toml(raw)?.register_all(&mut ctx);
}

Runtime Bus (CQRS)

Control routes via the runtime bus:

use camel_api::RuntimeCommand;

// Start a route
ctx.runtime().execute(RuntimeCommand::StartRoute {
    route_id: "my-route".into(),
    command_id: "cmd-start-1".into(),
    causation_id: None,
}).await?;

// Query route status (reads from projection)
let status = ctx.runtime_route_status("my-route").await?;
println!("Status: {:?}", status);

Exchange Unit of Work (UoW)

Attach per-route exchange lifecycle hooks and in-flight tracking via UnitOfWorkConfig:

use camel_api::UnitOfWorkConfig;
use camel_builder::{RouteBuilder, StepAccumulator};

let route = RouteBuilder::from("direct:orders")
    .route_id("orders-uow")
    .to("log:orders")
    .build()?
    .with_unit_of_work(UnitOfWorkConfig {
        on_complete: Some("direct:on-complete".to_string()),
        on_failure: Some("direct:on-failure".to_string()),
    });

ctx.add_route_definition(route).await?;
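
To make the idea of in-flight tracking with completion and failure hooks concrete, here is a minimal standalone sketch. It assumes a counter that is incremented while an exchange is being processed and a hook URI chosen from the outcome; `InFlight`, `InFlightGuard`, and `process` are hypothetical names, and the actual ExchangeUoWLayer may work differently.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical per-route counter of exchanges currently being processed.
#[derive(Default)]
struct InFlight {
    count: AtomicUsize,
}

/// Guard that marks one exchange as in flight until it is dropped.
struct InFlightGuard {
    tracker: Arc<InFlight>,
}

impl InFlightGuard {
    fn enter(tracker: &Arc<InFlight>) -> Self {
        tracker.count.fetch_add(1, Ordering::SeqCst);
        Self { tracker: Arc::clone(tracker) }
    }
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // Runs on both success and failure, so the count cannot leak.
        self.tracker.count.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Runs `step` for one exchange and returns the hook URI that should
/// receive the exchange afterwards, if one is configured.
fn process<'a>(
    tracker: &Arc<InFlight>,
    on_complete: Option<&'a str>,
    on_failure: Option<&'a str>,
    step: impl FnOnce() -> Result<(), String>,
) -> Option<&'a str> {
    let _guard = InFlightGuard::enter(tracker);
    match step() {
        Ok(()) => on_complete,
        Err(_) => on_failure,
    }
}
```

A drain during shutdown or hot-reload could then wait for `count` to reach zero before stopping the route.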

Optional Durability

Enable the redb-backed event journal to recover runtime state across restarts:

use std::sync::Arc;
use camel_core::{
    CamelContext, InMemoryRuntimeStore, JournalDurability, RedbJournalOptions,
    RedbRuntimeEventJournal,
};

// Default options (Immediate durability, compaction at 10_000 events)
let ctx = CamelContext::builder()
    .runtime_store(InMemoryRuntimeStore::default().with_journal(Arc::new(
        RedbRuntimeEventJournal::new(".camel/runtime.redb", RedbJournalOptions::default())
            .await?,
    )))
    .build()
    .await?;

// Eventual durability (dev/test — no fsync)
let ctx = CamelContext::builder()
    .runtime_store(InMemoryRuntimeStore::default().with_journal(Arc::new(
        RedbRuntimeEventJournal::new(
            ".camel/runtime.redb",
            RedbJournalOptions {
                durability: JournalDurability::Eventual,
                compaction_threshold_events: 1_000,
            },
        )
        .await?,
    )))
    .build()
    .await?;

// With supervision and metrics
let ctx = CamelContext::with_supervision_and_metrics_and_redb_journal(
    supervision_config,
    metrics,
    ".camel/runtime.redb",
    RedbJournalOptions::default(),
).await?;

Events are persisted and replayed on startup for crash recovery. Lifecycle writes use optimistic versions; commands that perform runtime side effects persist intent first, then confirm success or compensate to Failed so replay and live state converge.
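
The intent-then-confirm flow can be sketched with an in-memory journal. This is a simplified model under stated assumptions: the `State` and `Event` enums, `start_route`, and `replay` are hypothetical and cover only the start path, while the real journal is backed by redb.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Registered,
    Starting,
    Started,
    Failed,
}

#[derive(Debug, Clone, PartialEq)]
enum Event {
    StartRequested,
    StartConfirmed,
    StartFailed(String),
}

/// Pure transition function shared by live command handling and journal replay.
fn apply(state: State, event: &Event) -> State {
    match (state, event) {
        (State::Registered, Event::StartRequested) => State::Starting,
        (State::Starting, Event::StartConfirmed) => State::Started,
        (State::Starting, Event::StartFailed(_)) => State::Failed,
        // Events that do not fit the current state are ignored in this sketch.
        (other, _) => other,
    }
}

/// Two-phase start: journal the intent, run the side effect, journal the outcome.
fn start_route(
    journal: &mut Vec<Event>,
    side_effect: impl FnOnce() -> Result<(), String>,
) -> State {
    let mut state = State::Registered;
    // Phase 1: persist the intent before touching the runtime.
    journal.push(Event::StartRequested);
    state = apply(state, &Event::StartRequested);
    // Phase 2: confirm success or compensate to Failed.
    let outcome = match side_effect() {
        Ok(()) => Event::StartConfirmed,
        Err(reason) => Event::StartFailed(reason),
    };
    state = apply(state, &outcome);
    journal.push(outcome);
    state
}

/// Rebuilds the state from the journal, as a restart would.
fn replay(journal: &[Event]) -> State {
    journal.iter().fold(State::Registered, apply)
}
```

Because live handling and replay share `apply`, replaying the journal yields the same state the live path reached. A journal cut off after the intent replays to `Starting`, which signals an unfinished start.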

Core Types

Domain Layer

Type                   Description
RouteRuntimeAggregate  DDD aggregate with lifecycle state and version
RouteRuntimeState      Enum: Registered, Starting, Started, Suspended, Stopping, Stopped, Failed
RouteLifecycleCommand  Domain commands: Start, Stop, Suspend, Resume, Reload, Fail
RuntimeEvent           Domain events: RouteStarted, RouteStopped, RouteFailed, etc.
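
As an illustration of an aggregate that pairs a state machine with optimistic locking, the following sketch rejects commands that carry a stale version. It is deliberately reduced: the transition table is an assumption, the intermediate Starting and Stopping states and the Reload command are left out, and `RouteAggregate` and `DomainError` are hypothetical names rather than the crate's types.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Registered,
    Started,
    Suspended,
    Stopped,
    Failed,
}

#[derive(Debug, Clone, Copy)]
enum Command {
    Start,
    Stop,
    Suspend,
    Resume,
    Fail,
}

#[derive(Debug, PartialEq)]
enum DomainError {
    VersionConflict { expected: u64, actual: u64 },
    InvalidTransition,
}

#[derive(Debug)]
struct RouteAggregate {
    state: State,
    version: u64,
}

impl RouteAggregate {
    fn new() -> Self {
        Self { state: State::Registered, version: 0 }
    }

    /// Applies `cmd` only if the caller saw the latest version of the aggregate.
    fn handle(&mut self, expected_version: u64, cmd: Command) -> Result<State, DomainError> {
        if expected_version != self.version {
            return Err(DomainError::VersionConflict {
                expected: expected_version,
                actual: self.version,
            });
        }
        // Assumed transition table, for illustration only.
        let next = match (self.state, cmd) {
            (State::Registered | State::Stopped, Command::Start) => State::Started,
            (State::Started, Command::Suspend) => State::Suspended,
            (State::Suspended, Command::Resume) => State::Started,
            (State::Started | State::Suspended, Command::Stop) => State::Stopped,
            (_, Command::Fail) => State::Failed,
            _ => return Err(DomainError::InvalidTransition),
        };
        self.state = next;
        self.version += 1; // every accepted command bumps the version
        Ok(next)
    }
}
```

A writer that loaded version 0 and loses the race to another writer gets `VersionConflict` and has to reload the aggregate before retrying.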

Ports Layer

Port                     Purpose
RouteRepositoryPort      Load/save aggregates
ProjectionStorePort      Read/write route status projections
RuntimeExecutionPort     Execute side effects on route controller
EventPublisherPort       Publish domain events
RuntimeEventJournalPort  Durable event persistence
CommandDedupPort         Idempotent command handling
RuntimeUnitOfWorkPort    Atomic aggregate + projection + event persistence
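
Idempotent command handling is commonly built on remembering command ids. The sketch below shows that idea in isolation. It assumes deduplication keyed on the `command_id` string; `DedupHandler` and `Outcome` are hypothetical names, not the CommandDedupPort interface.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum Outcome {
    Applied,
    Duplicate,
}

/// Hypothetical handler that applies each command id at most once.
#[derive(Default)]
struct DedupHandler {
    seen: HashSet<String>,
    applied: Vec<String>,
}

impl DedupHandler {
    /// Records the command id and applies the command unless it was seen before.
    fn execute(&mut self, command_id: &str, route_id: &str) -> Outcome {
        // `insert` returns false when the id was already present.
        if !self.seen.insert(command_id.to_string()) {
            return Outcome::Duplicate;
        }
        self.applied.push(route_id.to_string());
        Outcome::Applied
    }
}
```

Sending `cmd-start-1` twice applies the start once and reports the second delivery as a duplicate, so retries by the caller are safe.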

Adapters Layer

Adapter                  Implements
InMemoryRouteRepository  RouteRepositoryPort
InMemoryProjectionStore  ProjectionStorePort
InMemoryRuntimeStore     Combined in-memory implementation
RedbRuntimeEventJournal  RuntimeEventJournalPort (redb v2)
RuntimeExecutionAdapter  RuntimeExecutionPort

Tracer EIP

Automatic message-flow tracing across all route steps:

use camel_core::{CamelContext, TracerConfig, DetailLevel, TracerOutputs, StdoutOutput, OutputFormat};

// Simple toggle
// Inside an async function
let mut ctx = CamelContext::builder().build().await?;
ctx.set_tracing(true);

// Full configuration
let config = TracerConfig {
    enabled: true,
    detail_level: DetailLevel::Medium,
    outputs: TracerOutputs {
        stdout: Some(StdoutOutput { enabled: true, format: OutputFormat::Json }),
        file: None,
    },
};
ctx.set_tracer_config(config);

Or via Camel.toml:

[observability.tracer]
enabled = true
detail_level = "minimal"  # minimal | medium | full

[observability.tracer.outputs.stdout]
enabled = true
format = "json"

Detail levels:

Level    Fields
minimal  correlation_id, route_id, step_id, step_index, timestamp, duration_ms, status
medium   + headers_count, body_type, has_error, output_body_type
full     + up to 3 message headers (header_0 to header_2)
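
The levels are cumulative: each one adds fields to the level before it. A small standalone sketch of that rule follows. The field names are the ones listed above, while `Level` and `fields_for` are hypothetical helpers and not part of the tracer API.

```rust
/// Hypothetical mirror of the three detail levels, ordered from least to most detail.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
enum Level {
    Minimal,
    Medium,
    Full,
}

/// Lists the field names emitted for a step at `level`, given how many
/// headers the message carries.
fn fields_for(level: Level, header_count: usize) -> Vec<String> {
    let mut fields: Vec<String> = [
        "correlation_id", "route_id", "step_id", "step_index",
        "timestamp", "duration_ms", "status",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect();

    if level >= Level::Medium {
        fields.extend(
            ["headers_count", "body_type", "has_error", "output_body_type"]
                .iter()
                .map(|s| s.to_string()),
        );
    }
    if level == Level::Full {
        // At most three headers are included: header_0, header_1, header_2.
        fields.extend((0..header_count.min(3)).map(|i| format!("header_{}", i)));
    }
    fields
}
```

Under these assumptions a message with five headers produces 7 fields at minimal, 11 at medium, and 14 at full.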

Health Monitoring

use camel_api::{HealthStatus, ServiceStatus};

let report = ctx.health_check_async().await;

match report.status {
    HealthStatus::Healthy => println!("All services healthy"),
    HealthStatus::Unhealthy => {
        for service in &report.services {
            println!("{}: {:?}", service.name, service.status);
        }
    }
}

Metrics

Plug in a custom MetricsCollector at construction time:

use camel_core::CamelContext;
use std::sync::Arc;

let ctx = CamelContext::with_metrics(Arc::new(my_metrics_collector));

// Access the collector later
let metrics = ctx.metrics();

Hot-Reload System

Live route updates without service restart:

use camel_core::reload_watcher::{watch_and_reload, resolve_watch_dirs};
use std::time::Duration;

// Assumes `ctx`, `CamelError`, and `cancel_token` are already in scope
let handle = ctx.runtime_execution_handle();
let patterns = vec!["routes/*.yaml".to_string()];
let watch_dirs = resolve_watch_dirs(&patterns);

watch_and_reload(
    watch_dirs,
    handle,
    || camel_dsl::discover_routes(&patterns)
        .map_err(|e| CamelError::RouteError(e.to_string())),
    Some(cancel_token),
    Duration::from_secs(10),
    Duration::from_millis(300),
).await?;
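
The Skip optimization amounts to diffing the running routes against the freshly discovered ones and leaving unchanged routes alone. A minimal sketch of such a diff is shown below. The `Action` variants and `compute_actions` are hypothetical and only approximate what `ReloadAction` and `compute_reload_actions` might do; route definitions are compared here as plain strings.

```rust
use std::collections::BTreeMap;

/// Hypothetical reload decisions for a single route id.
#[derive(Debug, Clone, PartialEq)]
enum Action {
    Add(String),
    Restart(String),
    Remove(String),
    Skip(String),
}

/// Compares running routes with discovered ones. Both maps go from
/// route id to a definition fingerprint (here simply the raw text).
fn compute_actions(
    running: &BTreeMap<String, String>,
    discovered: &BTreeMap<String, String>,
) -> Vec<Action> {
    let mut actions = Vec::new();
    for (id, new_def) in discovered {
        match running.get(id) {
            None => actions.push(Action::Add(id.clone())),
            // Unchanged definition: leave the running route untouched.
            Some(old_def) if old_def == new_def => actions.push(Action::Skip(id.clone())),
            Some(_) => actions.push(Action::Restart(id.clone())),
        }
    }
    for id in running.keys() {
        if !discovered.contains_key(id) {
            actions.push(Action::Remove(id.clone()));
        }
    }
    actions
}
```

Only routes marked `Restart` or `Remove` would need their in-flight exchanges drained; skipped routes keep serving traffic throughout the reload.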

Supervision

SupervisingRouteController wraps any controller with automatic crash recovery:

use camel_core::SupervisingRouteController;
use camel_api::supervision::SupervisionConfig;

let config = SupervisionConfig {
    initial_delay_ms: 1000,
    backoff_multiplier: 2.0,
    max_delay_ms: 60000,
    max_attempts: 5,
};

let ctx = CamelContext::with_supervision(config);
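
To show how the four settings could interact, the sketch below computes the wait before each restart attempt. It assumes the common formula delay = initial × multiplier^(attempt − 1), capped at the maximum. `BackoffConfig` and `delay_for_attempt` are hypothetical, and the supervisor's actual schedule may differ.

```rust
/// Hypothetical copy of the supervision settings, using the same field names.
#[derive(Debug, Clone)]
struct BackoffConfig {
    initial_delay_ms: u64,
    backoff_multiplier: f64,
    max_delay_ms: u64,
    max_attempts: u32,
}

/// Delay in milliseconds before restart attempt `attempt` (1-based),
/// or `None` once the attempts are exhausted.
fn delay_for_attempt(cfg: &BackoffConfig, attempt: u32) -> Option<u64> {
    if attempt == 0 || attempt > cfg.max_attempts {
        return None;
    }
    let factor = cfg.backoff_multiplier.powi(attempt as i32 - 1);
    let raw = cfg.initial_delay_ms as f64 * factor;
    // Cap the exponential growth at the configured maximum.
    Some(raw.min(cfg.max_delay_ms as f64) as u64)
}
```

With the values from the configuration above, this formula gives waits of 1000, 2000, 4000, 8000, and 16000 ms, after which the route is no longer restarted.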

Architecture Tests

The crate includes hexagonal architecture boundary tests to ensure clean separation:

cargo test -p camel-core --test hexagonal_architecture_boundaries_test

Tests verify:

  • Domain layer has no infrastructure dependencies
  • Application layer depends only on ports and domain
  • Ports layer has no adapter dependencies
  • Bounded contexts do not bypass each other's layers
  • shared/ cross-cutting types are only accessed via canonical paths
  • Runtime side effects flow through RuntimeExecutionPort
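
One way such a boundary check can be written is to scan a layer's source text for imports from a forbidden layer. The helper below is a hypothetical sketch of that approach and is not taken from the crate's test suite, which may use a different technique.

```rust
/// Returns the `use` lines in `source` that mention any forbidden module path.
fn forbidden_imports<'a>(source: &'a str, forbidden: &[&str]) -> Vec<&'a str> {
    source
        .lines()
        .map(str::trim)
        .filter(|line| line.starts_with("use "))
        .filter(|line| forbidden.iter().any(|path| line.contains(*path)))
        .collect()
}
```

A test for the domain layer could read each file under `lifecycle/domain/` and assert that `forbidden_imports(&text, &["::adapters::"])` is empty.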

License

Apache-2.0