camel_component_validator/
component.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::debug;
8
9use crate::compiled::CompiledValidator;
10use crate::config::ValidatorConfig;
11use crate::xsd_bridge::{XsdBridge, XsdBridgeBackend};
12use camel_component_api::ComponentContext;
13use camel_component_api::{
14 BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
15};
16
17pub struct ValidatorComponent {
18 xsd_bridge: Arc<dyn XsdBridge>,
19 xsd_backend: Option<Arc<XsdBridgeBackend>>,
20}
21
22impl ValidatorComponent {
23 pub fn new() -> Self {
24 let xsd_backend = Arc::new(XsdBridgeBackend::new());
25 Self {
26 xsd_bridge: Arc::clone(&xsd_backend) as Arc<dyn XsdBridge>,
27 xsd_backend: Some(xsd_backend),
28 }
29 }
30
31 pub fn xsd_bridge_backend(&self) -> Option<Arc<XsdBridgeBackend>> {
32 self.xsd_backend.as_ref().map(Arc::clone)
33 }
34
35 #[cfg(test)]
36 fn with_xsd_bridge(xsd_bridge: Arc<dyn XsdBridge>) -> Self {
37 Self {
38 xsd_bridge,
39 xsd_backend: None,
40 }
41 }
42}
43
44impl Default for ValidatorComponent {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
50impl Component for ValidatorComponent {
51 fn scheme(&self) -> &str {
52 "validator"
53 }
54
55 fn create_endpoint(
56 &self,
57 uri: &str,
58 _ctx: &dyn ComponentContext,
59 ) -> Result<Box<dyn Endpoint>, CamelError> {
60 let config = ValidatorConfig::from_uri(uri)?;
61 let xsd_bridge = Arc::clone(&self.xsd_bridge);
62 let compiled = CompiledValidator::compile(&config, xsd_bridge)?;
63 Ok(Box::new(ValidatorEndpoint {
64 uri: uri.to_string(),
65 config,
66 compiled: Arc::new(compiled),
67 }))
68 }
69}
70
71struct ValidatorEndpoint {
77 uri: String,
78 config: ValidatorConfig,
79 compiled: Arc<CompiledValidator>,
80}
81
82impl ValidatorEndpoint {
83 #[allow(dead_code)]
85 pub fn schema_info(&self) -> String {
86 format!(
87 "{:?} schema: {}",
88 self.config.schema_type,
89 self.config.schema_path.display()
90 )
91 }
92}
93
94impl Endpoint for ValidatorEndpoint {
95 fn uri(&self) -> &str {
96 &self.uri
97 }
98
99 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
100 Err(CamelError::EndpointCreationFailed(
101 "validator endpoint does not support consumers".to_string(),
102 ))
103 }
104
105 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
106 Ok(BoxProcessor::new(ValidatorProducer {
107 uri: self.uri.clone(),
108 config: self.config.clone(),
109 compiled: Arc::clone(&self.compiled),
110 }))
111 }
112}
113
114#[derive(Clone)]
115struct ValidatorProducer {
116 uri: String,
117 config: ValidatorConfig,
118 compiled: Arc<CompiledValidator>,
119}
120
121impl Service<Exchange> for ValidatorProducer {
122 type Response = Exchange;
123 type Error = CamelError;
124 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
125
126 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
127 Poll::Ready(Ok(()))
128 }
129
130 fn call(&mut self, exchange: Exchange) -> Self::Future {
131 let compiled = Arc::clone(&self.compiled);
132 let uri = self.uri.clone();
133 let config = self.config.clone();
134 Box::pin(async move {
135 debug!(uri = uri, "validating exchange body");
136
137 if let Some(ref header_name) = config.header_name {
139 match exchange.input.header(header_name) {
140 Some(value) => {
141 let header_str = match value.as_str() {
143 Some(s) => s.to_string(),
144 None => value.to_string(),
145 };
146 let header_body = camel_component_api::Body::Text(header_str);
147 compiled.validate(&header_body).await?;
148 }
149 None => {
150 if config.fail_on_null_header {
152 return Err(CamelError::ProcessorError(format!(
153 "header '{header_name}' is missing and failOnNullHeader is true"
154 )));
155 }
156 }
158 }
159 return Ok(exchange);
160 }
161
162 if exchange.input.body.is_empty() {
164 if config.fail_on_null_body {
165 return Err(CamelError::ProcessorError(
166 "body is empty and failOnNullBody is true".to_string(),
167 ));
168 }
169 return Ok(exchange);
171 }
172
173 compiled.validate(&exchange.input.body).await?;
174 Ok(exchange)
175 })
176 }
177}
178
#[cfg(test)]
mod tests {
    //! Tests for the validator component, endpoint and producer.
    //!
    //! JSON-schema tests use the real component; XSD tests inject
    //! [`MockXsdBridge`] through `ValidatorComponent::with_xsd_bridge`.

    use super::*;
    use crate::error::ValidatorError;
    use async_trait::async_trait;
    use camel_component_api::{Body, Message, NoOpComponentContext};
    use std::io::Write;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tower::ServiceExt;

    /// Writes `contents` to a new temporary file whose name ends in `suffix`.
    ///
    /// Callers keep the returned handle alive while the path is in use
    /// (`NamedTempFile` removes the file when dropped).
    fn schema_file(suffix: &str, contents: &[u8]) -> tempfile::NamedTempFile {
        let mut file = tempfile::Builder::new().suffix(suffix).tempfile().unwrap();
        file.write_all(contents).unwrap();
        file
    }

    /// JSON schema requiring an object with an `id` property.
    fn json_schema_file() -> tempfile::NamedTempFile {
        schema_file(".json", br#"{"type":"object","required":["id"]}"#)
    }

    /// XSD declaring a single string element named `order`.
    fn xsd_file() -> tempfile::NamedTempFile {
        schema_file(
            ".xsd",
            br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#,
        )
    }

    /// `validator:` URI (without query string) pointing at `schema`.
    fn validator_uri(schema: &tempfile::NamedTempFile) -> String {
        format!("validator:{}", schema.path().display())
    }

    /// Creates an endpoint for `uri` on `component` and returns its producer.
    ///
    /// Panics at the caller's location if either step fails.
    #[track_caller]
    fn producer_for(component: &ValidatorComponent, uri: &str) -> BoxProcessor {
        component
            .create_endpoint(uri, &NoOpComponentContext)
            .expect("endpoint creation should succeed")
            .create_producer(&ProducerContext::new())
            .expect("producer creation should succeed")
    }

    /// Producer for `uri` on a component built with `ValidatorComponent::new`.
    #[track_caller]
    fn default_producer(uri: &str) -> BoxProcessor {
        producer_for(&ValidatorComponent::new(), uri)
    }

    /// Wraps `body` in a message and the message in an exchange.
    fn exchange_with(body: Body) -> Exchange {
        Exchange::new(Message::new(body))
    }

    /// Bridge double that counts calls and can return canned errors.
    #[derive(Debug)]
    struct MockXsdBridge {
        register_calls: AtomicUsize,
        validate_calls: AtomicUsize,
        register_error: Option<ValidatorError>,
        validate_error: Option<ValidatorError>,
    }

    impl MockXsdBridge {
        /// Mock with zeroed counters; `register` fails with `register_error`
        /// when one is given, `validate` always succeeds.
        fn new(register_error: Option<ValidatorError>) -> Arc<Self> {
            Arc::new(Self {
                register_calls: AtomicUsize::new(0),
                validate_calls: AtomicUsize::new(0),
                register_error,
                validate_error: None,
            })
        }
    }

    #[async_trait]
    impl XsdBridge for MockXsdBridge {
        async fn register(&self, _xsd_bytes: Vec<u8>) -> Result<String, ValidatorError> {
            self.register_calls.fetch_add(1, Ordering::SeqCst);
            match &self.register_error {
                Some(err) => Err(err.clone()),
                None => Ok("xsd-mock-id".to_string()),
            }
        }

        async fn validate(
            &self,
            _schema_id: &str,
            _doc_bytes: Vec<u8>,
        ) -> Result<(), ValidatorError> {
            self.validate_calls.fetch_add(1, Ordering::SeqCst);
            match &self.validate_error {
                Some(err) => Err(err.clone()),
                None => Ok(()),
            }
        }
    }

    #[test]
    fn scheme_is_validator() {
        assert_eq!(ValidatorComponent::new().scheme(), "validator");
    }

    #[test]
    fn consumer_not_supported() {
        let f = json_schema_file();
        let ep = ValidatorComponent::new()
            .create_endpoint(&validator_uri(&f), &NoOpComponentContext)
            .unwrap();
        assert!(ep.create_consumer().is_err());
    }

    #[test]
    fn endpoint_creation_fails_for_missing_schema() {
        let result = ValidatorComponent::new()
            .create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn valid_json_body_passes_through() {
        let f = json_schema_file();
        let producer = default_producer(&validator_uri(&f));
        let exchange = exchange_with(Body::Json(serde_json::json!({"id": "1"})));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn invalid_json_body_returns_err() {
        let f = json_schema_file();
        let producer = default_producer(&validator_uri(&f));
        let exchange = exchange_with(Body::Json(serde_json::json!({"name": "x"})));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("validation failed"), "got: {msg}");
    }

    #[tokio::test]
    async fn valid_xml_body_passes() {
        let backend = MockXsdBridge::new(None);
        let f = xsd_file();
        let component = ValidatorComponent::with_xsd_bridge(backend);
        let producer = producer_for(&component, &validator_uri(&f));
        let exchange = exchange_with(Body::Xml("<order>hello</order>".to_string()));
        assert!(producer.oneshot(exchange).await.is_ok());
    }

    #[tokio::test]
    async fn xsd_bridge_register_and_validate_mock() {
        let backend = MockXsdBridge::new(None);
        let f = xsd_file();
        let component =
            ValidatorComponent::with_xsd_bridge(Arc::clone(&backend) as Arc<dyn XsdBridge>);
        let producer = producer_for(&component, &validator_uri(&f));

        let exchange = exchange_with(Body::Xml("<order>ok</order>".to_string()));
        assert!(producer.oneshot(exchange).await.is_ok());
        assert_eq!(backend.register_calls.load(Ordering::SeqCst), 1);
        assert_eq!(backend.validate_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn xsd_bridge_register_error_propagates_on_validate() {
        let backend = MockXsdBridge::new(Some(ValidatorError::CompilationFailed {
            message: "COMPILATION_FAILED".to_string(),
            source: None,
        }));
        let f = xsd_file();
        // Endpoint and producer creation must still succeed; the registration
        // failure is only expected to surface when an exchange is validated.
        let component = ValidatorComponent::with_xsd_bridge(backend);
        let producer = producer_for(&component, &validator_uri(&f));
        let exchange = exchange_with(Body::Xml("<order/>".to_string()));
        let err = producer
            .oneshot(exchange)
            .await
            .expect_err("expected validate to fail due to registration error");
        assert!(err.to_string().contains("COMPILATION_FAILED"));
    }

    #[tokio::test]
    async fn test_validator_rejects_oversized_payload() {
        let f = json_schema_file();
        let uri = format!("{}?maxPayloadBytes=100", validator_uri(&f));
        let producer = default_producer(&uri);
        let exchange = exchange_with(Body::Text("x".repeat(200)));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err(), "expected oversized payload to be rejected");
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("payload too large"),
            "expected 'payload too large' in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_validator_allows_payload_under_limit() {
        let f = json_schema_file();
        let uri = format!("{}?maxPayloadBytes=1024", validator_uri(&f));
        let producer = default_producer(&uri);
        let exchange = exchange_with(Body::Json(serde_json::json!({"id": "1"})));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_ok(), "expected valid payload under limit to pass");
    }

    #[tokio::test]
    async fn test_validator_no_limit_allows_any_size() {
        let f = json_schema_file();
        let producer = default_producer(&validator_uri(&f));
        let exchange = exchange_with(Body::Json(
            serde_json::json!({"id": "this is a longer value that would exceed small limits"}),
        ));
        let result = producer.oneshot(exchange).await;
        assert!(
            result.is_ok(),
            "expected no-limit validator to pass any valid payload"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_body_default_rejects_empty() {
        let f = json_schema_file();
        let producer = default_producer(&validator_uri(&f));
        let result = producer.oneshot(exchange_with(Body::Empty)).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("failOnNullBody"),
            "expected failOnNullBody in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_body_false_passes_empty() {
        let f = json_schema_file();
        let uri = format!("{}?failOnNullBody=false", validator_uri(&f));
        let producer = default_producer(&uri);
        let result = producer.oneshot(exchange_with(Body::Empty)).await;
        assert!(
            result.is_ok(),
            "expected empty body to pass with failOnNullBody=false"
        );
    }

    #[tokio::test]
    async fn test_header_name_validation_uses_header_value() {
        let f = json_schema_file();
        let uri = format!("{}?headerName=X-Data", validator_uri(&f));
        let producer = default_producer(&uri);
        let mut msg = Message::new(Body::Empty);
        msg.set_header("X-Data", serde_json::json!({"id": "1"}).to_string());
        let result = producer.oneshot(Exchange::new(msg)).await;
        assert!(
            result.is_ok(),
            "expected valid header to pass: {:?}",
            result
        );
    }

    #[tokio::test]
    async fn test_header_name_missing_header_fails_by_default() {
        let f = json_schema_file();
        let uri = format!("{}?headerName=X-Missing", validator_uri(&f));
        let producer = default_producer(&uri);
        let result = producer.oneshot(exchange_with(Body::Empty)).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("X-Missing"),
            "expected header name in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_header_false_passes_missing_header() {
        let f = json_schema_file();
        let uri = format!(
            "{}?headerName=X-Missing&failOnNullHeader=false",
            validator_uri(&f)
        );
        let producer = default_producer(&uri);
        let result = producer.oneshot(exchange_with(Body::Empty)).await;
        assert!(
            result.is_ok(),
            "expected missing header to pass with failOnNullHeader=false"
        );
    }

    #[test]
    fn test_relaxng_schema_type_rejected_at_creation() {
        let f = schema_file(".rng", b"<grammar/>");
        let result =
            ValidatorComponent::new().create_endpoint(&validator_uri(&f), &NoOpComponentContext);
        let err = result.err().expect("expected endpoint creation to fail");
        let msg = err.to_string();
        assert!(
            msg.contains("not yet supported"),
            "expected 'not yet supported' in error, got: {msg}"
        );
    }

    #[test]
    fn test_schematron_schema_type_rejected_at_creation() {
        let f = schema_file(".sch", b"<schema/>");
        let result =
            ValidatorComponent::new().create_endpoint(&validator_uri(&f), &NoOpComponentContext);
        let err = result.err().expect("expected endpoint creation to fail");
        let msg = err.to_string();
        assert!(
            msg.contains("not yet supported"),
            "expected 'not yet supported' in error, got: {msg}"
        );
    }

    #[test]
    fn test_schema_info_returns_description() {
        let f = json_schema_file();
        let uri = validator_uri(&f);

        // `schema_info` is an inherent method and cannot be reached through
        // the `Box<dyn Endpoint>` returned by `create_endpoint`, so the
        // endpoint is assembled here the same way `create_endpoint` does it.
        let config = ValidatorConfig::from_uri(uri.as_str()).expect("config should parse");
        let bridge = MockXsdBridge::new(None) as Arc<dyn XsdBridge>;
        let compiled =
            CompiledValidator::compile(&config, bridge).expect("JSON schema should compile");
        let schema_path = config.schema_path.display().to_string();
        let ep = ValidatorEndpoint {
            uri: uri.clone(),
            config,
            compiled: Arc::new(compiled),
        };

        assert_eq!(ep.uri(), uri);
        let info = ep.schema_info();
        assert!(
            info.contains(" schema: "),
            "expected ' schema: ' separator, got: {info}"
        );
        assert!(
            info.ends_with(&schema_path),
            "expected description to end with schema path, got: {info}"
        );
    }
}