Skip to main content

camel_component_validator/
component.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::debug;
8
9use crate::compiled::CompiledValidator;
10use crate::config::ValidatorConfig;
11use crate::xsd_bridge::{XsdBridge, XsdBridgeBackend};
12use camel_component_api::ComponentContext;
13use camel_component_api::{
14    BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
15    RuntimeObservability,
16};
17
18pub struct ValidatorComponent {
19    xsd_bridge: Arc<dyn XsdBridge>,
20    xsd_backend: Option<Arc<XsdBridgeBackend>>,
21}
22
23impl ValidatorComponent {
24    pub fn new() -> Self {
25        let xsd_backend = Arc::new(XsdBridgeBackend::new());
26        Self {
27            xsd_bridge: Arc::clone(&xsd_backend) as Arc<dyn XsdBridge>,
28            xsd_backend: Some(xsd_backend),
29        }
30    }
31
32    pub fn xsd_bridge_backend(&self) -> Option<Arc<XsdBridgeBackend>> {
33        self.xsd_backend.as_ref().map(Arc::clone)
34    }
35
36    #[cfg(test)]
37    fn with_xsd_bridge(xsd_bridge: Arc<dyn XsdBridge>) -> Self {
38        Self {
39            xsd_bridge,
40            xsd_backend: None,
41        }
42    }
43}
44
45impl Default for ValidatorComponent {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl Component for ValidatorComponent {
52    fn scheme(&self) -> &str {
53        "validator"
54    }
55
56    fn create_endpoint(
57        &self,
58        uri: &str,
59        _ctx: &dyn ComponentContext,
60    ) -> Result<Box<dyn Endpoint>, CamelError> {
61        let config = ValidatorConfig::from_uri(uri)?;
62        let xsd_bridge = Arc::clone(&self.xsd_bridge);
63        let compiled = CompiledValidator::compile(&config, xsd_bridge)?;
64        Ok(Box::new(ValidatorEndpoint {
65            uri: uri.to_string(),
66            config,
67            compiled: Arc::new(compiled),
68        }))
69    }
70}
71
72/// Validator endpoint that checks message bodies or headers against a schema.
73///
74/// Supports XML (XSD), JSON Schema, and YAML schema validation.
75/// RelaxNG and Schematron are accepted in URI parsing but rejected at endpoint
76/// creation with a clear error message.
77struct ValidatorEndpoint {
78    uri: String,
79    config: ValidatorConfig,
80    compiled: Arc<CompiledValidator>,
81}
82
83impl ValidatorEndpoint {
84    /// Returns a human-readable description of the configured schema.
85    #[allow(dead_code)]
86    pub fn schema_info(&self) -> String {
87        format!(
88            "{:?} schema: {}",
89            self.config.schema_type,
90            self.config.schema_path.display()
91        )
92    }
93}
94
95impl Endpoint for ValidatorEndpoint {
96    fn uri(&self) -> &str {
97        &self.uri
98    }
99
100    fn create_consumer(
101        &self,
102        _rt: Arc<dyn RuntimeObservability>,
103    ) -> Result<Box<dyn Consumer>, CamelError> {
104        Err(CamelError::EndpointCreationFailed(
105            "validator endpoint does not support consumers".to_string(),
106        ))
107    }
108
109    fn create_producer(
110        &self,
111        _rt: Arc<dyn RuntimeObservability>,
112        _ctx: &ProducerContext,
113    ) -> Result<BoxProcessor, CamelError> {
114        Ok(BoxProcessor::new(ValidatorProducer {
115            uri: self.uri.clone(),
116            config: self.config.clone(),
117            compiled: Arc::clone(&self.compiled),
118        }))
119    }
120}
121
122#[derive(Clone)]
123struct ValidatorProducer {
124    uri: String,
125    config: ValidatorConfig,
126    compiled: Arc<CompiledValidator>,
127}
128
129impl Service<Exchange> for ValidatorProducer {
130    type Response = Exchange;
131    type Error = CamelError;
132    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
133
134    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
135        Poll::Ready(Ok(()))
136    }
137
138    fn call(&mut self, exchange: Exchange) -> Self::Future {
139        let compiled = Arc::clone(&self.compiled);
140        let uri = self.uri.clone();
141        let config = self.config.clone();
142        Box::pin(async move {
143            debug!(uri = uri, "validating exchange body");
144
145            // VAL-003: header_name mode — validate header value instead of body
146            if let Some(ref header_name) = config.header_name {
147                match exchange.input.header(header_name) {
148                    Some(value) => {
149                        // Convert header Value to a string body for validation
150                        let header_str = match value.as_str() {
151                            Some(s) => s.to_string(),
152                            None => value.to_string(),
153                        };
154                        let header_body = camel_component_api::Body::Text(header_str);
155                        compiled.validate(&header_body).await?;
156                    }
157                    None => {
158                        // VAL-004: failOnNullHeader
159                        if config.fail_on_null_header {
160                            return Err(CamelError::ProcessorError(format!(
161                                "header '{header_name}' is missing and failOnNullHeader is true"
162                            )));
163                        }
164                        // Pass through — no validation
165                    }
166                }
167                return Ok(exchange);
168            }
169
170            // VAL-002: failOnNullBody
171            if exchange.input.body.is_empty() {
172                if config.fail_on_null_body {
173                    return Err(CamelError::ProcessorError(
174                        "body is empty and failOnNullBody is true".to_string(),
175                    ));
176                }
177                // Pass through — no validation
178                return Ok(exchange);
179            }
180
181            compiled.validate(&exchange.input.body).await?;
182            Ok(exchange)
183        })
184    }
185}
186
#[cfg(test)]
mod tests {
    //! Unit tests for the validator component, endpoint and producer.
    //!
    //! Schemas are written to temporary files; each test keeps the file
    //! handle alive for its whole body so the path stays valid.

    use super::*;
    use crate::error::ValidatorError;
    use async_trait::async_trait;
    use camel_component_api::test_support::PanicRuntimeObservability;
    use camel_component_api::{Body, Message, NoOpComponentContext};
    use std::io::Write;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tower::ServiceExt;

    /// JSON Schema requiring an object with an `id` property.
    const JSON_SCHEMA: &[u8] = br#"{"type":"object","required":["id"]}"#;

    /// Minimal XSD declaring a single string-typed `order` element.
    const XSD_SCHEMA: &[u8] = br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#;

    /// Runtime handle passed to endpoint factory methods in these tests.
    fn rt() -> Arc<dyn RuntimeObservability> {
        Arc::new(PanicRuntimeObservability)
    }

    /// Writes `contents` to a temp file whose name ends in `suffix`.
    fn schema_file(suffix: &str, contents: &[u8]) -> tempfile::NamedTempFile {
        let mut file = tempfile::Builder::new().suffix(suffix).tempfile().unwrap();
        file.write_all(contents).unwrap();
        file
    }

    fn json_schema_file() -> tempfile::NamedTempFile {
        schema_file(".json", JSON_SCHEMA)
    }

    fn xsd_file() -> tempfile::NamedTempFile {
        schema_file(".xsd", XSD_SCHEMA)
    }

    /// Builds `validator:<path><query>`; `query` is either empty or starts
    /// with `?`.
    fn validator_uri(schema: &tempfile::NamedTempFile, query: &str) -> String {
        format!("validator:{}{query}", schema.path().display())
    }

    /// Creates an endpoint, panicking if creation fails.
    fn endpoint_for(component: &ValidatorComponent, uri: &str) -> Box<dyn Endpoint> {
        component
            .create_endpoint(uri, &NoOpComponentContext)
            .expect("endpoint creation should succeed")
    }

    /// Creates an endpoint and then a producer from it.
    fn producer_for(component: &ValidatorComponent, uri: &str) -> BoxProcessor {
        endpoint_for(component, uri)
            .create_producer(rt(), &ProducerContext::new())
            .expect("producer creation should succeed")
    }

    /// Wraps `body` in a fresh exchange.
    fn exchange_with(body: Body) -> Exchange {
        Exchange::new(Message::new(body))
    }

    /// Bridge double that counts calls and can be told to fail.
    #[derive(Debug)]
    struct MockXsdBridge {
        register_calls: AtomicUsize,
        validate_calls: AtomicUsize,
        register_error: Option<ValidatorError>,
        validate_error: Option<ValidatorError>,
    }

    /// Mock bridge with zeroed counters; `register` fails with
    /// `register_error` when one is supplied, `validate` always succeeds.
    fn mock_bridge(register_error: Option<ValidatorError>) -> Arc<MockXsdBridge> {
        Arc::new(MockXsdBridge {
            register_calls: AtomicUsize::new(0),
            validate_calls: AtomicUsize::new(0),
            register_error,
            validate_error: None,
        })
    }

    #[async_trait]
    impl XsdBridge for MockXsdBridge {
        async fn register(&self, _xsd_bytes: Vec<u8>) -> Result<String, ValidatorError> {
            self.register_calls.fetch_add(1, Ordering::SeqCst);
            if let Some(err) = &self.register_error {
                return Err(err.clone());
            }
            Ok("xsd-mock-id".to_string())
        }

        async fn validate(
            &self,
            _schema_id: &str,
            _doc_bytes: Vec<u8>,
        ) -> Result<(), ValidatorError> {
            self.validate_calls.fetch_add(1, Ordering::SeqCst);
            if let Some(err) = &self.validate_error {
                return Err(err.clone());
            }
            Ok(())
        }
    }

    /// Asserts that a schema file with the given suffix is rejected at
    /// endpoint creation with a "not yet supported" error.
    fn assert_schema_type_unsupported(suffix: &str, contents: &[u8]) {
        let f = schema_file(suffix, contents);
        let uri = validator_uri(&f, "");
        let result = ValidatorComponent::new().create_endpoint(&uri, &NoOpComponentContext);
        // `Box<dyn Endpoint>` is not `Debug`, so go through `err()` rather
        // than `unwrap_err()`.
        let err = result.err().expect("expected endpoint creation to fail");
        let msg = err.to_string();
        assert!(
            msg.contains("not yet supported"),
            "expected 'not yet supported' in error, got: {msg}"
        );
    }

    #[test]
    fn scheme_is_validator() {
        assert_eq!(ValidatorComponent::new().scheme(), "validator");
    }

    #[test]
    fn consumer_not_supported() {
        let f = json_schema_file();
        let ep = endpoint_for(&ValidatorComponent::new(), &validator_uri(&f, ""));
        assert!(ep.create_consumer(rt()).is_err());
    }

    #[test]
    fn endpoint_creation_fails_for_missing_schema() {
        let result = ValidatorComponent::new()
            .create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn valid_json_body_passes_through() {
        let f = json_schema_file();
        let producer = producer_for(&ValidatorComponent::new(), &validator_uri(&f, ""));
        let exchange = exchange_with(Body::Json(serde_json::json!({"id": "1"})));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn invalid_json_body_returns_err() {
        let f = json_schema_file();
        let producer = producer_for(&ValidatorComponent::new(), &validator_uri(&f, ""));
        let exchange = exchange_with(Body::Json(serde_json::json!({"name": "x"})));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("validation failed"), "got: {msg}");
    }

    #[tokio::test]
    async fn valid_xml_body_passes() {
        let backend = mock_bridge(None);
        let f = xsd_file();
        let component = ValidatorComponent::with_xsd_bridge(backend);
        let producer = producer_for(&component, &validator_uri(&f, ""));
        let exchange = exchange_with(Body::Xml("<order>hello</order>".to_string()));
        assert!(producer.oneshot(exchange).await.is_ok());
    }

    #[tokio::test]
    async fn xsd_bridge_register_and_validate_mock() {
        let backend = mock_bridge(None);
        let f = xsd_file();
        let component =
            ValidatorComponent::with_xsd_bridge(Arc::clone(&backend) as Arc<dyn XsdBridge>);
        let producer = producer_for(&component, &validator_uri(&f, ""));

        let exchange = exchange_with(Body::Xml("<order>ok</order>".to_string()));
        assert!(producer.oneshot(exchange).await.is_ok());
        assert_eq!(backend.register_calls.load(Ordering::SeqCst), 1);
        assert_eq!(backend.validate_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn xsd_bridge_register_error_propagates_on_validate() {
        let backend = mock_bridge(Some(ValidatorError::CompilationFailed {
            message: "COMPILATION_FAILED".to_string(),
            source: None,
        }));
        let f = xsd_file();
        // Endpoint creation now always succeeds for XSD (registration is deferred).
        let component = ValidatorComponent::with_xsd_bridge(backend);
        let producer = producer_for(&component, &validator_uri(&f, ""));
        // The error surfaces when the first message is processed (register is called).
        let exchange = exchange_with(Body::Xml("<order/>".to_string()));
        let err = producer
            .oneshot(exchange)
            .await
            .expect_err("expected validate to fail due to registration error");
        assert!(err.to_string().contains("COMPILATION_FAILED"));
    }

    #[tokio::test]
    async fn test_validator_rejects_oversized_payload() {
        // Build validator with maxPayloadBytes=100
        let f = json_schema_file();
        let uri = validator_uri(&f, "?maxPayloadBytes=100");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        // Send a body that is definitely > 100 bytes
        let exchange = exchange_with(Body::Text("x".repeat(200)));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err(), "expected oversized payload to be rejected");
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("payload too large"),
            "expected 'payload too large' in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_validator_allows_payload_under_limit() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?maxPayloadBytes=1024");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        let exchange = exchange_with(Body::Json(serde_json::json!({"id": "1"})));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_ok(), "expected valid payload under limit to pass");
    }

    #[tokio::test]
    async fn test_validator_no_limit_allows_any_size() {
        let f = json_schema_file();
        let producer = producer_for(&ValidatorComponent::new(), &validator_uri(&f, ""));
        // Valid JSON that would exceed a 10-byte limit
        let exchange = exchange_with(Body::Json(
            serde_json::json!({"id": "this is a longer value that would exceed small limits"}),
        ));
        let result = producer.oneshot(exchange).await;
        assert!(
            result.is_ok(),
            "expected no-limit validator to pass any valid payload"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_body_default_rejects_empty() {
        let f = json_schema_file();
        let producer = producer_for(&ValidatorComponent::new(), &validator_uri(&f, ""));
        let result = producer.oneshot(exchange_with(Body::Empty)).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("failOnNullBody"),
            "expected failOnNullBody in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_body_false_passes_empty() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?failOnNullBody=false");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        let result = producer.oneshot(exchange_with(Body::Empty)).await;
        assert!(
            result.is_ok(),
            "expected empty body to pass with failOnNullBody=false"
        );
    }

    #[tokio::test]
    async fn test_header_name_validation_uses_header_value() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?headerName=X-Data");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        let mut msg = Message::new(Body::Empty);
        msg.set_header("X-Data", serde_json::json!({"id": "1"}).to_string());
        let result = producer.oneshot(Exchange::new(msg)).await;
        // Header value {"id":"1"} is valid JSON matching schema
        assert!(
            result.is_ok(),
            "expected valid header to pass: {:?}",
            result
        );
    }

    #[tokio::test]
    async fn test_header_name_missing_header_fails_by_default() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?headerName=X-Missing");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        let result = producer.oneshot(exchange_with(Body::Empty)).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("X-Missing"),
            "expected header name in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_header_false_passes_missing_header() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?headerName=X-Missing&failOnNullHeader=false");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        let result = producer.oneshot(exchange_with(Body::Empty)).await;
        assert!(
            result.is_ok(),
            "expected missing header to pass with failOnNullHeader=false"
        );
    }

    #[test]
    fn test_relaxng_schema_type_rejected_at_creation() {
        assert_schema_type_unsupported(".rng", b"<grammar/>");
    }

    #[test]
    fn test_schematron_schema_type_rejected_at_creation() {
        assert_schema_type_unsupported(".sch", b"<schema/>");
    }

    #[test]
    fn test_schema_info_returns_description() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "");
        let component = ValidatorComponent::new();

        // Through the trait object only the URI is observable.
        let ep = endpoint_for(&component, &uri);
        assert_eq!(ep.uri(), uri);

        // `schema_info` is an inherent method, so build the concrete endpoint
        // the same way `create_endpoint` does and call it directly.
        let config = ValidatorConfig::from_uri(&uri).expect("URI should parse");
        let compiled = CompiledValidator::compile(&config, Arc::clone(&component.xsd_bridge))
            .expect("schema should compile");
        let endpoint = ValidatorEndpoint {
            uri: uri.clone(),
            config,
            compiled: Arc::new(compiled),
        };

        let info = endpoint.schema_info();
        let expected_path = endpoint.config.schema_path.display().to_string();
        assert!(
            info.contains(" schema: "),
            "expected separator in description, got: {info}"
        );
        assert!(
            info.ends_with(&expected_path),
            "expected description to end with schema path, got: {info}"
        );
    }
}