use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use tower::Service;
use tracing::{debug, error, info, trace, warn};
use camel_component_api::UriConfig;
use camel_component_api::{BoxProcessor, CamelError, Exchange};
use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
/// Severity at which a log endpoint emits its messages.
///
/// The default level is [`LogLevel::Info`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LogLevel {
    /// Emit through `trace!`.
    Trace,
    /// Emit through `debug!`.
    Debug,
    /// Emit through `info!`.
    #[default]
    Info,
    /// Emit through `warn!`.
    Warn,
    /// Emit through `error!`.
    Error,
}

impl FromStr for LogLevel {
    type Err = String;

    /// Parses a level name, ignoring letter case.
    ///
    /// Accepted names are `trace`, `debug`, `info`, `warn`, `warning` and
    /// `error`. The input is not trimmed, so surrounding whitespace makes the
    /// name invalid.
    ///
    /// # Errors
    ///
    /// Returns `"Invalid log level: <input>"` (with the input exactly as
    /// given) when the name is not recognised.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Lower-case spellings and the level each one selects.
        const KNOWN: [(&str, LogLevel); 6] = [
            ("trace", LogLevel::Trace),
            ("debug", LogLevel::Debug),
            ("info", LogLevel::Info),
            ("warn", LogLevel::Warn),
            ("warning", LogLevel::Warn),
            ("error", LogLevel::Error),
        ];

        let lowered = s.to_lowercase();
        KNOWN
            .iter()
            .find(|(name, _)| *name == lowered.as_str())
            .map(|&(_, level)| level)
            .ok_or_else(|| format!("Invalid log level: {}", s))
    }
}
/// Settings for a log endpoint, built from a `log:` URI by the `UriConfig`
/// derive (for example `log:app?level=debug&showHeaders=true&showBody=false`).
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "log"]
#[uri_config(crate = "camel_component_api")]
pub struct LogConfig {
    /// Category label; it is printed as the `[category]` prefix of every
    /// formatted log line.
    pub category: String,
    /// Level the message is logged at (`level` query parameter, default `Info`).
    #[uri_param(default = "Info")]
    pub level: LogLevel,
    /// Whether message headers are included in the log line
    /// (`showHeaders` query parameter, default `false`).
    #[uri_param(name = "showHeaders", default = "false")]
    pub show_headers: bool,
    /// Whether the message body is included in the log line
    /// (`showBody` query parameter, default `true`).
    #[uri_param(name = "showBody", default = "true")]
    pub show_body: bool,
}
/// Component registered under the `log` scheme.
///
/// It carries no state; every endpoint it creates is configured purely from
/// the endpoint URI.
pub struct LogComponent;

impl LogComponent {
    /// Creates the (stateless) log component.
    pub fn new() -> Self {
        LogComponent
    }
}

impl Default for LogComponent {
    /// Equivalent to [`LogComponent::new`].
    fn default() -> Self {
        LogComponent::new()
    }
}
impl Component for LogComponent {
    /// URI scheme handled by this component.
    fn scheme(&self) -> &str {
        "log"
    }

    /// Builds a log endpoint from `uri`.
    ///
    /// The component context is not used.
    ///
    /// # Errors
    ///
    /// Propagates the error from `LogConfig::from_uri` when the URI cannot be
    /// parsed into a [`LogConfig`].
    fn create_endpoint(
        &self,
        uri: &str,
        _ctx: &dyn camel_component_api::ComponentContext,
    ) -> Result<Box<dyn Endpoint>, CamelError> {
        // Fields are initialised in written order: the URI is parsed first, so
        // a bad URI fails before the owned copy of the URI is allocated.
        let endpoint = LogEndpoint {
            config: LogConfig::from_uri(uri)?,
            uri: uri.to_string(),
        };
        Ok(Box::new(endpoint))
    }
}
/// Endpoint produced by `LogComponent`; it can only create producers.
struct LogEndpoint {
    /// URI this endpoint was created from, returned verbatim by `uri()`.
    uri: String,
    /// Configuration parsed from that URI; cloned into each producer.
    config: LogConfig,
}

impl Endpoint for LogEndpoint {
    /// Returns the URI the endpoint was created with.
    fn uri(&self) -> &str {
        self.uri.as_str()
    }

    /// Always fails: a log endpoint cannot act as a message source.
    ///
    /// # Errors
    ///
    /// Always returns `CamelError::EndpointCreationFailed`.
    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
        let reason = String::from("log endpoint does not support consumers");
        Err(CamelError::EndpointCreationFailed(reason))
    }

    /// Creates a producer holding its own copy of the endpoint configuration.
    ///
    /// The producer context is not used.
    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
        let producer = LogProducer {
            config: self.config.clone(),
        };
        Ok(BoxProcessor::new(producer))
    }
}
#[derive(Clone)]
struct LogProducer {
config: LogConfig,
}
impl LogProducer {
fn format_exchange(&self, exchange: &Exchange) -> String {
let mut parts = Vec::new();
if self.config.show_body {
let body_str = match &exchange.input.body {
camel_component_api::Body::Empty => "[empty]".to_string(),
camel_component_api::Body::Text(s) => s.clone(),
camel_component_api::Body::Json(v) => v.to_string(),
camel_component_api::Body::Xml(s) => s.clone(),
camel_component_api::Body::Bytes(b) => format!("[{} bytes]", b.len()),
camel_component_api::Body::Stream(s) => {
format!("[Stream: origin={:?}]", s.metadata.origin)
}
};
parts.push(format!("Body: {body_str}"));
}
if self.config.show_headers && !exchange.input.headers.is_empty() {
let headers: Vec<String> = exchange
.input
.headers
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect();
parts.push(format!("Headers: {{{}}}", headers.join(", ")));
}
if parts.is_empty() {
format!("[{}] Exchange received", self.config.category)
} else {
format!("[{}] {}", self.config.category, parts.join(" | "))
}
}
}
impl Service<Exchange> for LogProducer {
type Response = Exchange;
type Error = CamelError;
type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, exchange: Exchange) -> Self::Future {
let msg = self.format_exchange(&exchange);
let level = self.config.level;
Box::pin(async move {
match level {
LogLevel::Trace => trace!("{msg}"),
LogLevel::Debug => debug!("{msg}"),
LogLevel::Info => info!("{msg}"),
LogLevel::Warn => warn!("{msg}"),
LogLevel::Error => error!("{msg}"),
}
Ok(exchange)
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::Message;
    use camel_component_api::NoOpComponentContext;
    use tower::ServiceExt;

    /// Builds a fresh producer context for tests that create producers.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    /// Creates an endpoint for `uri` through a new component, panicking if
    /// endpoint creation fails.
    fn endpoint_for(uri: &str) -> Box<dyn Endpoint> {
        LogComponent::new()
            .create_endpoint(uri, &NoOpComponentContext)
            .unwrap()
    }

    // A bare `log:<category>` URI yields the documented defaults.
    #[test]
    fn test_log_config_defaults() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:myCategory").unwrap();

        assert_eq!(category, "myCategory");
        assert_eq!(level, LogLevel::Info);
        assert!(!show_headers);
        assert!(show_body);
    }

    // Every query parameter overrides its default.
    #[test]
    fn test_log_config_with_params() {
        let LogConfig {
            category,
            level,
            show_headers,
            show_body,
        } = LogConfig::from_uri("log:app?level=debug&showHeaders=true&showBody=false").unwrap();

        assert_eq!(category, "app");
        assert_eq!(level, LogLevel::Debug);
        assert!(show_headers);
        assert!(!show_body);
    }

    // URIs with another scheme are rejected.
    #[test]
    fn test_log_config_wrong_scheme() {
        assert!(LogConfig::from_uri("timer:tick").is_err());
    }

    #[test]
    fn test_log_component_scheme() {
        assert_eq!(LogComponent::new().scheme(), "log");
    }

    // The log endpoint is producer-only.
    #[test]
    fn test_log_endpoint_no_consumer() {
        assert!(endpoint_for("log:info").create_consumer().is_err());
    }

    #[test]
    fn test_log_endpoint_creates_producer() {
        let producer_ctx = test_producer_ctx();
        assert!(endpoint_for("log:info")
            .create_producer(&producer_ctx)
            .is_ok());
    }

    // The producer logs the exchange and returns it with the body intact.
    #[tokio::test]
    async fn test_log_producer_processes_exchange() {
        let producer_ctx = test_producer_ctx();
        let producer = endpoint_for("log:test?showHeaders=true")
            .create_producer(&producer_ctx)
            .unwrap();

        let mut outgoing = Exchange::new(Message::new("hello world"));
        outgoing
            .input
            .set_header("source", serde_json::Value::String("test".into()));

        let returned = producer.oneshot(outgoing).await.unwrap();
        assert_eq!(returned.input.body.as_text(), Some("hello world"));
    }
}