Skip to main content

camel_component_validator/
component.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::debug;
8
9use crate::compiled::CompiledValidator;
10use crate::config::ValidatorConfig;
11use crate::xsd_bridge::{XsdBridge, XsdBridgeBackend};
12use camel_component_api::ComponentContext;
13use camel_component_api::{
14    BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
15    RuntimeObservability,
16};
17
18pub struct ValidatorComponent {
19    xsd_bridge: Arc<dyn XsdBridge>,
20    xsd_backend: Option<Arc<XsdBridgeBackend>>,
21}
22
23impl ValidatorComponent {
24    pub fn new() -> Self {
25        let xsd_backend = Arc::new(XsdBridgeBackend::new());
26        Self {
27            xsd_bridge: Arc::clone(&xsd_backend) as Arc<dyn XsdBridge>,
28            xsd_backend: Some(xsd_backend),
29        }
30    }
31
32    pub fn xsd_bridge_backend(&self) -> Option<Arc<XsdBridgeBackend>> {
33        self.xsd_backend.as_ref().map(Arc::clone)
34    }
35
36    #[cfg(test)]
37    fn with_xsd_bridge(xsd_bridge: Arc<dyn XsdBridge>) -> Self {
38        Self {
39            xsd_bridge,
40            xsd_backend: None,
41        }
42    }
43}
44
45impl Default for ValidatorComponent {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl Component for ValidatorComponent {
52    fn scheme(&self) -> &str {
53        "validator"
54    }
55
56    fn create_endpoint(
57        &self,
58        uri: &str,
59        _ctx: &dyn ComponentContext,
60    ) -> Result<Box<dyn Endpoint>, CamelError> {
61        let config = ValidatorConfig::from_uri(uri)?;
62        let xsd_bridge = Arc::clone(&self.xsd_bridge);
63        let compiled = CompiledValidator::compile(&config, xsd_bridge)?;
64        Ok(Box::new(ValidatorEndpoint {
65            uri: uri.to_string(),
66            config,
67            compiled: Arc::new(compiled),
68            xsd_backend: self.xsd_backend.as_ref().map(Arc::clone),
69        }))
70    }
71}
72
73/// Validator endpoint that checks message bodies or headers against a schema.
74///
75/// Supports XML (XSD), JSON Schema, and YAML schema validation.
76/// RelaxNG and Schematron are accepted in URI parsing but rejected at endpoint
77/// creation with a clear error message.
78struct ValidatorEndpoint {
79    uri: String,
80    config: ValidatorConfig,
81    compiled: Arc<CompiledValidator>,
82    xsd_backend: Option<Arc<XsdBridgeBackend>>,
83}
84
85impl ValidatorEndpoint {
86    /// Returns a human-readable description of the configured schema.
87    #[allow(dead_code)]
88    pub fn schema_info(&self) -> String {
89        format!(
90            "{:?} schema: {}",
91            self.config.schema_type,
92            self.config.schema_path.display()
93        )
94    }
95}
96
97impl Endpoint for ValidatorEndpoint {
98    fn uri(&self) -> &str {
99        &self.uri
100    }
101
102    fn create_consumer(
103        &self,
104        _rt: Arc<dyn RuntimeObservability>,
105    ) -> Result<Box<dyn Consumer>, CamelError> {
106        Err(CamelError::EndpointCreationFailed(
107            "validator endpoint does not support consumers".to_string(),
108        ))
109    }
110
111    fn create_producer(
112        &self,
113        rt: Arc<dyn RuntimeObservability>,
114        ctx: &ProducerContext,
115    ) -> Result<BoxProcessor, CamelError> {
116        if let Some(ref backend) = self.xsd_backend {
117            backend.set_observability(
118                rt.clone(),
119                ctx.route_id().unwrap_or("validator-bridge").to_string(),
120            );
121        }
122        Ok(BoxProcessor::new(ValidatorProducer {
123            uri: self.uri.clone(),
124            config: self.config.clone(),
125            compiled: Arc::clone(&self.compiled),
126        }))
127    }
128}
129
130#[derive(Clone)]
131struct ValidatorProducer {
132    uri: String,
133    config: ValidatorConfig,
134    compiled: Arc<CompiledValidator>,
135}
136
137impl Service<Exchange> for ValidatorProducer {
138    type Response = Exchange;
139    type Error = CamelError;
140    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
141
142    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
143        Poll::Ready(Ok(()))
144    }
145
146    fn call(&mut self, exchange: Exchange) -> Self::Future {
147        let compiled = Arc::clone(&self.compiled);
148        let uri = self.uri.clone();
149        let config = self.config.clone();
150        Box::pin(async move {
151            debug!(uri = uri, "validating exchange body");
152
153            // VAL-003: header_name mode — validate header value instead of body
154            if let Some(ref header_name) = config.header_name {
155                match exchange.input.header(header_name) {
156                    Some(value) => {
157                        // Convert header Value to a string body for validation
158                        let header_str = match value.as_str() {
159                            Some(s) => s.to_string(),
160                            None => value.to_string(),
161                        };
162                        let header_body = camel_component_api::Body::Text(header_str);
163                        compiled.validate(&header_body).await?;
164                    }
165                    None => {
166                        // VAL-004: failOnNullHeader
167                        if config.fail_on_null_header {
168                            return Err(CamelError::ProcessorError(format!(
169                                "header '{header_name}' is missing and failOnNullHeader is true"
170                            )));
171                        }
172                        // Pass through — no validation
173                    }
174                }
175                return Ok(exchange);
176            }
177
178            // VAL-002: failOnNullBody
179            if exchange.input.body.is_empty() {
180                if config.fail_on_null_body {
181                    return Err(CamelError::ProcessorError(
182                        "body is empty and failOnNullBody is true".to_string(),
183                    ));
184                }
185                // Pass through — no validation
186                return Ok(exchange);
187            }
188
189            compiled.validate(&exchange.input.body).await?;
190            Ok(exchange)
191        })
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use camel_component_api::test_support::PanicRuntimeObservability;
198    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
199        std::sync::Arc::new(PanicRuntimeObservability)
200    }
201
202    use super::*;
203    use crate::error::ValidatorError;
204    use async_trait::async_trait;
205    use camel_component_api::{Message, NoOpComponentContext};
206    use std::io::Write;
207    use std::sync::atomic::{AtomicUsize, Ordering};
208    use tower::ServiceExt;
209
210    fn json_schema_file() -> tempfile::NamedTempFile {
211        let mut f = tempfile::Builder::new().suffix(".json").tempfile().unwrap();
212        f.write_all(br#"{"type":"object","required":["id"]}"#)
213            .unwrap();
214        f
215    }
216
217    fn xsd_file() -> tempfile::NamedTempFile {
218        let mut f = tempfile::Builder::new().suffix(".xsd").tempfile().unwrap();
219        f.write_all(
220            br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#,
221        ).unwrap();
222        f
223    }
224
225    #[derive(Debug)]
226    struct MockXsdBridge {
227        register_calls: AtomicUsize,
228        validate_calls: AtomicUsize,
229        register_error: Option<ValidatorError>,
230        validate_error: Option<ValidatorError>,
231    }
232
233    #[async_trait]
234    impl XsdBridge for MockXsdBridge {
235        async fn register(&self, _xsd_bytes: Vec<u8>) -> Result<String, ValidatorError> {
236            self.register_calls.fetch_add(1, Ordering::SeqCst);
237            if let Some(err) = &self.register_error {
238                return Err(err.clone());
239            }
240            Ok("xsd-mock-id".to_string())
241        }
242
243        async fn validate(
244            &self,
245            _schema_id: &str,
246            _doc_bytes: Vec<u8>,
247        ) -> Result<(), ValidatorError> {
248            self.validate_calls.fetch_add(1, Ordering::SeqCst);
249            if let Some(err) = &self.validate_error {
250                return Err(err.clone());
251            }
252            Ok(())
253        }
254    }
255
256    #[test]
257    fn scheme_is_validator() {
258        assert_eq!(ValidatorComponent::new().scheme(), "validator");
259    }
260
261    #[test]
262    fn consumer_not_supported() {
263        let f = json_schema_file();
264        let uri = format!("validator:{}", f.path().display());
265        let ep = ValidatorComponent::new()
266            .create_endpoint(&uri, &NoOpComponentContext)
267            .unwrap();
268        assert!(ep.create_consumer(rt()).is_err());
269    }
270
271    #[test]
272    fn endpoint_creation_fails_for_missing_schema() {
273        let result = ValidatorComponent::new()
274            .create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
275        assert!(result.is_err());
276    }
277
278    #[tokio::test]
279    async fn valid_json_body_passes_through() {
280        let f = json_schema_file();
281        let uri = format!("validator:{}", f.path().display());
282        let ep = ValidatorComponent::new()
283            .create_endpoint(&uri, &NoOpComponentContext)
284            .unwrap();
285        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
286        let exchange = Exchange::new(Message::new(camel_component_api::Body::Json(
287            serde_json::json!({"id": "1"}),
288        )));
289        let result = producer.oneshot(exchange).await;
290        assert!(result.is_ok());
291    }
292
293    #[tokio::test]
294    async fn invalid_json_body_returns_err() {
295        let f = json_schema_file();
296        let uri = format!("validator:{}", f.path().display());
297        let ep = ValidatorComponent::new()
298            .create_endpoint(&uri, &NoOpComponentContext)
299            .unwrap();
300        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
301        let exchange = Exchange::new(Message::new(camel_component_api::Body::Json(
302            serde_json::json!({"name": "x"}),
303        )));
304        let result = producer.oneshot(exchange).await;
305        assert!(result.is_err());
306        let msg = result.unwrap_err().to_string();
307        assert!(msg.contains("validation failed"), "got: {msg}");
308    }
309
310    #[tokio::test]
311    async fn valid_xml_body_passes() {
312        let backend = Arc::new(MockXsdBridge {
313            register_calls: AtomicUsize::new(0),
314            validate_calls: AtomicUsize::new(0),
315            register_error: None,
316            validate_error: None,
317        });
318        let f = xsd_file();
319        let uri = format!("validator:{}", f.path().display());
320        let ep = ValidatorComponent::with_xsd_bridge(backend)
321            .create_endpoint(&uri, &NoOpComponentContext)
322            .unwrap();
323        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
324        let exchange = Exchange::new(Message::new(camel_component_api::Body::Xml(
325            "<order>hello</order>".to_string(),
326        )));
327        assert!(producer.oneshot(exchange).await.is_ok());
328    }
329
330    #[tokio::test]
331    async fn xsd_bridge_register_and_validate_mock() {
332        let backend = Arc::new(MockXsdBridge {
333            register_calls: AtomicUsize::new(0),
334            validate_calls: AtomicUsize::new(0),
335            register_error: None,
336            validate_error: None,
337        });
338
339        let f = xsd_file();
340        let uri = format!("validator:{}", f.path().display());
341        let ep = ValidatorComponent::with_xsd_bridge(Arc::clone(&backend) as Arc<dyn XsdBridge>)
342            .create_endpoint(&uri, &NoOpComponentContext)
343            .unwrap();
344
345        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
346        let exchange = Exchange::new(Message::new(camel_component_api::Body::Xml(
347            "<order>ok</order>".to_string(),
348        )));
349        assert!(producer.oneshot(exchange).await.is_ok());
350        assert_eq!(backend.register_calls.load(Ordering::SeqCst), 1);
351        assert_eq!(backend.validate_calls.load(Ordering::SeqCst), 1);
352    }
353
354    #[tokio::test]
355    async fn xsd_bridge_register_error_propagates_on_validate() {
356        let backend = Arc::new(MockXsdBridge {
357            register_calls: AtomicUsize::new(0),
358            validate_calls: AtomicUsize::new(0),
359            register_error: Some(ValidatorError::CompilationFailed {
360                message: "COMPILATION_FAILED".to_string(),
361                source: None,
362            }),
363            validate_error: None,
364        });
365        let f = xsd_file();
366        let uri = format!("validator:{}", f.path().display());
367        // Endpoint creation now always succeeds for XSD (registration is deferred).
368        let ep = ValidatorComponent::with_xsd_bridge(backend)
369            .create_endpoint(&uri, &NoOpComponentContext)
370            .expect("endpoint creation should succeed");
371        // The error surfaces when the first message is processed (register is called).
372        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
373        let exchange = Exchange::new(Message::new(camel_component_api::Body::Xml(
374            "<order/>".to_string(),
375        )));
376        let err = producer
377            .oneshot(exchange)
378            .await
379            .expect_err("expected validate to fail due to registration error");
380        assert!(err.to_string().contains("COMPILATION_FAILED"));
381    }
382
383    #[tokio::test]
384    async fn test_validator_rejects_oversized_payload() {
385        // Build validator with maxPayloadBytes=100
386        let f = json_schema_file();
387        let uri = format!("validator:{}?maxPayloadBytes=100", f.path().display());
388        let ep = ValidatorComponent::new()
389            .create_endpoint(&uri, &NoOpComponentContext)
390            .unwrap();
391        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
392        // Send a body that is definitely > 100 bytes
393        let big_body: String = "x".repeat(200);
394        let exchange = Exchange::new(Message::new(camel_component_api::Body::Text(big_body)));
395        let result = producer.oneshot(exchange).await;
396        assert!(result.is_err(), "expected oversized payload to be rejected");
397        let msg = result.unwrap_err().to_string();
398        assert!(
399            msg.contains("payload too large"),
400            "expected 'payload too large' in error, got: {msg}"
401        );
402    }
403
404    #[tokio::test]
405    async fn test_validator_allows_payload_under_limit() {
406        let f = json_schema_file();
407        let uri = format!("validator:{}?maxPayloadBytes=1024", f.path().display());
408        let ep = ValidatorComponent::new()
409            .create_endpoint(&uri, &NoOpComponentContext)
410            .unwrap();
411        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
412        let exchange = Exchange::new(Message::new(camel_component_api::Body::Json(
413            serde_json::json!({"id": "1"}),
414        )));
415        let result = producer.oneshot(exchange).await;
416        assert!(result.is_ok(), "expected valid payload under limit to pass");
417    }
418
419    #[tokio::test]
420    async fn test_validator_no_limit_allows_any_size() {
421        let f = json_schema_file();
422        let uri = format!("validator:{}", f.path().display());
423        let ep = ValidatorComponent::new()
424            .create_endpoint(&uri, &NoOpComponentContext)
425            .unwrap();
426        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
427        // Valid JSON that would exceed a 10-byte limit
428        let exchange = Exchange::new(Message::new(camel_component_api::Body::Json(
429            serde_json::json!({"id": "this is a longer value that would exceed small limits"}),
430        )));
431        let result = producer.oneshot(exchange).await;
432        assert!(
433            result.is_ok(),
434            "expected no-limit validator to pass any valid payload"
435        );
436    }
437
438    #[tokio::test]
439    async fn test_fail_on_null_body_default_rejects_empty() {
440        let f = json_schema_file();
441        let uri = format!("validator:{}", f.path().display());
442        let ep = ValidatorComponent::new()
443            .create_endpoint(&uri, &NoOpComponentContext)
444            .unwrap();
445        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
446        let exchange = Exchange::new(Message::new(camel_component_api::Body::Empty));
447        let result = producer.oneshot(exchange).await;
448        assert!(result.is_err());
449        let msg = result.unwrap_err().to_string();
450        assert!(
451            msg.contains("failOnNullBody"),
452            "expected failOnNullBody in error, got: {msg}"
453        );
454    }
455
456    #[tokio::test]
457    async fn test_fail_on_null_body_false_passes_empty() {
458        let f = json_schema_file();
459        let uri = format!("validator:{}?failOnNullBody=false", f.path().display());
460        let ep = ValidatorComponent::new()
461            .create_endpoint(&uri, &NoOpComponentContext)
462            .unwrap();
463        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
464        let exchange = Exchange::new(Message::new(camel_component_api::Body::Empty));
465        let result = producer.oneshot(exchange).await;
466        assert!(
467            result.is_ok(),
468            "expected empty body to pass with failOnNullBody=false"
469        );
470    }
471
472    #[tokio::test]
473    async fn test_header_name_validation_uses_header_value() {
474        let f = json_schema_file();
475        let uri = format!("validator:{}?headerName=X-Data", f.path().display());
476        let ep = ValidatorComponent::new()
477            .create_endpoint(&uri, &NoOpComponentContext)
478            .unwrap();
479        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
480        let mut msg = Message::new(camel_component_api::Body::Empty);
481        msg.set_header("X-Data", serde_json::json!({"id": "1"}).to_string());
482        let exchange = Exchange::new(msg);
483        let result = producer.oneshot(exchange).await;
484        // Header value {"id":"1"} is valid JSON matching schema
485        assert!(
486            result.is_ok(),
487            "expected valid header to pass: {:?}",
488            result
489        );
490    }
491
492    #[tokio::test]
493    async fn test_header_name_missing_header_fails_by_default() {
494        let f = json_schema_file();
495        let uri = format!("validator:{}?headerName=X-Missing", f.path().display());
496        let ep = ValidatorComponent::new()
497            .create_endpoint(&uri, &NoOpComponentContext)
498            .unwrap();
499        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
500        let exchange = Exchange::new(Message::new(camel_component_api::Body::Empty));
501        let result = producer.oneshot(exchange).await;
502        assert!(result.is_err());
503        let msg = result.unwrap_err().to_string();
504        assert!(
505            msg.contains("X-Missing"),
506            "expected header name in error, got: {msg}"
507        );
508    }
509
510    #[tokio::test]
511    async fn test_fail_on_null_header_false_passes_missing_header() {
512        let f = json_schema_file();
513        let uri = format!(
514            "validator:{}?headerName=X-Missing&failOnNullHeader=false",
515            f.path().display()
516        );
517        let ep = ValidatorComponent::new()
518            .create_endpoint(&uri, &NoOpComponentContext)
519            .unwrap();
520        let producer = ep.create_producer(rt(), &ProducerContext::new()).unwrap();
521        let exchange = Exchange::new(Message::new(camel_component_api::Body::Empty));
522        let result = producer.oneshot(exchange).await;
523        assert!(
524            result.is_ok(),
525            "expected missing header to pass with failOnNullHeader=false"
526        );
527    }
528
529    #[test]
530    fn test_relaxng_schema_type_rejected_at_creation() {
531        let mut f = tempfile::Builder::new().suffix(".rng").tempfile().unwrap();
532        use std::io::Write;
533        f.write_all(b"<grammar/>").unwrap();
534        let uri = format!("validator:{}", f.path().display());
535        let result = ValidatorComponent::new().create_endpoint(&uri, &NoOpComponentContext);
536        let err = result.err().expect("expected endpoint creation to fail");
537        let msg = err.to_string();
538        assert!(
539            msg.contains("not yet supported"),
540            "expected 'not yet supported' in error, got: {msg}"
541        );
542    }
543
544    #[test]
545    fn test_schematron_schema_type_rejected_at_creation() {
546        let mut f = tempfile::Builder::new().suffix(".sch").tempfile().unwrap();
547        use std::io::Write;
548        f.write_all(b"<schema/>").unwrap();
549        let uri = format!("validator:{}", f.path().display());
550        let result = ValidatorComponent::new().create_endpoint(&uri, &NoOpComponentContext);
551        let err = result.err().expect("expected endpoint creation to fail");
552        let msg = err.to_string();
553        assert!(
554            msg.contains("not yet supported"),
555            "expected 'not yet supported' in error, got: {msg}"
556        );
557    }
558
559    #[test]
560    fn test_schema_info_returns_description() {
561        let f = json_schema_file();
562        let uri = format!("validator:{}", f.path().display());
563        let ep = ValidatorComponent::new()
564            .create_endpoint(&uri, &NoOpComponentContext)
565            .unwrap();
566        // The endpoint is a Box<dyn Endpoint>, so we can't directly call schema_info().
567        // We verify endpoint creation succeeds (schema compiled).
568        assert_eq!(ep.uri(), uri);
569    }
570}