use camel_api::Value;
use camel_api::body::Body;
use camel_builder::{RouteBuilder, StepAccumulator};
use camel_component_direct::DirectComponent;
use camel_component_mock::MockComponent;
use camel_component_timer::TimerComponent;
use camel_core::CamelContext;
use camel_processor::LogLevel;
#[tokio::test]
async fn test_log_eip_with_timer() {
let mock = MockComponent::new();
let mut ctx = CamelContext::builder().build().await.unwrap();
ctx.register_component(TimerComponent::new());
ctx.register_component(mock.clone());
let route = RouteBuilder::from("timer:log-test?period=50&repeatCount=1")
.route_id("log-eip-timer-route")
.log("Starting processing", LogLevel::Info)
.log("Debug message", LogLevel::Debug)
.process(|mut ex: camel_api::Exchange| async move {
ex.input.body = Body::Text("processed".into());
Ok(ex)
})
.log("Finished processing", LogLevel::Info)
.to("mock:result")
.build()
.unwrap();
ctx.add_route_definition(route).await.unwrap();
ctx.start().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
ctx.stop().await.unwrap();
let endpoint = mock.get_endpoint("result").unwrap();
endpoint.assert_exchange_count(1).await;
let exchanges = endpoint.get_received_exchanges().await;
assert_eq!(exchanges[0].input.body.as_text(), Some("processed"));
}
#[tokio::test]
async fn test_log_eip_with_direct() {
let mock = MockComponent::new();
let mut ctx = CamelContext::builder().build().await.unwrap();
ctx.register_component(TimerComponent::new());
ctx.register_component(DirectComponent::new());
ctx.register_component(mock.clone());
let trigger_route = RouteBuilder::from("timer:trigger?period=50&repeatCount=1")
.route_id("trigger-route")
.set_header("source", Value::String("timer".into()))
.to("direct:process")
.build()
.unwrap();
let process_route = RouteBuilder::from("direct:process")
.route_id("process-route")
.log("Starting processing in direct route", LogLevel::Info)
.log("Processing exchange", LogLevel::Debug)
.process(|mut ex: camel_api::Exchange| async move {
ex.input.body = Body::Text("processed".into());
Ok(ex)
})
.log("Finished processing in direct route", LogLevel::Info)
.to("mock:result")
.build()
.unwrap();
ctx.add_route_definition(trigger_route).await.unwrap();
ctx.add_route_definition(process_route).await.unwrap();
ctx.start().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
ctx.stop().await.unwrap();
let endpoint = mock.get_endpoint("result").unwrap();
endpoint.assert_exchange_count(1).await;
let exchanges = endpoint.get_received_exchanges().await;
assert_eq!(exchanges[0].input.body.as_text(), Some("processed"));
assert_eq!(
exchanges[0].input.header("source"),
Some(&Value::String("timer".into()))
);
}
#[tokio::test]
async fn test_log_eip_in_filter_scope() {
let mock = MockComponent::new();
let mut ctx = CamelContext::builder().build().await.unwrap();
ctx.register_component(TimerComponent::new());
ctx.register_component(mock.clone());
let counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let counter_clone = std::sync::Arc::clone(&counter);
let route = RouteBuilder::from("timer:filter-log?period=50&repeatCount=2")
.route_id("filter-log-route")
.process(move |mut ex: camel_api::Exchange| {
let c = std::sync::Arc::clone(&counter_clone);
async move {
let n = c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
ex.input.set_header("count", Value::Number(n.into()));
Ok(ex)
}
})
.log("Before filter", LogLevel::Info)
.filter(|ex| {
ex.input
.header("count")
.and_then(|v| v.as_u64())
.map(|n| n % 2 == 0)
.unwrap_or(false)
})
.log("Inside filter - only even counts", LogLevel::Info)
.set_header("filtered", Value::Bool(true))
.end_filter()
.log("After filter", LogLevel::Info)
.to("mock:result")
.build()
.unwrap();
ctx.add_route_definition(route).await.unwrap();
ctx.start().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
ctx.stop().await.unwrap();
let endpoint = mock.get_endpoint("result").unwrap();
endpoint.assert_exchange_count(2).await;
let exchanges = endpoint.get_received_exchanges().await;
assert_eq!(
exchanges[0].input.header("filtered"),
Some(&Value::Bool(true))
);
assert!(exchanges[1].input.header("filtered").is_none());
}
#[tokio::test]
async fn test_log_eip_in_split_scope() {
use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
let mock = MockComponent::new();
let mut ctx = CamelContext::builder().build().await.unwrap();
ctx.register_component(TimerComponent::new());
ctx.register_component(mock.clone());
let route = RouteBuilder::from("timer:split-log?period=50&repeatCount=1")
.route_id("split-log-route")
.process(|mut ex: camel_api::Exchange| async move {
ex.input.body = Body::Text("line1\nline2\nline3".to_string());
Ok(ex)
})
.log("Before split", LogLevel::Info)
.split(SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::CollectAll))
.log("Processing fragment", LogLevel::Info)
.to("mock:per-line")
.end_split()
.log("After split", LogLevel::Info)
.to("mock:final")
.build()
.unwrap();
ctx.add_route_definition(route).await.unwrap();
ctx.start().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
ctx.stop().await.unwrap();
let per_line = mock.get_endpoint("per-line").unwrap();
let per_line_count = per_line.get_received_exchanges().await.len();
assert_eq!(per_line_count, 3, "Expected 3 per-line exchanges");
let final_ep = mock.get_endpoint("final").unwrap();
let final_exchanges = final_ep.get_received_exchanges().await;
assert_eq!(final_exchanges.len(), 1, "Expected 1 final exchange");
}
#[tokio::test]
async fn test_log_eip_multiple_levels() {
let mock = MockComponent::new();
let mut ctx = CamelContext::builder().build().await.unwrap();
ctx.register_component(TimerComponent::new());
ctx.register_component(mock.clone());
let route = RouteBuilder::from("timer:levels?period=50&repeatCount=1")
.route_id("levels-route")
.log("Trace level message", LogLevel::Trace)
.log("Debug level message", LogLevel::Debug)
.log("Info level message", LogLevel::Info)
.log("Warn level message", LogLevel::Warn)
.log("Error level message", LogLevel::Error)
.to("mock:result")
.build()
.unwrap();
ctx.add_route_definition(route).await.unwrap();
ctx.start().await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
ctx.stop().await.unwrap();
let endpoint = mock.get_endpoint("result").unwrap();
endpoint.assert_exchange_count(1).await;
}