use camel_api::{Body, Exchange, Message};
use camel_bean::{BeanProcessor, BeanRegistry, bean_impl, handler};
use camel_core::{DefaultRouteController, Registry};
use camel_dsl::{
DeclarativeRoute, DeclarativeStep, ToStepDef, compile_declarative_route, model::BeanStepDef,
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex as StdMutex};
use tokio::sync::Mutex;
use tower::ServiceExt;
/// Incoming order payload. The tests serialize it to JSON and use it as the
/// exchange body consumed by the `OrderService` handlers.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Order {
/// Order identifier; the handlers copy it into the `order_id` of their result.
id: String,
/// Order amount; `process` multiplies it by 100 and `validate` checks it is > 0.
amount: u32,
}
/// Result of `OrderService::process`; `AuditService::log` accepts and returns
/// it unchanged.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct ProcessedOrder {
/// Identifier taken from the originating `Order::id`.
order_id: String,
/// Processing status; `OrderService::process` sets it to `"processed"`.
status: String,
/// Computed by `OrderService::process` as the order amount times 100.
total: u32,
}
/// Result of `OrderService::validate`.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct ValidatedOrder {
/// Identifier taken from the originating `Order::id`.
order_id: String,
/// `true` when the order amount is greater than zero.
is_valid: bool,
}
/// Test bean exposing the `process` and `validate` handlers used by the
/// declarative routes in this file.
struct OrderService;

#[bean_impl]
impl OrderService {
    // Converts an `Order` into a `ProcessedOrder` with status "processed" and
    // `total = amount * 100`.
    //
    // Returns `Err` with a descriptive message when the multiplication does
    // not fit in a `u32`, instead of panicking (debug builds) or silently
    // wrapping (release builds).
    #[handler]
    pub async fn process(&self, body: Order) -> Result<ProcessedOrder, String> {
        let total = body.amount.checked_mul(100).ok_or_else(|| {
            format!(
                "order {} total overflows u32 (amount = {})",
                body.id, body.amount
            )
        })?;
        Ok(ProcessedOrder {
            order_id: body.id,
            status: "processed".to_string(),
            total,
        })
    }

    // Reports whether the order is valid; an order is valid when its amount is
    // strictly positive. Never returns `Err`.
    #[handler]
    pub async fn validate(&self, body: Order) -> Result<ValidatedOrder, String> {
        Ok(ValidatedOrder {
            order_id: body.id,
            is_valid: body.amount > 0,
        })
    }

    // Deliberately NOT annotated with `#[handler]`:
    // `test_bean_handler_methods_only` asserts that it is absent from
    // `methods()`. Nothing calls it, hence the `dead_code` allowance.
    #[allow(dead_code)]
    pub fn helper(&self) -> String {
        "helper".to_string()
    }
}
struct AuditService;
#[bean_impl]
impl AuditService {
#[handler]
pub async fn log(&self, body: ProcessedOrder) -> Result<ProcessedOrder, String> {
println!(
"Audit: Order {} processed with status {}",
body.order_id, body.status
);
Ok(body)
}
}
/// Builds a [`DefaultRouteController`] for tests, backed by a fresh, empty
/// component [`Registry`] and the supplied bean registry.
fn create_test_controller(bean_registry: BeanRegistry) -> DefaultRouteController {
    create_test_controller_with_registry(
        Arc::new(StdMutex::new(Registry::new())),
        bean_registry,
    )
}
/// Builds a [`DefaultRouteController`] for tests from an existing component
/// registry and a bean registry.
///
/// The bean registry is wrapped in `Arc<StdMutex<_>>` and shared by two
/// controller instances: the one returned to the caller, and a second one that
/// is installed as the returned controller's self-reference.
fn create_test_controller_with_registry(
    registry: Arc<StdMutex<Registry>>,
    bean_registry: BeanRegistry,
) -> DefaultRouteController {
    let bean_registry = Arc::new(StdMutex::new(bean_registry));

    // Clone the handles for the returned controller so the originals can be
    // moved into the self-reference instance below.
    let mut controller =
        DefaultRouteController::with_beans(Arc::clone(&registry), Arc::clone(&bean_registry));

    // `set_self_ref` takes a type-erased controller behind an async mutex.
    let self_ref: Arc<Mutex<dyn camel_api::RouteController>> = Arc::new(Mutex::new(
        DefaultRouteController::with_beans(registry, bean_registry),
    ));
    controller.set_self_ref(self_ref);
    controller
}
/// Registered beans can be looked up by name; unregistered names yield `None`.
#[tokio::test]
async fn test_bean_registration_and_lookup() {
    let mut beans = BeanRegistry::new();
    beans.register("orderService", OrderService);
    beans.register("auditService", AuditService);

    // Both registered names resolve.
    for registered in ["orderService", "auditService"] {
        assert!(beans.get(registered).is_some());
    }

    // A name that was never registered does not.
    assert!(beans.get("unknownService").is_none());
}
/// A route with a single `orderService.process` bean step turns an `Order`
/// JSON body into the matching `ProcessedOrder`.
#[tokio::test]
async fn test_simple_bean_route() {
    let mut beans = BeanRegistry::new();
    beans.register("orderService", OrderService);
    let controller = create_test_controller(beans);

    let process_step = DeclarativeStep::Bean(BeanStepDef::new("orderService", "process"));
    let declarative = DeclarativeRoute {
        from: String::from("direct:start"),
        route_id: String::from("test-simple-bean-route"),
        auto_startup: true,
        startup_order: 1,
        concurrency: None,
        error_handler: None,
        circuit_breaker: None,
        unit_of_work: None,
        steps: vec![process_step],
    };
    let definition = compile_declarative_route(declarative).unwrap();
    let pipeline = controller.compile_route_definition(definition).unwrap();

    // Send an order with amount 5 through the compiled pipeline.
    let order = Order {
        id: String::from("ORDER-123"),
        amount: 5,
    };
    let payload = serde_json::to_value(&order).unwrap();
    let exchange = Exchange::new(Message {
        body: Body::Json(payload),
        ..Default::default()
    });
    let result = pipeline.oneshot(exchange).await.unwrap();

    let Body::Json(value) = &result.input.body else {
        panic!("Expected Json body");
    };
    let processed: ProcessedOrder = serde_json::from_value(value.clone()).unwrap();
    assert_eq!(processed.order_id, "ORDER-123");
    assert_eq!(processed.status, "processed");
    assert_eq!(processed.total, 500);
}
/// Chains two bean steps (`orderService.process`, then `auditService.log`) and
/// checks that the processed order comes out of the audit step unchanged.
#[tokio::test]
async fn test_multi_step_bean_route() {
    let mut beans = BeanRegistry::new();
    beans.register("orderService", OrderService);
    beans.register("auditService", AuditService);
    let controller = create_test_controller(beans);

    let declarative = DeclarativeRoute {
        from: String::from("direct:start"),
        route_id: String::from("test-multi-bean-route"),
        auto_startup: true,
        startup_order: 1,
        concurrency: None,
        error_handler: None,
        circuit_breaker: None,
        unit_of_work: None,
        steps: vec![
            DeclarativeStep::Bean(BeanStepDef::new("orderService", "process")),
            DeclarativeStep::Bean(BeanStepDef::new("auditService", "log")),
        ],
    };
    let definition = compile_declarative_route(declarative).unwrap();
    let pipeline = controller.compile_route_definition(definition).unwrap();

    // Send an order with amount 10 through both steps.
    let order = Order {
        id: String::from("ORDER-456"),
        amount: 10,
    };
    let payload = serde_json::to_value(&order).unwrap();
    let exchange = Exchange::new(Message {
        body: Body::Json(payload),
        ..Default::default()
    });
    let result = pipeline.oneshot(exchange).await.unwrap();

    let Body::Json(value) = &result.input.body else {
        panic!("Expected Json body");
    };
    let processed: ProcessedOrder = serde_json::from_value(value.clone()).unwrap();
    assert_eq!(processed.order_id, "ORDER-456");
    assert_eq!(processed.status, "processed");
    assert_eq!(processed.total, 1000);
}
#[tokio::test]
async fn test_bean_with_validation() {
let mut bean_registry = BeanRegistry::new();
bean_registry.register("orderService", OrderService);
let controller = create_test_controller(bean_registry);
let route = DeclarativeRoute {
from: "direct:start".to_string(),
route_id: "test-validation-route".to_string(),
auto_startup: true,
startup_order: 1,
concurrency: None,
error_handler: None,
circuit_breaker: None,
unit_of_work: None,
steps: vec![DeclarativeStep::Bean(BeanStepDef::new(
"orderService",
"validate",
))],
};
let route_def = compile_declarative_route(route).unwrap();
let pipeline = controller.compile_route_definition(route_def).unwrap();
let valid_order = Order {
id: "ORDER-789".to_string(),
amount: 100,
};
let message = Message {
body: Body::Json(serde_json::to_value(&valid_order).unwrap()),
..Default::default()
};
let exchange = Exchange::new(message);
let result = pipeline.clone().oneshot(exchange).await.unwrap();
match &result.input.body {
Body::Json(value) => {
let validated: ValidatedOrder = serde_json::from_value(value.clone()).unwrap();
assert_eq!(validated.order_id, "ORDER-789");
assert!(validated.is_valid);
}
_ => panic!("Expected Json body"),
}
let invalid_order = Order {
id: "ORDER-000".to_string(),
amount: 0,
};
let message = Message {
body: Body::Json(serde_json::to_value(&invalid_order).unwrap()),
..Default::default()
};
let exchange = Exchange::new(message);
let result = pipeline.oneshot(exchange).await.unwrap();
match &result.input.body {
Body::Json(value) => {
let validated: ValidatedOrder = serde_json::from_value(value.clone()).unwrap();
assert_eq!(validated.order_id, "ORDER-000");
assert!(!validated.is_valid);
}
_ => panic!("Expected Json body"),
}
}
/// Mixes a bean step with a `to("mock:result")` step and checks that the bean
/// output is still the exchange body at the end of the route.
#[tokio::test]
async fn test_bean_route_with_other_steps() {
    use camel_component_mock::MockComponent;

    let mut beans = BeanRegistry::new();
    beans.register("orderService", OrderService);

    // The `mock:` endpoint needs its component registered before compilation.
    let components = Arc::new(StdMutex::new(Registry::new()));
    components.lock().unwrap().register(MockComponent::new());
    let controller = create_test_controller_with_registry(components, beans);

    let declarative = DeclarativeRoute {
        from: String::from("direct:start"),
        route_id: String::from("test-mixed-route"),
        auto_startup: true,
        startup_order: 1,
        concurrency: None,
        error_handler: None,
        circuit_breaker: None,
        unit_of_work: None,
        steps: vec![
            DeclarativeStep::Bean(BeanStepDef::new("orderService", "process")),
            DeclarativeStep::To(ToStepDef::new("mock:result")),
        ],
    };
    let definition = compile_declarative_route(declarative).unwrap();
    let pipeline = controller.compile_route_definition(definition).unwrap();

    let order = Order {
        id: String::from("ORDER-MIXED"),
        amount: 7,
    };
    let payload = serde_json::to_value(&order).unwrap();
    let exchange = Exchange::new(Message {
        body: Body::Json(payload),
        ..Default::default()
    });
    let result = pipeline.oneshot(exchange).await.unwrap();

    let Body::Json(value) = &result.input.body else {
        panic!("Expected Json body");
    };
    let processed: ProcessedOrder = serde_json::from_value(value.clone()).unwrap();
    assert_eq!(processed.order_id, "ORDER-MIXED");
    assert_eq!(processed.status, "processed");
    assert_eq!(processed.total, 700);
}
/// Compiling a route that references an unregistered bean fails with an error
/// whose message contains "Bean not found".
#[tokio::test]
async fn test_unknown_bean_error() {
    // Empty bean registry: nothing is registered under "unknownService".
    let controller = create_test_controller(BeanRegistry::new());

    let missing_bean_step = DeclarativeStep::Bean(BeanStepDef::new("unknownService", "process"));
    let declarative = DeclarativeRoute {
        from: String::from("direct:start"),
        route_id: String::from("test-unknown-bean"),
        auto_startup: true,
        startup_order: 1,
        concurrency: None,
        error_handler: None,
        circuit_breaker: None,
        unit_of_work: None,
        steps: vec![missing_bean_step],
    };
    let definition = compile_declarative_route(declarative).unwrap();

    let outcome = controller.compile_route_definition(definition);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Bean not found"));
}
/// `methods()` lists the `#[handler]` functions of `OrderService` and omits the
/// plain `helper` method.
#[tokio::test]
async fn test_bean_handler_methods_only() {
    let bean = OrderService;
    let exposed = bean.methods();

    // Annotated handlers are exposed.
    for expected in ["process", "validate"] {
        assert!(exposed.contains(&expected));
    }

    // The un-annotated method is not.
    assert!(!exposed.contains(&"helper"));
}
/// Builds a declarative route with one `orderService.process` step, runs an
/// order through it, and prints a short summary of the input and output.
#[tokio::test]
async fn test_yaml_to_bean_execution() {
    let mut beans = BeanRegistry::new();
    beans.register("orderService", OrderService);
    let controller = create_test_controller(beans);

    let process_step = DeclarativeStep::Bean(BeanStepDef::new("orderService", "process"));
    let declarative = DeclarativeRoute {
        from: String::from("direct:orders"),
        route_id: String::from("order-processing-route"),
        auto_startup: true,
        startup_order: 1,
        concurrency: None,
        error_handler: None,
        circuit_breaker: None,
        unit_of_work: None,
        steps: vec![process_step],
    };
    let definition = compile_declarative_route(declarative).unwrap();
    let pipeline = controller.compile_route_definition(definition).unwrap();

    // `order` is kept around so the summary below can print the input.
    let order = Order {
        id: String::from("ORDER-YAML-TEST"),
        amount: 3,
    };
    let payload = serde_json::to_value(&order).unwrap();
    let exchange = Exchange::new(Message {
        body: Body::Json(payload),
        ..Default::default()
    });
    let result = pipeline.oneshot(exchange).await.unwrap();

    let Body::Json(value) = &result.input.body else {
        panic!("Expected Json body");
    };
    let processed: ProcessedOrder = serde_json::from_value(value.clone()).unwrap();
    assert_eq!(processed.order_id, "ORDER-YAML-TEST");
    assert_eq!(processed.status, "processed");
    assert_eq!(processed.total, 300);

    println!("✓ YAML → Bean integration test passed!");
    println!(
        " Input: Order {{ id: {}, amount: {} }}",
        order.id, order.amount
    );
    println!(
        " Output: ProcessedOrder {{ order_id: {}, status: {}, total: {} }}",
        processed.order_id, processed.status, processed.total
    );
}