1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::debug;
8
9use crate::compiled::CompiledValidator;
10use crate::config::ValidatorConfig;
11use crate::xsd_bridge::{XsdBridge, XsdBridgeBackend};
12use camel_component_api::ComponentContext;
13use camel_component_api::{
14 BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
15 RuntimeObservability,
16};
17
18pub struct ValidatorComponent {
19 xsd_bridge: Arc<dyn XsdBridge>,
20 xsd_backend: Option<Arc<XsdBridgeBackend>>,
21}
22
23impl ValidatorComponent {
24 pub fn new() -> Self {
25 let xsd_backend = Arc::new(XsdBridgeBackend::new());
26 Self {
27 xsd_bridge: Arc::clone(&xsd_backend) as Arc<dyn XsdBridge>,
28 xsd_backend: Some(xsd_backend),
29 }
30 }
31
32 pub fn xsd_bridge_backend(&self) -> Option<Arc<XsdBridgeBackend>> {
33 self.xsd_backend.as_ref().map(Arc::clone)
34 }
35
36 #[cfg(test)]
37 fn with_xsd_bridge(xsd_bridge: Arc<dyn XsdBridge>) -> Self {
38 Self {
39 xsd_bridge,
40 xsd_backend: None,
41 }
42 }
43}
44
45impl Default for ValidatorComponent {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl Component for ValidatorComponent {
52 fn scheme(&self) -> &str {
53 "validator"
54 }
55
56 fn create_endpoint(
57 &self,
58 uri: &str,
59 _ctx: &dyn ComponentContext,
60 ) -> Result<Box<dyn Endpoint>, CamelError> {
61 let config = ValidatorConfig::from_uri(uri)?;
62 let xsd_bridge = Arc::clone(&self.xsd_bridge);
63 let compiled = CompiledValidator::compile(&config, xsd_bridge)?;
64 Ok(Box::new(ValidatorEndpoint {
65 uri: uri.to_string(),
66 config,
67 compiled: Arc::new(compiled),
68 xsd_backend: self.xsd_backend.as_ref().map(Arc::clone),
69 }))
70 }
71}
72
73struct ValidatorEndpoint {
79 uri: String,
80 config: ValidatorConfig,
81 compiled: Arc<CompiledValidator>,
82 xsd_backend: Option<Arc<XsdBridgeBackend>>,
83}
84
85impl ValidatorEndpoint {
86 #[allow(dead_code)]
88 pub fn schema_info(&self) -> String {
89 format!(
90 "{:?} schema: {}",
91 self.config.schema_type,
92 self.config.schema_path.display()
93 )
94 }
95}
96
97impl Endpoint for ValidatorEndpoint {
98 fn uri(&self) -> &str {
99 &self.uri
100 }
101
102 fn create_consumer(
103 &self,
104 _rt: Arc<dyn RuntimeObservability>,
105 ) -> Result<Box<dyn Consumer>, CamelError> {
106 Err(CamelError::EndpointCreationFailed(
107 "validator endpoint does not support consumers".to_string(),
108 ))
109 }
110
111 fn create_producer(
112 &self,
113 rt: Arc<dyn RuntimeObservability>,
114 ctx: &ProducerContext,
115 ) -> Result<BoxProcessor, CamelError> {
116 if let Some(ref backend) = self.xsd_backend {
117 backend.set_observability(
118 rt.clone(),
119 ctx.route_id().unwrap_or("validator-bridge").to_string(),
120 );
121 }
122 Ok(BoxProcessor::new(ValidatorProducer {
123 uri: self.uri.clone(),
124 config: self.config.clone(),
125 compiled: Arc::clone(&self.compiled),
126 }))
127 }
128}
129
130#[derive(Clone)]
131struct ValidatorProducer {
132 uri: String,
133 config: ValidatorConfig,
134 compiled: Arc<CompiledValidator>,
135}
136
137impl Service<Exchange> for ValidatorProducer {
138 type Response = Exchange;
139 type Error = CamelError;
140 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
141
142 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
143 Poll::Ready(Ok(()))
144 }
145
146 fn call(&mut self, exchange: Exchange) -> Self::Future {
147 let compiled = Arc::clone(&self.compiled);
148 let uri = self.uri.clone();
149 let config = self.config.clone();
150 Box::pin(async move {
151 debug!(uri = uri, "validating exchange body");
152
153 if let Some(ref header_name) = config.header_name {
155 match exchange.input.header(header_name) {
156 Some(value) => {
157 let header_str = match value.as_str() {
159 Some(s) => s.to_string(),
160 None => value.to_string(),
161 };
162 let header_body = camel_component_api::Body::Text(header_str);
163 compiled.validate(&header_body).await?;
164 }
165 None => {
166 if config.fail_on_null_header {
168 return Err(CamelError::ProcessorError(format!(
169 "header '{header_name}' is missing and failOnNullHeader is true"
170 )));
171 }
172 }
174 }
175 return Ok(exchange);
176 }
177
178 if exchange.input.body.is_empty() {
180 if config.fail_on_null_body {
181 return Err(CamelError::ProcessorError(
182 "body is empty and failOnNullBody is true".to_string(),
183 ));
184 }
185 return Ok(exchange);
187 }
188
189 compiled.validate(&exchange.input.body).await?;
190 Ok(exchange)
191 })
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::ValidatorError;
    use async_trait::async_trait;
    use camel_component_api::test_support::PanicRuntimeObservability;
    use camel_component_api::{Body, Message, NoOpComponentContext};
    use std::io::Write;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tower::ServiceExt;

    /// Minimal JSON schema: an object that must carry an `id` property.
    const JSON_SCHEMA: &[u8] = br#"{"type":"object","required":["id"]}"#;

    /// Minimal XSD declaring a single string element named `order`.
    const ORDER_XSD: &[u8] = br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#;

    /// Observability handle handed to every consumer/producer created in these tests.
    fn rt() -> Arc<dyn RuntimeObservability> {
        Arc::new(PanicRuntimeObservability)
    }

    /// Writes `contents` to a fresh temp file whose name ends in `suffix`.
    fn schema_file(suffix: &str, contents: &[u8]) -> tempfile::NamedTempFile {
        let mut file = tempfile::Builder::new().suffix(suffix).tempfile().unwrap();
        file.write_all(contents).unwrap();
        file
    }

    fn json_schema_file() -> tempfile::NamedTempFile {
        schema_file(".json", JSON_SCHEMA)
    }

    fn xsd_file() -> tempfile::NamedTempFile {
        schema_file(".xsd", ORDER_XSD)
    }

    /// `validator:<path>` URI (no query string) for the given schema file.
    fn validator_uri(schema: &tempfile::NamedTempFile) -> String {
        format!("validator:{}", schema.path().display())
    }

    /// Creates an endpoint for `uri`, panicking if creation fails.
    fn endpoint_for(component: ValidatorComponent, uri: &str) -> Box<dyn Endpoint> {
        component
            .create_endpoint(uri, &NoOpComponentContext)
            .unwrap()
    }

    /// Creates a producer from `ep` with a default `ProducerContext`.
    fn producer_for(ep: &dyn Endpoint) -> BoxProcessor {
        ep.create_producer(rt(), &ProducerContext::new()).unwrap()
    }

    /// Wraps `body` in a message and that message in an exchange.
    fn exchange_with(body: Body) -> Exchange {
        Exchange::new(Message::new(body))
    }

    /// XSD bridge double that counts calls and can be told to fail.
    #[derive(Debug)]
    struct MockXsdBridge {
        register_calls: AtomicUsize,
        validate_calls: AtomicUsize,
        register_error: Option<ValidatorError>,
        validate_error: Option<ValidatorError>,
    }

    impl MockXsdBridge {
        /// Shared mock with zeroed counters, the given registration outcome and
        /// a validation step that always succeeds.
        fn shared(register_error: Option<ValidatorError>) -> Arc<Self> {
            Arc::new(Self {
                register_calls: AtomicUsize::new(0),
                validate_calls: AtomicUsize::new(0),
                register_error,
                validate_error: None,
            })
        }
    }

    #[async_trait]
    impl XsdBridge for MockXsdBridge {
        async fn register(&self, _xsd_bytes: Vec<u8>) -> Result<String, ValidatorError> {
            self.register_calls.fetch_add(1, Ordering::SeqCst);
            match &self.register_error {
                Some(err) => Err(err.clone()),
                None => Ok("xsd-mock-id".to_string()),
            }
        }

        async fn validate(
            &self,
            _schema_id: &str,
            _doc_bytes: Vec<u8>,
        ) -> Result<(), ValidatorError> {
            self.validate_calls.fetch_add(1, Ordering::SeqCst);
            match &self.validate_error {
                Some(err) => Err(err.clone()),
                None => Ok(()),
            }
        }
    }

    #[test]
    fn scheme_is_validator() {
        let component = ValidatorComponent::new();
        assert_eq!(component.scheme(), "validator");
    }

    #[test]
    fn consumer_not_supported() {
        let schema = json_schema_file();
        let ep = endpoint_for(ValidatorComponent::new(), &validator_uri(&schema));
        assert!(ep.create_consumer(rt()).is_err());
    }

    #[test]
    fn endpoint_creation_fails_for_missing_schema() {
        let component = ValidatorComponent::new();
        let result =
            component.create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn valid_json_body_passes_through() {
        let schema = json_schema_file();
        let ep = endpoint_for(ValidatorComponent::new(), &validator_uri(&schema));
        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Json(serde_json::json!({"id": "1"})));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn invalid_json_body_returns_err() {
        let schema = json_schema_file();
        let ep = endpoint_for(ValidatorComponent::new(), &validator_uri(&schema));
        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Json(serde_json::json!({"name": "x"})));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("validation failed"), "got: {msg}");
    }

    #[tokio::test]
    async fn valid_xml_body_passes() {
        let backend = MockXsdBridge::shared(None);
        let schema = xsd_file();
        let ep = endpoint_for(
            ValidatorComponent::with_xsd_bridge(backend),
            &validator_uri(&schema),
        );
        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Xml("<order>hello</order>".to_string()));
        assert!(producer.oneshot(exchange).await.is_ok());
    }

    #[tokio::test]
    async fn xsd_bridge_register_and_validate_mock() {
        let backend = MockXsdBridge::shared(None);
        let bridge: Arc<dyn XsdBridge> = backend.clone();

        let schema = xsd_file();
        let ep = endpoint_for(
            ValidatorComponent::with_xsd_bridge(bridge),
            &validator_uri(&schema),
        );

        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Xml("<order>ok</order>".to_string()));
        assert!(producer.oneshot(exchange).await.is_ok());
        assert_eq!(backend.register_calls.load(Ordering::SeqCst), 1);
        assert_eq!(backend.validate_calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn xsd_bridge_register_error_propagates_on_validate() {
        let backend = MockXsdBridge::shared(Some(ValidatorError::CompilationFailed {
            message: "COMPILATION_FAILED".to_string(),
            source: None,
        }));
        let schema = xsd_file();
        let uri = validator_uri(&schema);
        // Endpoint creation is expected to succeed; the registration error
        // must only surface once an exchange is validated.
        let ep = ValidatorComponent::with_xsd_bridge(backend)
            .create_endpoint(&uri, &NoOpComponentContext)
            .expect("endpoint creation should succeed");
        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Xml("<order/>".to_string()));
        let err = producer
            .oneshot(exchange)
            .await
            .expect_err("expected validate to fail due to registration error");
        assert!(err.to_string().contains("COMPILATION_FAILED"));
    }

    #[tokio::test]
    async fn test_validator_rejects_oversized_payload() {
        let schema = json_schema_file();
        let uri = format!("{}?maxPayloadBytes=100", validator_uri(&schema));
        let ep = endpoint_for(ValidatorComponent::new(), &uri);
        let producer = producer_for(ep.as_ref());
        let big_body: String = "x".repeat(200);
        let exchange = exchange_with(Body::Text(big_body));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err(), "expected oversized payload to be rejected");
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("payload too large"),
            "expected 'payload too large' in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_validator_allows_payload_under_limit() {
        let schema = json_schema_file();
        let uri = format!("{}?maxPayloadBytes=1024", validator_uri(&schema));
        let ep = endpoint_for(ValidatorComponent::new(), &uri);
        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Json(serde_json::json!({"id": "1"})));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_ok(), "expected valid payload under limit to pass");
    }

    #[tokio::test]
    async fn test_validator_no_limit_allows_any_size() {
        let schema = json_schema_file();
        let ep = endpoint_for(ValidatorComponent::new(), &validator_uri(&schema));
        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Json(
            serde_json::json!({"id": "this is a longer value that would exceed small limits"}),
        ));
        let result = producer.oneshot(exchange).await;
        assert!(
            result.is_ok(),
            "expected no-limit validator to pass any valid payload"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_body_default_rejects_empty() {
        let schema = json_schema_file();
        let ep = endpoint_for(ValidatorComponent::new(), &validator_uri(&schema));
        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Empty);
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("failOnNullBody"),
            "expected failOnNullBody in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_body_false_passes_empty() {
        let schema = json_schema_file();
        let uri = format!("{}?failOnNullBody=false", validator_uri(&schema));
        let ep = endpoint_for(ValidatorComponent::new(), &uri);
        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Empty);
        let result = producer.oneshot(exchange).await;
        assert!(
            result.is_ok(),
            "expected empty body to pass with failOnNullBody=false"
        );
    }

    #[tokio::test]
    async fn test_header_name_validation_uses_header_value() {
        let schema = json_schema_file();
        let uri = format!("{}?headerName=X-Data", validator_uri(&schema));
        let ep = endpoint_for(ValidatorComponent::new(), &uri);
        let producer = producer_for(ep.as_ref());
        // The body is empty on purpose: only the header content is validated.
        let mut msg = Message::new(Body::Empty);
        msg.set_header("X-Data", serde_json::json!({"id": "1"}).to_string());
        let exchange = Exchange::new(msg);
        let result = producer.oneshot(exchange).await;
        assert!(
            result.is_ok(),
            "expected valid header to pass: {:?}",
            result
        );
    }

    #[tokio::test]
    async fn test_header_name_missing_header_fails_by_default() {
        let schema = json_schema_file();
        let uri = format!("{}?headerName=X-Missing", validator_uri(&schema));
        let ep = endpoint_for(ValidatorComponent::new(), &uri);
        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Empty);
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("X-Missing"),
            "expected header name in error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_fail_on_null_header_false_passes_missing_header() {
        let schema = json_schema_file();
        let uri = format!(
            "{}?headerName=X-Missing&failOnNullHeader=false",
            validator_uri(&schema)
        );
        let ep = endpoint_for(ValidatorComponent::new(), &uri);
        let producer = producer_for(ep.as_ref());
        let exchange = exchange_with(Body::Empty);
        let result = producer.oneshot(exchange).await;
        assert!(
            result.is_ok(),
            "expected missing header to pass with failOnNullHeader=false"
        );
    }

    #[test]
    fn test_relaxng_schema_type_rejected_at_creation() {
        let schema = schema_file(".rng", b"<grammar/>");
        let uri = validator_uri(&schema);
        let result = ValidatorComponent::new().create_endpoint(&uri, &NoOpComponentContext);
        let err = result.err().expect("expected endpoint creation to fail");
        let msg = err.to_string();
        assert!(
            msg.contains("not yet supported"),
            "expected 'not yet supported' in error, got: {msg}"
        );
    }

    #[test]
    fn test_schematron_schema_type_rejected_at_creation() {
        let schema = schema_file(".sch", b"<schema/>");
        let uri = validator_uri(&schema);
        let result = ValidatorComponent::new().create_endpoint(&uri, &NoOpComponentContext);
        let err = result.err().expect("expected endpoint creation to fail");
        let msg = err.to_string();
        assert!(
            msg.contains("not yet supported"),
            "expected 'not yet supported' in error, got: {msg}"
        );
    }

    #[test]
    fn test_schema_info_returns_description() {
        let schema = json_schema_file();
        let uri = validator_uri(&schema);
        let ep = endpoint_for(ValidatorComponent::new(), &uri);
        // NOTE(review): despite the test name, only the endpoint URI is
        // checked here; `schema_info` is not reachable through `dyn Endpoint`.
        assert_eq!(ep.uri(), uri);
    }
}