Skip to main content

camel_component_validator/
component.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::debug;
8
9use crate::compiled::CompiledValidator;
10use crate::config::ValidatorConfig;
11use crate::xsd_bridge::{XsdBridge, XsdBridgeBackend};
12use camel_component_api::ComponentContext;
13use camel_component_api::{
14    BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
15};
16
/// Camel component registered under the `validator` URI scheme.
///
/// Owns the XSD bridge that is handed to schema compilation every time an
/// endpoint is created from this component.
pub struct ValidatorComponent {
    /// Type-erased bridge passed to `CompiledValidator::compile`.
    xsd_bridge: Arc<dyn XsdBridge>,
    /// Concrete backend behind `xsd_bridge` when the component was built with
    /// `new()`; `None` when a bridge was injected by the test-only constructor.
    xsd_backend: Option<Arc<XsdBridgeBackend>>,
}
21
22impl ValidatorComponent {
23    pub fn new() -> Self {
24        let xsd_backend = Arc::new(XsdBridgeBackend::new());
25        Self {
26            xsd_bridge: Arc::clone(&xsd_backend) as Arc<dyn XsdBridge>,
27            xsd_backend: Some(xsd_backend),
28        }
29    }
30
31    pub fn xsd_bridge_backend(&self) -> Option<Arc<XsdBridgeBackend>> {
32        self.xsd_backend.as_ref().map(Arc::clone)
33    }
34
35    #[cfg(test)]
36    fn with_xsd_bridge(xsd_bridge: Arc<dyn XsdBridge>) -> Self {
37        Self {
38            xsd_bridge,
39            xsd_backend: None,
40        }
41    }
42}
43
44impl Default for ValidatorComponent {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50impl Component for ValidatorComponent {
51    fn scheme(&self) -> &str {
52        "validator"
53    }
54
55    fn create_endpoint(
56        &self,
57        uri: &str,
58        _ctx: &dyn ComponentContext,
59    ) -> Result<Box<dyn Endpoint>, CamelError> {
60        let config = ValidatorConfig::from_uri(uri)?;
61        let xsd_bridge = Arc::clone(&self.xsd_bridge);
62        let compiled = CompiledValidator::compile(&config, xsd_bridge)?;
63        Ok(Box::new(ValidatorEndpoint {
64            uri: uri.to_string(),
65            config,
66            compiled: Arc::new(compiled),
67        }))
68    }
69}
70
/// Validator endpoint that checks message bodies or headers against a schema.
///
/// Supports XML (XSD), JSON Schema, and YAML schema validation.
/// RelaxNG and Schematron are accepted in URI parsing but rejected at endpoint
/// creation with a clear error message.
struct ValidatorEndpoint {
    /// The URI this endpoint was created from, returned verbatim by `uri()`.
    uri: String,
    /// Configuration parsed from the URI; copied into every producer.
    config: ValidatorConfig,
    /// Schema compiled once at endpoint creation and shared with every
    /// producer through the `Arc`.
    compiled: Arc<CompiledValidator>,
}
81
82impl ValidatorEndpoint {
83    /// Returns a human-readable description of the configured schema.
84    #[allow(dead_code)]
85    pub fn schema_info(&self) -> String {
86        format!(
87            "{:?} schema: {}",
88            self.config.schema_type,
89            self.config.schema_path.display()
90        )
91    }
92}
93
94impl Endpoint for ValidatorEndpoint {
95    fn uri(&self) -> &str {
96        &self.uri
97    }
98
99    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
100        Err(CamelError::EndpointCreationFailed(
101            "validator endpoint does not support consumers".to_string(),
102        ))
103    }
104
105    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
106        Ok(BoxProcessor::new(ValidatorProducer {
107            uri: self.uri.clone(),
108            config: self.config.clone(),
109            compiled: Arc::clone(&self.compiled),
110        }))
111    }
112}
113
/// Producer-side service that validates each exchange it receives and returns
/// the exchange unchanged when validation succeeds or is skipped.
#[derive(Clone)]
struct ValidatorProducer {
    /// Endpoint URI, recorded on the debug log line for each exchange.
    uri: String,
    /// Settings that select header vs. body validation and the
    /// fail-on-missing behaviour.
    config: ValidatorConfig,
    /// Compiled schema shared with the owning endpoint.
    compiled: Arc<CompiledValidator>,
}
120
121impl Service<Exchange> for ValidatorProducer {
122    type Response = Exchange;
123    type Error = CamelError;
124    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
125
126    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
127        Poll::Ready(Ok(()))
128    }
129
130    fn call(&mut self, exchange: Exchange) -> Self::Future {
131        let compiled = Arc::clone(&self.compiled);
132        let uri = self.uri.clone();
133        let config = self.config.clone();
134        Box::pin(async move {
135            debug!(uri = uri, "validating exchange body");
136
137            // VAL-003: header_name mode — validate header value instead of body
138            if let Some(ref header_name) = config.header_name {
139                match exchange.input.header(header_name) {
140                    Some(value) => {
141                        // Convert header Value to a string body for validation
142                        let header_str = match value.as_str() {
143                            Some(s) => s.to_string(),
144                            None => value.to_string(),
145                        };
146                        let header_body = camel_component_api::Body::Text(header_str);
147                        compiled.validate(&header_body).await?;
148                    }
149                    None => {
150                        // VAL-004: failOnNullHeader
151                        if config.fail_on_null_header {
152                            return Err(CamelError::ProcessorError(format!(
153                                "header '{header_name}' is missing and failOnNullHeader is true"
154                            )));
155                        }
156                        // Pass through — no validation
157                    }
158                }
159                return Ok(exchange);
160            }
161
162            // VAL-002: failOnNullBody
163            if exchange.input.body.is_empty() {
164                if config.fail_on_null_body {
165                    return Err(CamelError::ProcessorError(
166                        "body is empty and failOnNullBody is true".to_string(),
167                    ));
168                }
169                // Pass through — no validation
170                return Ok(exchange);
171            }
172
173            compiled.validate(&exchange.input.body).await?;
174            Ok(exchange)
175        })
176    }
177}
178
#[cfg(test)]
mod tests {
    //! Tests for the validator component, endpoint and producer.
    //!
    //! Schema files are real temporary files; XSD validation goes through
    //! `MockXsdBridge` so no external backend is needed.

    use super::*;
    use crate::error::ValidatorError;
    use async_trait::async_trait;
    use camel_component_api::{Body, Message, NoOpComponentContext};
    use std::io::Write;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tower::ServiceExt;

    /// Writes `contents` into a fresh temporary file whose name ends in `suffix`.
    /// The file is deleted when the returned handle is dropped, so tests keep
    /// the handle alive for their whole duration.
    fn schema_file(suffix: &str, contents: &[u8]) -> tempfile::NamedTempFile {
        let mut f = tempfile::Builder::new().suffix(suffix).tempfile().unwrap();
        f.write_all(contents).unwrap();
        f
    }

    /// JSON Schema requiring an object with an `id` property.
    fn json_schema_file() -> tempfile::NamedTempFile {
        schema_file(".json", br#"{"type":"object","required":["id"]}"#)
    }

    /// Minimal XSD declaring a single string element named `order`.
    fn xsd_file() -> tempfile::NamedTempFile {
        schema_file(
            ".xsd",
            br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#,
        )
    }

    /// Builds `validator:<path>` for `schema`, appending `?<query>` when
    /// `query` is non-empty.
    fn validator_uri(schema: &tempfile::NamedTempFile, query: &str) -> String {
        let path = schema.path().display();
        if query.is_empty() {
            format!("validator:{path}")
        } else {
            format!("validator:{path}?{query}")
        }
    }

    /// Creates an endpoint for `uri` on `component` and returns its producer,
    /// panicking if either step fails.
    fn producer_from(component: &ValidatorComponent, uri: &str) -> BoxProcessor {
        component
            .create_endpoint(uri, &NoOpComponentContext)
            .unwrap()
            .create_producer(&ProducerContext::new())
            .unwrap()
    }

    /// Sends a single exchange carrying `body` through `producer`.
    async fn send(producer: BoxProcessor, body: Body) -> Result<Exchange, CamelError> {
        producer.oneshot(Exchange::new(Message::new(body))).await
    }

    /// Sends a single exchange built from `message` through `producer`.
    async fn send_message(
        producer: BoxProcessor,
        message: Message,
    ) -> Result<Exchange, CamelError> {
        producer.oneshot(Exchange::new(message)).await
    }

    /// XSD bridge double that counts calls and can be told to fail.
    #[derive(Debug)]
    struct MockXsdBridge {
        register_calls: AtomicUsize,
        validate_calls: AtomicUsize,
        register_error: Option<ValidatorError>,
        validate_error: Option<ValidatorError>,
    }

    impl MockXsdBridge {
        /// Mock with zeroed counters that fails `register` with
        /// `register_error` when one is given and never fails `validate`.
        fn new(register_error: Option<ValidatorError>) -> Arc<Self> {
            Arc::new(Self {
                register_calls: AtomicUsize::new(0),
                validate_calls: AtomicUsize::new(0),
                register_error,
                validate_error: None,
            })
        }
    }

    #[async_trait]
    impl XsdBridge for MockXsdBridge {
        async fn register(&self, _xsd_bytes: Vec<u8>) -> Result<String, ValidatorError> {
            self.register_calls.fetch_add(1, Ordering::SeqCst);
            if let Some(err) = &self.register_error {
                return Err(err.clone());
            }
            Ok("xsd-mock-id".to_string())
        }

        async fn validate(
            &self,
            _schema_id: &str,
            _doc_bytes: Vec<u8>,
        ) -> Result<(), ValidatorError> {
            self.validate_calls.fetch_add(1, Ordering::SeqCst);
            if let Some(err) = &self.validate_error {
                return Err(err.clone());
            }
            Ok(())
        }
    }

    #[test]
    fn scheme_is_validator() {
        assert_eq!(ValidatorComponent::new().scheme(), "validator");
    }

    #[test]
    fn consumer_not_supported() {
        let f = json_schema_file();
        let ep = ValidatorComponent::new()
            .create_endpoint(&validator_uri(&f, ""), &NoOpComponentContext)
            .unwrap();
        assert!(ep.create_consumer().is_err());
    }

    #[test]
    fn endpoint_creation_fails_for_missing_schema() {
        let result = ValidatorComponent::new()
            .create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn valid_json_body_passes_through() {
        let f = json_schema_file();
        let producer = producer_from(&ValidatorComponent::new(), &validator_uri(&f, ""));
        let result = send(producer, Body::Json(serde_json::json!({"id": "1"}))).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn invalid_json_body_returns_err() {
        let f = json_schema_file();
        let producer = producer_from(&ValidatorComponent::new(), &validator_uri(&f, ""));
        let result = send(producer, Body::Json(serde_json::json!({"name": "x"}))).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("validation failed"), "got: {msg}");
    }

    #[tokio::test]
    async fn valid_xml_body_passes() {
        let f = xsd_file();
        let component = ValidatorComponent::with_xsd_bridge(MockXsdBridge::new(None));
        let producer = producer_from(&component, &validator_uri(&f, ""));
        let result = send(producer, Body::Xml("<order>hello</order>".to_string())).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn xsd_bridge_register_and_validate_mock() {
        let backend = MockXsdBridge::new(None);
        let f = xsd_file();
        let component =
            ValidatorComponent::with_xsd_bridge(Arc::clone(&backend) as Arc<dyn XsdBridge>);
        let producer = producer_from(&component, &validator_uri(&f, ""));

        let result = send(producer, Body::Xml("<order>ok</order>".to_string())).await;
        assert!(result.is_ok());
        assert_eq!(backend.register_calls.load(Ordering::SeqCst), 1);
        assert_eq!(backend.validate_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn xsd_bridge_register_error_propagates_on_validate() {
        let backend = MockXsdBridge::new(Some(ValidatorError::CompilationFailed {
            message: "COMPILATION_FAILED".to_string(),
            source: None,
        }));
        let f = xsd_file();
        // Endpoint creation now always succeeds for XSD (registration is deferred).
        let ep = ValidatorComponent::with_xsd_bridge(backend)
            .create_endpoint(&validator_uri(&f, ""), &NoOpComponentContext)
            .expect("endpoint creation should succeed");
        // The error surfaces when the first message is processed (register is called).
        let producer = ep.create_producer(&ProducerContext::new()).unwrap();
        let err = send(producer, Body::Xml("<order/>".to_string()))
            .await
            .expect_err("expected validate to fail due to registration error");
        assert!(err.to_string().contains("COMPILATION_FAILED"));
    }

    #[tokio::test]
    async fn test_validator_rejects_oversized_payload() {
        // Build validator with maxPayloadBytes=100
        let f = json_schema_file();
        let producer = producer_from(
            &ValidatorComponent::new(),
            &validator_uri(&f, "maxPayloadBytes=100"),
        );
        // Send a body that is definitely > 100 bytes
        let result = send(producer, Body::Text("x".repeat(200))).await;
        assert!(result.is_err(), "expected oversized payload to be rejected");
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("payload too large"),
            "expected 'payload too large' in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_validator_allows_payload_under_limit() {
        let f = json_schema_file();
        let producer = producer_from(
            &ValidatorComponent::new(),
            &validator_uri(&f, "maxPayloadBytes=1024"),
        );
        let result = send(producer, Body::Json(serde_json::json!({"id": "1"}))).await;
        assert!(result.is_ok(), "expected valid payload under limit to pass");
    }

    #[tokio::test]
    async fn test_validator_no_limit_allows_any_size() {
        let f = json_schema_file();
        let producer = producer_from(&ValidatorComponent::new(), &validator_uri(&f, ""));
        // Valid JSON that would exceed a 10-byte limit
        let body = Body::Json(
            serde_json::json!({"id": "this is a longer value that would exceed small limits"}),
        );
        let result = send(producer, body).await;
        assert!(
            result.is_ok(),
            "expected no-limit validator to pass any valid payload"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_body_default_rejects_empty() {
        let f = json_schema_file();
        let producer = producer_from(&ValidatorComponent::new(), &validator_uri(&f, ""));
        let result = send(producer, Body::Empty).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("failOnNullBody"),
            "expected failOnNullBody in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_body_false_passes_empty() {
        let f = json_schema_file();
        let producer = producer_from(
            &ValidatorComponent::new(),
            &validator_uri(&f, "failOnNullBody=false"),
        );
        let result = send(producer, Body::Empty).await;
        assert!(
            result.is_ok(),
            "expected empty body to pass with failOnNullBody=false"
        );
    }

    #[tokio::test]
    async fn test_header_name_validation_uses_header_value() {
        let f = json_schema_file();
        let producer = producer_from(
            &ValidatorComponent::new(),
            &validator_uri(&f, "headerName=X-Data"),
        );
        let mut msg = Message::new(Body::Empty);
        msg.set_header("X-Data", serde_json::json!({"id": "1"}).to_string());
        let result = send_message(producer, msg).await;
        // Header value {"id":"1"} is valid JSON matching schema
        assert!(
            result.is_ok(),
            "expected valid header to pass: {:?}",
            result
        );
    }

    #[tokio::test]
    async fn test_header_name_missing_header_fails_by_default() {
        let f = json_schema_file();
        let producer = producer_from(
            &ValidatorComponent::new(),
            &validator_uri(&f, "headerName=X-Missing"),
        );
        let result = send(producer, Body::Empty).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("X-Missing"),
            "expected header name in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_header_false_passes_missing_header() {
        let f = json_schema_file();
        let producer = producer_from(
            &ValidatorComponent::new(),
            &validator_uri(&f, "headerName=X-Missing&failOnNullHeader=false"),
        );
        let result = send(producer, Body::Empty).await;
        assert!(
            result.is_ok(),
            "expected missing header to pass with failOnNullHeader=false"
        );
    }

    /// Asserts that creating an endpoint for `schema` fails with a
    /// "not yet supported" error.
    fn assert_schema_type_unsupported(schema: &tempfile::NamedTempFile) {
        let uri = validator_uri(schema, "");
        let result = ValidatorComponent::new().create_endpoint(&uri, &NoOpComponentContext);
        // `Box<dyn Endpoint>` is not required to be `Debug`, so go through `err()`.
        let err = result.err().expect("expected endpoint creation to fail");
        let msg = err.to_string();
        assert!(
            msg.contains("not yet supported"),
            "expected 'not yet supported' in error, got: {msg}"
        );
    }

    #[test]
    fn test_relaxng_schema_type_rejected_at_creation() {
        let f = schema_file(".rng", b"<grammar/>");
        assert_schema_type_unsupported(&f);
    }

    #[test]
    fn test_schematron_schema_type_rejected_at_creation() {
        let f = schema_file(".sch", b"<schema/>");
        assert_schema_type_unsupported(&f);
    }

    #[test]
    fn test_schema_info_returns_description() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "");

        // Through the component: creation succeeds and the endpoint echoes its URI.
        let ep = ValidatorComponent::new()
            .create_endpoint(&uri, &NoOpComponentContext)
            .unwrap();
        assert_eq!(ep.uri(), uri);

        // `schema_info()` is not reachable through `Box<dyn Endpoint>`, so build
        // the concrete endpoint the same way `create_endpoint` does.
        let config = ValidatorConfig::from_uri(&uri).unwrap();
        let bridge: Arc<dyn XsdBridge> = MockXsdBridge::new(None);
        let compiled = CompiledValidator::compile(&config, bridge).unwrap();
        let expected = format!(
            "{:?} schema: {}",
            config.schema_type,
            config.schema_path.display()
        );
        let endpoint = ValidatorEndpoint {
            uri,
            config,
            compiled: Arc::new(compiled),
        };

        let info = endpoint.schema_info();
        assert!(info.contains(" schema: "), "got: {info}");
        assert_eq!(info, expected);
    }
}
549}