Skip to main content

camel_component_validator/
component.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::debug;
8
9use crate::compiled::CompiledValidator;
10use crate::config::ValidatorConfig;
11use crate::xsd_bridge::{XsdBridge, XsdBridgeBackend};
12use camel_component_api::ComponentContext;
13use camel_component_api::{
14    BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
15};
16
17pub struct ValidatorComponent {
18    xsd_bridge: Arc<dyn XsdBridge>,
19}
20
21impl ValidatorComponent {
22    pub fn new() -> Self {
23        Self {
24            xsd_bridge: Arc::new(XsdBridgeBackend::new()),
25        }
26    }
27
28    #[cfg(test)]
29    fn with_xsd_bridge(xsd_bridge: Arc<dyn XsdBridge>) -> Self {
30        Self { xsd_bridge }
31    }
32}
33
34impl Default for ValidatorComponent {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl Component for ValidatorComponent {
41    fn scheme(&self) -> &str {
42        "validator"
43    }
44
45    fn create_endpoint(
46        &self,
47        uri: &str,
48        _ctx: &dyn ComponentContext,
49    ) -> Result<Box<dyn Endpoint>, CamelError> {
50        let config = ValidatorConfig::from_uri(uri)?;
51        let xsd_bridge = Arc::clone(&self.xsd_bridge);
52        let compiled = CompiledValidator::compile(&config, xsd_bridge)?;
53        Ok(Box::new(ValidatorEndpoint {
54            uri: uri.to_string(),
55            compiled: Arc::new(compiled),
56        }))
57    }
58}
59
60struct ValidatorEndpoint {
61    uri: String,
62    compiled: Arc<CompiledValidator>,
63}
64
65impl Endpoint for ValidatorEndpoint {
66    fn uri(&self) -> &str {
67        &self.uri
68    }
69
70    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
71        Err(CamelError::EndpointCreationFailed(
72            "validator endpoint does not support consumers".to_string(),
73        ))
74    }
75
76    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
77        Ok(BoxProcessor::new(ValidatorProducer {
78            uri: self.uri.clone(),
79            compiled: Arc::clone(&self.compiled),
80        }))
81    }
82}
83
84#[derive(Clone)]
85struct ValidatorProducer {
86    uri: String,
87    compiled: Arc<CompiledValidator>,
88}
89
90impl Service<Exchange> for ValidatorProducer {
91    type Response = Exchange;
92    type Error = CamelError;
93    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
94
95    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
96        Poll::Ready(Ok(()))
97    }
98
99    fn call(&mut self, exchange: Exchange) -> Self::Future {
100        let compiled = Arc::clone(&self.compiled);
101        let uri = self.uri.clone();
102        Box::pin(async move {
103            debug!(uri = uri, "validating exchange body");
104            compiled.validate(&exchange.input.body).await?;
105            Ok(exchange)
106        })
107    }
108}
109
110#[cfg(test)]
111mod tests {
112    use super::*;
113    use crate::error::ValidatorError;
114    use async_trait::async_trait;
115    use camel_component_api::{Message, NoOpComponentContext};
116    use std::io::Write;
117    use std::sync::atomic::{AtomicUsize, Ordering};
118    use tower::ServiceExt;
119
120    fn json_schema_file() -> tempfile::NamedTempFile {
121        let mut f = tempfile::Builder::new().suffix(".json").tempfile().unwrap();
122        f.write_all(br#"{"type":"object","required":["id"]}"#)
123            .unwrap();
124        f
125    }
126
127    fn xsd_file() -> tempfile::NamedTempFile {
128        let mut f = tempfile::Builder::new().suffix(".xsd").tempfile().unwrap();
129        f.write_all(
130            br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#,
131        ).unwrap();
132        f
133    }
134
135    #[derive(Debug)]
136    struct MockXsdBridge {
137        register_calls: AtomicUsize,
138        validate_calls: AtomicUsize,
139        register_error: Option<ValidatorError>,
140        validate_error: Option<ValidatorError>,
141    }
142
143    #[async_trait]
144    impl XsdBridge for MockXsdBridge {
145        async fn register(&self, _xsd_bytes: Vec<u8>) -> Result<String, ValidatorError> {
146            self.register_calls.fetch_add(1, Ordering::SeqCst);
147            if let Some(err) = &self.register_error {
148                return Err(err.clone());
149            }
150            Ok("xsd-mock-id".to_string())
151        }
152
153        async fn validate(
154            &self,
155            _schema_id: &str,
156            _doc_bytes: Vec<u8>,
157        ) -> Result<(), ValidatorError> {
158            self.validate_calls.fetch_add(1, Ordering::SeqCst);
159            if let Some(err) = &self.validate_error {
160                return Err(err.clone());
161            }
162            Ok(())
163        }
164    }
165
166    #[test]
167    fn scheme_is_validator() {
168        assert_eq!(ValidatorComponent::new().scheme(), "validator");
169    }
170
171    #[test]
172    fn consumer_not_supported() {
173        let f = json_schema_file();
174        let uri = format!("validator:{}", f.path().display());
175        let ep = ValidatorComponent::new()
176            .create_endpoint(&uri, &NoOpComponentContext)
177            .unwrap();
178        assert!(ep.create_consumer().is_err());
179    }
180
181    #[test]
182    fn endpoint_creation_fails_for_missing_schema() {
183        let result = ValidatorComponent::new()
184            .create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
185        assert!(result.is_err());
186    }
187
188    #[tokio::test]
189    async fn valid_json_body_passes_through() {
190        let f = json_schema_file();
191        let uri = format!("validator:{}", f.path().display());
192        let ep = ValidatorComponent::new()
193            .create_endpoint(&uri, &NoOpComponentContext)
194            .unwrap();
195        let producer = ep.create_producer(&ProducerContext::new()).unwrap();
196        let exchange = Exchange::new(Message::new(camel_component_api::Body::Json(
197            serde_json::json!({"id": "1"}),
198        )));
199        let result = producer.oneshot(exchange).await;
200        assert!(result.is_ok());
201    }
202
203    #[tokio::test]
204    async fn invalid_json_body_returns_err() {
205        let f = json_schema_file();
206        let uri = format!("validator:{}", f.path().display());
207        let ep = ValidatorComponent::new()
208            .create_endpoint(&uri, &NoOpComponentContext)
209            .unwrap();
210        let producer = ep.create_producer(&ProducerContext::new()).unwrap();
211        let exchange = Exchange::new(Message::new(camel_component_api::Body::Json(
212            serde_json::json!({"name": "x"}),
213        )));
214        let result = producer.oneshot(exchange).await;
215        assert!(result.is_err());
216        let msg = result.unwrap_err().to_string();
217        assert!(msg.contains("validation failed"), "got: {msg}");
218    }
219
220    #[tokio::test]
221    async fn valid_xml_body_passes() {
222        let backend = Arc::new(MockXsdBridge {
223            register_calls: AtomicUsize::new(0),
224            validate_calls: AtomicUsize::new(0),
225            register_error: None,
226            validate_error: None,
227        });
228        let f = xsd_file();
229        let uri = format!("validator:{}", f.path().display());
230        let ep = ValidatorComponent::with_xsd_bridge(backend)
231            .create_endpoint(&uri, &NoOpComponentContext)
232            .unwrap();
233        let producer = ep.create_producer(&ProducerContext::new()).unwrap();
234        let exchange = Exchange::new(Message::new(camel_component_api::Body::Xml(
235            "<order>hello</order>".to_string(),
236        )));
237        assert!(producer.oneshot(exchange).await.is_ok());
238    }
239
240    #[tokio::test]
241    async fn xsd_bridge_register_and_validate_mock() {
242        let backend = Arc::new(MockXsdBridge {
243            register_calls: AtomicUsize::new(0),
244            validate_calls: AtomicUsize::new(0),
245            register_error: None,
246            validate_error: None,
247        });
248
249        let f = xsd_file();
250        let uri = format!("validator:{}", f.path().display());
251        let ep = ValidatorComponent::with_xsd_bridge(Arc::clone(&backend) as Arc<dyn XsdBridge>)
252            .create_endpoint(&uri, &NoOpComponentContext)
253            .unwrap();
254
255        let producer = ep.create_producer(&ProducerContext::new()).unwrap();
256        let exchange = Exchange::new(Message::new(camel_component_api::Body::Xml(
257            "<order>ok</order>".to_string(),
258        )));
259        assert!(producer.oneshot(exchange).await.is_ok());
260        assert_eq!(backend.register_calls.load(Ordering::SeqCst), 1);
261        assert_eq!(backend.validate_calls.load(Ordering::SeqCst), 1);
262    }
263
264    #[tokio::test]
265    async fn xsd_bridge_register_error_propagates_on_validate() {
266        let backend = Arc::new(MockXsdBridge {
267            register_calls: AtomicUsize::new(0),
268            validate_calls: AtomicUsize::new(0),
269            register_error: Some(ValidatorError::CompilationFailed(
270                "COMPILATION_FAILED".to_string(),
271            )),
272            validate_error: None,
273        });
274        let f = xsd_file();
275        let uri = format!("validator:{}", f.path().display());
276        // Endpoint creation now always succeeds for XSD (registration is deferred).
277        let ep = ValidatorComponent::with_xsd_bridge(backend)
278            .create_endpoint(&uri, &NoOpComponentContext)
279            .expect("endpoint creation should succeed");
280        // The error surfaces when the first message is processed (register is called).
281        let producer = ep.create_producer(&ProducerContext::new()).unwrap();
282        let exchange = Exchange::new(Message::new(camel_component_api::Body::Xml(
283            "<order/>".to_string(),
284        )));
285        let err = producer
286            .oneshot(exchange)
287            .await
288            .expect_err("expected validate to fail due to registration error");
289        assert!(err.to_string().contains("COMPILATION_FAILED"));
290    }
291}