Skip to main content

camel_component_validator/
component.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::debug;
8
9use crate::compiled::CompiledValidator;
10use crate::config::ValidatorConfig;
11use camel_component_api::ComponentContext;
12use camel_component_api::{
13    BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
14};
15
/// Component for the `validator` URI scheme.
///
/// The type is a stateless unit struct: everything derived from a URI lives
/// in the endpoint created for that URI, not in the component itself.
#[derive(Debug, Clone, Copy)]
pub struct ValidatorComponent;

impl ValidatorComponent {
    /// Creates a new validator component.
    pub fn new() -> Self {
        Self
    }
}
23
24impl Default for ValidatorComponent {
25    fn default() -> Self {
26        Self::new()
27    }
28}
29
30impl Component for ValidatorComponent {
31    fn scheme(&self) -> &str {
32        "validator"
33    }
34
35    fn create_endpoint(
36        &self,
37        uri: &str,
38        _ctx: &dyn ComponentContext,
39    ) -> Result<Box<dyn Endpoint>, CamelError> {
40        let config = ValidatorConfig::from_uri(uri)?;
41        let compiled = CompiledValidator::compile(&config)?;
42        Ok(Box::new(ValidatorEndpoint {
43            uri: uri.to_string(),
44            compiled: Arc::new(compiled),
45        }))
46    }
47}
48
49struct ValidatorEndpoint {
50    uri: String,
51    compiled: Arc<CompiledValidator>,
52}
53
54impl Endpoint for ValidatorEndpoint {
55    fn uri(&self) -> &str {
56        &self.uri
57    }
58
59    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
60        Err(CamelError::EndpointCreationFailed(
61            "validator endpoint does not support consumers".to_string(),
62        ))
63    }
64
65    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
66        Ok(BoxProcessor::new(ValidatorProducer {
67            uri: self.uri.clone(),
68            compiled: Arc::clone(&self.compiled),
69        }))
70    }
71}
72
73#[derive(Clone)]
74struct ValidatorProducer {
75    uri: String,
76    compiled: Arc<CompiledValidator>,
77}
78
79impl Service<Exchange> for ValidatorProducer {
80    type Response = Exchange;
81    type Error = CamelError;
82    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
83
84    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85        Poll::Ready(Ok(()))
86    }
87
88    fn call(&mut self, exchange: Exchange) -> Self::Future {
89        let compiled = Arc::clone(&self.compiled);
90        let uri = self.uri.clone();
91        Box::pin(async move {
92            debug!(uri = uri, "validating exchange body");
93            compiled.validate(&exchange.input.body)?;
94            Ok(exchange)
95        })
96    }
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::{Body, Message, NoOpComponentContext};
    use std::io::Write;
    use tower::ServiceExt;

    /// Writes `contents` to a fresh temporary file whose name ends in `suffix`.
    ///
    /// The returned handle must be kept alive for as long as the file is needed.
    fn schema_file(suffix: &str, contents: &[u8]) -> tempfile::NamedTempFile {
        let mut file = tempfile::Builder::new().suffix(suffix).tempfile().unwrap();
        file.write_all(contents).unwrap();
        file
    }

    /// JSON Schema requiring an object with an `id` property.
    fn json_schema_file() -> tempfile::NamedTempFile {
        schema_file(".json", br#"{"type":"object","required":["id"]}"#)
    }

    /// XSD declaring a single `order` element of type `xs:string`.
    fn xsd_file() -> tempfile::NamedTempFile {
        schema_file(
            ".xsd",
            br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#,
        )
    }

    /// Creates a validator endpoint pointing at `schema`.
    fn endpoint_for(schema: &tempfile::NamedTempFile) -> Box<dyn Endpoint> {
        let uri = format!("validator:{}", schema.path().display());
        ValidatorComponent::new()
            .create_endpoint(&uri, &NoOpComponentContext)
            .unwrap()
    }

    /// Creates a producer for a validator endpoint pointing at `schema`.
    fn producer_for(schema: &tempfile::NamedTempFile) -> BoxProcessor {
        endpoint_for(schema)
            .create_producer(&ProducerContext::new())
            .unwrap()
    }

    /// Wraps `body` in a message and that message in an exchange.
    fn exchange_with(body: Body) -> Exchange {
        Exchange::new(Message::new(body))
    }

    #[test]
    fn scheme_is_validator() {
        let component = ValidatorComponent::new();
        assert_eq!(component.scheme(), "validator");
    }

    #[test]
    fn consumer_not_supported() {
        let schema = json_schema_file();
        let endpoint = endpoint_for(&schema);
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn endpoint_creation_fails_for_missing_schema() {
        let outcome = ValidatorComponent::new()
            .create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn valid_json_body_passes_through() {
        let schema = json_schema_file();
        let exchange = exchange_with(Body::Json(serde_json::json!({"id": "1"})));
        let outcome = producer_for(&schema).oneshot(exchange).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn invalid_json_body_returns_err() {
        let schema = json_schema_file();
        let exchange = exchange_with(Body::Json(serde_json::json!({"name": "x"})));
        let outcome = producer_for(&schema).oneshot(exchange).await;
        assert!(outcome.is_err());
        let msg = outcome.unwrap_err().to_string();
        assert!(msg.contains("validation failed"), "got: {msg}");
    }

    #[tokio::test]
    async fn valid_xml_body_passes() {
        let schema = xsd_file();
        let exchange = exchange_with(Body::Xml("<order>hello</order>".to_string()));
        assert!(producer_for(&schema).oneshot(exchange).await.is_ok());
    }
}