camel-test 0.10.0

Testing utilities for rust-camel
Documentation
use std::time::Duration;

use camel_api::body::Body;
use camel_api::{Exchange, Message, Value};
use camel_core::LanguageRegistryError;
use camel_dsl::parse_yaml;
use camel_language_jsonpath::JsonPathLanguage;
use camel_test::CamelTestContext;
use tower::ServiceExt;

fn ensure_jsonpath_registered(ctx: &mut camel_core::CamelContext) {
    match ctx.register_language("jsonpath", Box::new(JsonPathLanguage)) {
        Ok(()) | Err(LanguageRegistryError::AlreadyRegistered { .. }) => {}
    }
}

async fn send_to_direct(h: &CamelTestContext, endpoint_uri: &str, exchange: Exchange) {
    let producer = {
        let ctx = h.ctx().lock().await;
        let producer_ctx = ctx.producer_context();
        let registry = ctx.registry();
        let component = registry
            .get("direct")
            .expect("direct component not registered");
        let endpoint = component
            .create_endpoint(endpoint_uri, &*ctx)
            .expect("failed to create direct endpoint");
        endpoint
            .create_producer(&producer_ctx)
            .expect("failed to create direct producer")
    };

    producer
        .oneshot(exchange)
        .await
        .expect("failed to send exchange to direct endpoint");
}

#[tokio::test]
async fn jsonpath_filter_with_json_body() {
    let h = CamelTestContext::builder()
        .with_direct()
        .with_mock()
        .build()
        .await;

    let mut guard = h.ctx().lock().await;
    ensure_jsonpath_registered(&mut guard);
    drop(guard);

    let yaml = r#"
routes:
  - id: "jsonpath-filter"
    from: "direct:start"
    steps:
      - filter:
          jsonpath: "$.active"
          steps:
            - to: "mock:filtered"
"#;

    let routes = parse_yaml(yaml).expect("YAML parse failed");
    for route in routes {
        h.add_route(route).await.unwrap();
    }
    h.start().await;
    tokio::time::sleep(Duration::from_millis(50)).await;

    let json: serde_json::Value =
        serde_json::from_str(r#"{"active": true, "name": "test"}"#).unwrap();
    let exchange = Exchange::new(Message::new(Body::Json(json)));
    send_to_direct(&h, "direct:start", exchange).await;

    tokio::time::sleep(Duration::from_millis(100)).await;
    h.stop().await;

    let endpoint = h.mock().get_endpoint("filtered").unwrap();
    endpoint.assert_exchange_count(1).await;

    let exchanges = endpoint.get_received_exchanges().await;
    assert_eq!(exchanges.len(), 1);
}

#[tokio::test]
async fn jsonpath_set_header_from_body() {
    let h = CamelTestContext::builder()
        .with_direct()
        .with_mock()
        .build()
        .await;

    let mut guard = h.ctx().lock().await;
    ensure_jsonpath_registered(&mut guard);
    drop(guard);

    let yaml = r#"
routes:
  - id: "jsonpath-header"
    from: "direct:start"
    steps:
      - set_header:
          key: "orderId"
          jsonpath: "$.order.id"
      - to: "mock:header-out"
"#;

    let routes = parse_yaml(yaml).expect("YAML parse failed");
    for route in routes {
        h.add_route(route).await.unwrap();
    }
    h.start().await;
    tokio::time::sleep(Duration::from_millis(50)).await;

    let json: serde_json::Value = serde_json::from_str(r#"{"order": {"id": "ORD-123"}}"#).unwrap();
    let exchange = Exchange::new(Message::new(Body::Json(json)));
    send_to_direct(&h, "direct:start", exchange).await;

    tokio::time::sleep(Duration::from_millis(100)).await;
    h.stop().await;

    let endpoint = h.mock().get_endpoint("header-out").unwrap();
    endpoint.assert_exchange_count(1).await;

    let exchanges = endpoint.get_received_exchanges().await;
    let ex = &exchanges[0];
    assert_eq!(
        ex.input.header("orderId"),
        Some(&Value::String("ORD-123".into())),
        "Header 'orderId' should be 'ORD-123'"
    );
}

#[tokio::test]
async fn jsonpath_set_body_extracts_field() {
    let h = CamelTestContext::builder()
        .with_direct()
        .with_mock()
        .build()
        .await;

    let mut guard = h.ctx().lock().await;
    ensure_jsonpath_registered(&mut guard);
    drop(guard);

    let yaml = r#"
routes:
  - id: "jsonpath-body"
    from: "direct:start"
    steps:
      - set_body:
          jsonpath: "$.items[0]"
      - to: "mock:body-out"
"#;

    let routes = parse_yaml(yaml).expect("YAML parse failed");
    for route in routes {
        h.add_route(route).await.unwrap();
    }
    h.start().await;
    tokio::time::sleep(Duration::from_millis(50)).await;

    let json: serde_json::Value =
        serde_json::from_str(r#"{"items": ["first", "second", "third"]}"#).unwrap();
    let exchange = Exchange::new(Message::new(Body::Json(json)));
    send_to_direct(&h, "direct:start", exchange).await;

    tokio::time::sleep(Duration::from_millis(100)).await;
    h.stop().await;

    let endpoint = h.mock().get_endpoint("body-out").unwrap();
    endpoint.assert_exchange_count(1).await;

    let exchanges = endpoint.get_received_exchanges().await;
    let ex = &exchanges[0];
    match &ex.input.body {
        Body::Json(v) => {
            assert_eq!(v, &serde_json::Value::String("first".to_string()));
        }
        Body::Text(t) => {
            assert_eq!(t, "first");
        }
        other => panic!("expected JSON or Text body with 'first', got {:?}", other),
    }
}