camel-component-validator 0.6.3

Validator component for rust-camel (XSD, JSON Schema, YAML)
Documentation
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use tower::Service;
use tracing::debug;

use crate::compiled::CompiledValidator;
use crate::config::ValidatorConfig;
use camel_component_api::ComponentContext;
use camel_component_api::{
    BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
};

/// Stateless component type for the validator crate.
///
/// The struct has no fields, so every value is interchangeable; it derives the
/// common traits (`Debug`, `Clone`, `Default`) so it can be logged, duplicated
/// and default-constructed like any other public type.
#[derive(Debug, Clone, Default)]
pub struct ValidatorComponent;

impl ValidatorComponent {
    /// Creates a new component.
    ///
    /// Equivalent to [`ValidatorComponent::default`]; usable in `const` contexts.
    pub const fn new() -> Self {
        Self
    }
}

impl Component for ValidatorComponent {
    /// Returns the URI scheme handled by this component: `"validator"`.
    fn scheme(&self) -> &str {
        "validator"
    }

    /// Builds an endpoint for `uri`.
    ///
    /// The URI is parsed into a [`ValidatorConfig`], which is then compiled into a
    /// [`CompiledValidator`]; the compiled validator is stored behind an `Arc` in
    /// the returned endpoint. The component context is not used.
    ///
    /// # Errors
    ///
    /// Returns whatever error `ValidatorConfig::from_uri` or
    /// `CompiledValidator::compile` reports.
    fn create_endpoint(
        &self,
        uri: &str,
        _ctx: &dyn ComponentContext,
    ) -> Result<Box<dyn Endpoint>, CamelError> {
        let parsed = ValidatorConfig::from_uri(uri)?;
        let validator = Arc::new(CompiledValidator::compile(&parsed)?);
        let endpoint = ValidatorEndpoint {
            compiled: validator,
            uri: uri.to_owned(),
        };
        Ok(Box::new(endpoint))
    }
}

/// Producer-only endpoint holding a URI and the validator compiled for it.
struct ValidatorEndpoint {
    /// URI string handed back unchanged by `uri()` and copied into each producer.
    uri: String,
    /// Compiled validator; each producer receives a clone of this `Arc`.
    compiled: Arc<CompiledValidator>,
}

impl Endpoint for ValidatorEndpoint {
    /// Returns the URI stored in this endpoint.
    fn uri(&self) -> &str {
        self.uri.as_str()
    }

    /// Always fails: this endpoint cannot act as a message source.
    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
        let reason = String::from("validator endpoint does not support consumers");
        Err(CamelError::EndpointCreationFailed(reason))
    }

    /// Creates a producer that shares this endpoint's compiled validator.
    ///
    /// The producer context is not used.
    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
        let producer = ValidatorProducer {
            compiled: Arc::clone(&self.compiled),
            uri: self.uri.clone(),
        };
        Ok(BoxProcessor::new(producer))
    }
}

/// Service that validates the body of each exchange it is called with.
#[derive(Clone)]
struct ValidatorProducer {
    /// Endpoint URI, used only as a field on the debug log line.
    uri: String,
    /// Shared compiled validator applied to every exchange.
    compiled: Arc<CompiledValidator>,
}

impl Service<Exchange> for ValidatorProducer {
    type Response = Exchange;
    type Error = CamelError;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Reports readiness immediately on every poll.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let ready: Result<(), Self::Error> = Ok(());
        Poll::Ready(ready)
    }

    /// Returns a future that validates `exchange.input.body` and, on success,
    /// yields the same exchange back.
    ///
    /// Validation runs when the future is polled, not when `call` returns.
    fn call(&mut self, exchange: Exchange) -> Self::Future {
        // Owned copies are moved into the future so it does not borrow `self`.
        let endpoint_uri = self.uri.clone();
        let validator = Arc::clone(&self.compiled);
        Box::pin(async move {
            debug!(uri = endpoint_uri, "validating exchange body");
            validator.validate(&exchange.input.body)?;
            Ok(exchange)
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::{Body, Message, NoOpComponentContext};
    use std::io::Write;
    use tower::ServiceExt;

    /// Writes `contents` to a new temporary file whose name ends in `suffix`.
    fn schema_file(suffix: &str, contents: &[u8]) -> tempfile::NamedTempFile {
        let mut file = tempfile::Builder::new().suffix(suffix).tempfile().unwrap();
        file.write_all(contents).unwrap();
        file
    }

    /// Temporary JSON Schema requiring an object with an `id` property.
    fn json_schema_file() -> tempfile::NamedTempFile {
        schema_file(".json", br#"{"type":"object","required":["id"]}"#)
    }

    /// Temporary XSD declaring a single string-typed `order` element.
    fn xsd_file() -> tempfile::NamedTempFile {
        schema_file(
            ".xsd",
            br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#,
        )
    }

    /// Creates a validator endpoint pointing at the given schema file.
    fn endpoint_for(schema: &tempfile::NamedTempFile) -> Box<dyn Endpoint> {
        let uri = format!("validator:{}", schema.path().display());
        ValidatorComponent::new()
            .create_endpoint(&uri, &NoOpComponentContext)
            .unwrap()
    }

    /// Creates a producer for the given schema file.
    fn producer_for(schema: &tempfile::NamedTempFile) -> BoxProcessor {
        endpoint_for(schema)
            .create_producer(&ProducerContext::new())
            .unwrap()
    }

    #[test]
    fn scheme_is_validator() {
        let component = ValidatorComponent::new();
        assert_eq!(component.scheme(), "validator");
    }

    #[test]
    fn consumer_not_supported() {
        let schema = json_schema_file();
        let endpoint = endpoint_for(&schema);
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn endpoint_creation_fails_for_missing_schema() {
        let component = ValidatorComponent::new();
        let outcome =
            component.create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn valid_json_body_passes_through() {
        let schema = json_schema_file();
        let body = Body::Json(serde_json::json!({"id": "1"}));
        let outcome = producer_for(&schema)
            .oneshot(Exchange::new(Message::new(body)))
            .await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn invalid_json_body_returns_err() {
        let schema = json_schema_file();
        let body = Body::Json(serde_json::json!({"name": "x"}));
        let outcome = producer_for(&schema)
            .oneshot(Exchange::new(Message::new(body)))
            .await;
        assert!(outcome.is_err());
        let msg = outcome.unwrap_err().to_string();
        assert!(msg.contains("validation failed"), "got: {msg}");
    }

    #[tokio::test]
    async fn valid_xml_body_passes() {
        let schema = xsd_file();
        let body = Body::Xml("<order>hello</order>".to_string());
        let outcome = producer_for(&schema)
            .oneshot(Exchange::new(Message::new(body)))
            .await;
        assert!(outcome.is_ok());
    }
}