camel_component_validator/
component.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::debug;
8
9use crate::compiled::CompiledValidator;
10use crate::config::ValidatorConfig;
11use crate::xsd_bridge::{XsdBridge, XsdBridgeBackend};
12use camel_component_api::ComponentContext;
13use camel_component_api::{
14 BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
15 RuntimeObservability,
16};
17
/// Component registered under the `validator` URI scheme.
///
/// Holds the XSD bridge that is handed to every validator compiled by
/// `create_endpoint`, plus the concrete backend behind that bridge when one exists.
pub struct ValidatorComponent {
    /// Type-erased bridge passed to `CompiledValidator::compile` for each endpoint.
    xsd_bridge: Arc<dyn XsdBridge>,
    /// Concrete backend sharing the same allocation as `xsd_bridge` when built via
    /// `new()`; `None` when a bridge was injected through the test-only constructor.
    xsd_backend: Option<Arc<XsdBridgeBackend>>,
}
22
23impl ValidatorComponent {
24 pub fn new() -> Self {
25 let xsd_backend = Arc::new(XsdBridgeBackend::new());
26 Self {
27 xsd_bridge: Arc::clone(&xsd_backend) as Arc<dyn XsdBridge>,
28 xsd_backend: Some(xsd_backend),
29 }
30 }
31
32 pub fn xsd_bridge_backend(&self) -> Option<Arc<XsdBridgeBackend>> {
33 self.xsd_backend.as_ref().map(Arc::clone)
34 }
35
36 #[cfg(test)]
37 fn with_xsd_bridge(xsd_bridge: Arc<dyn XsdBridge>) -> Self {
38 Self {
39 xsd_bridge,
40 xsd_backend: None,
41 }
42 }
43}
44
45impl Default for ValidatorComponent {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl Component for ValidatorComponent {
52 fn scheme(&self) -> &str {
53 "validator"
54 }
55
56 fn create_endpoint(
57 &self,
58 uri: &str,
59 _ctx: &dyn ComponentContext,
60 ) -> Result<Box<dyn Endpoint>, CamelError> {
61 let config = ValidatorConfig::from_uri(uri)?;
62 let xsd_bridge = Arc::clone(&self.xsd_bridge);
63 let compiled = CompiledValidator::compile(&config, xsd_bridge)?;
64 Ok(Box::new(ValidatorEndpoint {
65 uri: uri.to_string(),
66 config,
67 compiled: Arc::new(compiled),
68 }))
69 }
70}
71
/// Endpoint produced by `ValidatorComponent::create_endpoint` for one validator URI.
struct ValidatorEndpoint {
    /// URI string the endpoint was created from; returned verbatim by `uri()`.
    uri: String,
    /// Configuration parsed from that URI.
    config: ValidatorConfig,
    /// Validator compiled from `config`; every producer gets a clone of this handle.
    compiled: Arc<CompiledValidator>,
}
82
83impl ValidatorEndpoint {
84 #[allow(dead_code)]
86 pub fn schema_info(&self) -> String {
87 format!(
88 "{:?} schema: {}",
89 self.config.schema_type,
90 self.config.schema_path.display()
91 )
92 }
93}
94
95impl Endpoint for ValidatorEndpoint {
96 fn uri(&self) -> &str {
97 &self.uri
98 }
99
100 fn create_consumer(
101 &self,
102 _rt: Arc<dyn RuntimeObservability>,
103 ) -> Result<Box<dyn Consumer>, CamelError> {
104 Err(CamelError::EndpointCreationFailed(
105 "validator endpoint does not support consumers".to_string(),
106 ))
107 }
108
109 fn create_producer(
110 &self,
111 _rt: Arc<dyn RuntimeObservability>,
112 _ctx: &ProducerContext,
113 ) -> Result<BoxProcessor, CamelError> {
114 Ok(BoxProcessor::new(ValidatorProducer {
115 uri: self.uri.clone(),
116 config: self.config.clone(),
117 compiled: Arc::clone(&self.compiled),
118 }))
119 }
120}
121
/// Producer-side service: validates an exchange and returns it unchanged on success.
#[derive(Clone)]
struct ValidatorProducer {
    /// Endpoint URI; `call` only uses it as a field on the debug log line.
    uri: String,
    /// Endpoint configuration; `call` reads `header_name`, `fail_on_null_header`
    /// and `fail_on_null_body` from it.
    config: ValidatorConfig,
    /// Compiled validator shared with the endpoint that created this producer.
    compiled: Arc<CompiledValidator>,
}
128
129impl Service<Exchange> for ValidatorProducer {
130 type Response = Exchange;
131 type Error = CamelError;
132 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
133
134 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
135 Poll::Ready(Ok(()))
136 }
137
138 fn call(&mut self, exchange: Exchange) -> Self::Future {
139 let compiled = Arc::clone(&self.compiled);
140 let uri = self.uri.clone();
141 let config = self.config.clone();
142 Box::pin(async move {
143 debug!(uri = uri, "validating exchange body");
144
145 if let Some(ref header_name) = config.header_name {
147 match exchange.input.header(header_name) {
148 Some(value) => {
149 let header_str = match value.as_str() {
151 Some(s) => s.to_string(),
152 None => value.to_string(),
153 };
154 let header_body = camel_component_api::Body::Text(header_str);
155 compiled.validate(&header_body).await?;
156 }
157 None => {
158 if config.fail_on_null_header {
160 return Err(CamelError::ProcessorError(format!(
161 "header '{header_name}' is missing and failOnNullHeader is true"
162 )));
163 }
164 }
166 }
167 return Ok(exchange);
168 }
169
170 if exchange.input.body.is_empty() {
172 if config.fail_on_null_body {
173 return Err(CamelError::ProcessorError(
174 "body is empty and failOnNullBody is true".to_string(),
175 ));
176 }
177 return Ok(exchange);
179 }
180
181 compiled.validate(&exchange.input.body).await?;
182 Ok(exchange)
183 })
184 }
185}
186
#[cfg(test)]
mod tests {
    //! Unit tests for the validator component, endpoint and producer.

    use super::*;
    use crate::error::ValidatorError;
    use async_trait::async_trait;
    use camel_component_api::test_support::PanicRuntimeObservability;
    use camel_component_api::{Body, Message, NoOpComponentContext};
    use std::io::Write;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tower::ServiceExt;

    /// Runtime handle for the calls that require one; the endpoint under test
    /// ignores its `_rt` argument.
    fn rt() -> Arc<dyn RuntimeObservability> {
        Arc::new(PanicRuntimeObservability)
    }

    /// Writes `contents` to a temp file ending in `suffix`. The file is removed
    /// when the returned handle is dropped, so callers keep it alive for the
    /// whole test.
    fn schema_file(suffix: &str, contents: &[u8]) -> tempfile::NamedTempFile {
        let mut file = tempfile::Builder::new()
            .suffix(suffix)
            .tempfile()
            .expect("failed to create temporary schema file");
        file.write_all(contents)
            .expect("failed to write temporary schema file");
        file
    }

    /// JSON Schema requiring an object with an `id` property.
    fn json_schema_file() -> tempfile::NamedTempFile {
        schema_file(".json", br#"{"type":"object","required":["id"]}"#)
    }

    /// XSD declaring a single string-typed `order` element.
    fn xsd_file() -> tempfile::NamedTempFile {
        schema_file(
            ".xsd",
            br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#,
        )
    }

    /// Builds `validator:<schema path><query>`; pass `""` for no query or a
    /// string starting with `?`.
    fn validator_uri(schema: &tempfile::NamedTempFile, query: &str) -> String {
        format!("validator:{}{query}", schema.path().display())
    }

    /// Creates the endpoint for `uri` on `component` and returns its producer.
    fn producer_for(component: &ValidatorComponent, uri: &str) -> BoxProcessor {
        component
            .create_endpoint(uri, &NoOpComponentContext)
            .expect("endpoint creation should succeed")
            .create_producer(rt(), &ProducerContext::new())
            .expect("producer creation should succeed")
    }

    fn json_exchange(value: serde_json::Value) -> Exchange {
        Exchange::new(Message::new(Body::Json(value)))
    }

    fn xml_exchange(xml: &str) -> Exchange {
        Exchange::new(Message::new(Body::Xml(xml.to_string())))
    }

    fn empty_exchange() -> Exchange {
        Exchange::new(Message::new(Body::Empty))
    }

    /// Bridge double that counts calls and can be told to fail either operation.
    #[derive(Debug)]
    struct MockXsdBridge {
        register_calls: AtomicUsize,
        validate_calls: AtomicUsize,
        register_error: Option<ValidatorError>,
        validate_error: Option<ValidatorError>,
    }

    impl MockXsdBridge {
        /// Mock with zeroed counters whose `register` fails with `register_error`
        /// when one is given; `validate` always succeeds.
        fn shared(register_error: Option<ValidatorError>) -> Arc<Self> {
            Arc::new(Self {
                register_calls: AtomicUsize::new(0),
                validate_calls: AtomicUsize::new(0),
                register_error,
                validate_error: None,
            })
        }
    }

    #[async_trait]
    impl XsdBridge for MockXsdBridge {
        async fn register(&self, _xsd_bytes: Vec<u8>) -> Result<String, ValidatorError> {
            self.register_calls.fetch_add(1, Ordering::SeqCst);
            if let Some(err) = &self.register_error {
                return Err(err.clone());
            }
            Ok("xsd-mock-id".to_string())
        }

        async fn validate(
            &self,
            _schema_id: &str,
            _doc_bytes: Vec<u8>,
        ) -> Result<(), ValidatorError> {
            self.validate_calls.fetch_add(1, Ordering::SeqCst);
            if let Some(err) = &self.validate_error {
                return Err(err.clone());
            }
            Ok(())
        }
    }

    /// Asserts that a schema file with the given suffix is refused at endpoint
    /// creation with a "not yet supported" error.
    fn assert_unsupported_schema(suffix: &str, contents: &[u8]) {
        let f = schema_file(suffix, contents);
        let uri = validator_uri(&f, "");
        let err = ValidatorComponent::new()
            .create_endpoint(&uri, &NoOpComponentContext)
            .err()
            .expect("expected endpoint creation to fail");
        let msg = err.to_string();
        assert!(
            msg.contains("not yet supported"),
            "expected 'not yet supported' in error, got: {msg}"
        );
    }

    #[test]
    fn scheme_is_validator() {
        assert_eq!(ValidatorComponent::new().scheme(), "validator");
    }

    #[test]
    fn consumer_not_supported() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "");
        let ep = ValidatorComponent::new()
            .create_endpoint(&uri, &NoOpComponentContext)
            .unwrap();
        assert!(ep.create_consumer(rt()).is_err());
    }

    #[test]
    fn endpoint_creation_fails_for_missing_schema() {
        let result = ValidatorComponent::new()
            .create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn valid_json_body_passes_through() {
        let f = json_schema_file();
        let producer = producer_for(&ValidatorComponent::new(), &validator_uri(&f, ""));
        let result = producer
            .oneshot(json_exchange(serde_json::json!({"id": "1"})))
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn invalid_json_body_returns_err() {
        let f = json_schema_file();
        let producer = producer_for(&ValidatorComponent::new(), &validator_uri(&f, ""));
        let result = producer
            .oneshot(json_exchange(serde_json::json!({"name": "x"})))
            .await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("validation failed"), "got: {msg}");
    }

    #[tokio::test]
    async fn valid_xml_body_passes() {
        let f = xsd_file();
        let component = ValidatorComponent::with_xsd_bridge(MockXsdBridge::shared(None));
        let producer = producer_for(&component, &validator_uri(&f, ""));
        assert!(producer
            .oneshot(xml_exchange("<order>hello</order>"))
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn xsd_bridge_register_and_validate_mock() {
        let backend = MockXsdBridge::shared(None);
        let f = xsd_file();
        let component =
            ValidatorComponent::with_xsd_bridge(Arc::clone(&backend) as Arc<dyn XsdBridge>);
        let producer = producer_for(&component, &validator_uri(&f, ""));

        assert!(producer
            .oneshot(xml_exchange("<order>ok</order>"))
            .await
            .is_ok());
        assert_eq!(backend.register_calls.load(Ordering::SeqCst), 1);
        assert_eq!(backend.validate_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn xsd_bridge_register_error_propagates_on_validate() {
        let backend = MockXsdBridge::shared(Some(ValidatorError::CompilationFailed {
            message: "COMPILATION_FAILED".to_string(),
            source: None,
        }));
        let f = xsd_file();
        // Endpoint and producer creation must still succeed; the registration
        // failure is expected to surface only when an exchange is validated.
        let component = ValidatorComponent::with_xsd_bridge(backend);
        let producer = producer_for(&component, &validator_uri(&f, ""));
        let err = producer
            .oneshot(xml_exchange("<order/>"))
            .await
            .expect_err("expected validate to fail due to registration error");
        assert!(err.to_string().contains("COMPILATION_FAILED"));
    }

    #[tokio::test]
    async fn test_validator_rejects_oversized_payload() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?maxPayloadBytes=100");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        let big_body: String = "x".repeat(200);
        let exchange = Exchange::new(Message::new(Body::Text(big_body)));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err(), "expected oversized payload to be rejected");
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("payload too large"),
            "expected 'payload too large' in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_validator_allows_payload_under_limit() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?maxPayloadBytes=1024");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        let result = producer
            .oneshot(json_exchange(serde_json::json!({"id": "1"})))
            .await;
        assert!(result.is_ok(), "expected valid payload under limit to pass");
    }

    #[tokio::test]
    async fn test_validator_no_limit_allows_any_size() {
        let f = json_schema_file();
        let producer = producer_for(&ValidatorComponent::new(), &validator_uri(&f, ""));
        let exchange = json_exchange(
            serde_json::json!({"id": "this is a longer value that would exceed small limits"}),
        );
        let result = producer.oneshot(exchange).await;
        assert!(
            result.is_ok(),
            "expected no-limit validator to pass any valid payload"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_body_default_rejects_empty() {
        let f = json_schema_file();
        let producer = producer_for(&ValidatorComponent::new(), &validator_uri(&f, ""));
        let result = producer.oneshot(empty_exchange()).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("failOnNullBody"),
            "expected failOnNullBody in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_body_false_passes_empty() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?failOnNullBody=false");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        let result = producer.oneshot(empty_exchange()).await;
        assert!(
            result.is_ok(),
            "expected empty body to pass with failOnNullBody=false"
        );
    }

    #[tokio::test]
    async fn test_header_name_validation_uses_header_value() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?headerName=X-Data");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        // The body stays empty on purpose: only the header may be validated.
        let mut msg = Message::new(Body::Empty);
        msg.set_header("X-Data", serde_json::json!({"id": "1"}).to_string());
        let result = producer.oneshot(Exchange::new(msg)).await;
        assert!(
            result.is_ok(),
            "expected valid header to pass: {:?}",
            result
        );
    }

    #[tokio::test]
    async fn test_header_name_missing_header_fails_by_default() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?headerName=X-Missing");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        let result = producer.oneshot(empty_exchange()).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("X-Missing"),
            "expected header name in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_header_false_passes_missing_header() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "?headerName=X-Missing&failOnNullHeader=false");
        let producer = producer_for(&ValidatorComponent::new(), &uri);
        let result = producer.oneshot(empty_exchange()).await;
        assert!(
            result.is_ok(),
            "expected missing header to pass with failOnNullHeader=false"
        );
    }

    #[test]
    fn test_relaxng_schema_type_rejected_at_creation() {
        assert_unsupported_schema(".rng", b"<grammar/>");
    }

    #[test]
    fn test_schematron_schema_type_rejected_at_creation() {
        assert_unsupported_schema(".sch", b"<schema/>");
    }

    #[test]
    fn test_schema_info_returns_description() {
        let f = json_schema_file();
        let uri = validator_uri(&f, "");

        // Through the boxed trait object only `uri()` is reachable.
        let boxed = ValidatorComponent::new()
            .create_endpoint(&uri, &NoOpComponentContext)
            .unwrap();
        assert_eq!(boxed.uri(), uri);

        // `schema_info` is an inherent method, so assemble the concrete endpoint
        // with the same steps `create_endpoint` performs.
        let config = ValidatorConfig::from_uri(&uri).expect("validator URI should parse");
        let bridge: Arc<dyn XsdBridge> = MockXsdBridge::shared(None);
        let compiled =
            CompiledValidator::compile(&config, bridge).expect("JSON schema should compile");
        let endpoint = ValidatorEndpoint {
            uri: uri.clone(),
            config,
            compiled: Arc::new(compiled),
        };

        let info = endpoint.schema_info();
        let expected = format!(
            "{:?} schema: {}",
            endpoint.config.schema_type,
            endpoint.config.schema_path.display()
        );
        assert_eq!(info, expected);

        let file_name = f
            .path()
            .file_name()
            .expect("temporary file should have a file name")
            .to_string_lossy();
        assert!(
            info.contains(file_name.as_ref()),
            "expected schema file name in description, got: {info}"
        );
    }
}