camel_component_validator/
component.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::debug;
8
9use crate::compiled::CompiledValidator;
10use crate::config::ValidatorConfig;
11use crate::xsd_bridge::{XsdBridge, XsdBridgeBackend};
12use camel_component_api::ComponentContext;
13use camel_component_api::{
14 BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
15};
16
17pub struct ValidatorComponent {
18 xsd_bridge: Arc<dyn XsdBridge>,
19}
20
21impl ValidatorComponent {
22 pub fn new() -> Self {
23 Self {
24 xsd_bridge: Arc::new(XsdBridgeBackend::new()),
25 }
26 }
27
28 #[cfg(test)]
29 fn with_xsd_bridge(xsd_bridge: Arc<dyn XsdBridge>) -> Self {
30 Self { xsd_bridge }
31 }
32}
33
34impl Default for ValidatorComponent {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl Component for ValidatorComponent {
41 fn scheme(&self) -> &str {
42 "validator"
43 }
44
45 fn create_endpoint(
46 &self,
47 uri: &str,
48 _ctx: &dyn ComponentContext,
49 ) -> Result<Box<dyn Endpoint>, CamelError> {
50 let config = ValidatorConfig::from_uri(uri)?;
51 let xsd_bridge = Arc::clone(&self.xsd_bridge);
52 let compiled = CompiledValidator::compile(&config, xsd_bridge)?;
53 Ok(Box::new(ValidatorEndpoint {
54 uri: uri.to_string(),
55 compiled: Arc::new(compiled),
56 }))
57 }
58}
59
60struct ValidatorEndpoint {
61 uri: String,
62 compiled: Arc<CompiledValidator>,
63}
64
65impl Endpoint for ValidatorEndpoint {
66 fn uri(&self) -> &str {
67 &self.uri
68 }
69
70 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
71 Err(CamelError::EndpointCreationFailed(
72 "validator endpoint does not support consumers".to_string(),
73 ))
74 }
75
76 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
77 Ok(BoxProcessor::new(ValidatorProducer {
78 uri: self.uri.clone(),
79 compiled: Arc::clone(&self.compiled),
80 }))
81 }
82}
83
84#[derive(Clone)]
85struct ValidatorProducer {
86 uri: String,
87 compiled: Arc<CompiledValidator>,
88}
89
90impl Service<Exchange> for ValidatorProducer {
91 type Response = Exchange;
92 type Error = CamelError;
93 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
94
95 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
96 Poll::Ready(Ok(()))
97 }
98
99 fn call(&mut self, exchange: Exchange) -> Self::Future {
100 let compiled = Arc::clone(&self.compiled);
101 let uri = self.uri.clone();
102 Box::pin(async move {
103 debug!(uri = uri, "validating exchange body");
104 compiled.validate(&exchange.input.body).await?;
105 Ok(exchange)
106 })
107 }
108}
109
110#[cfg(test)]
111mod tests {
112 use super::*;
113 use crate::error::ValidatorError;
114 use async_trait::async_trait;
115 use camel_component_api::{Message, NoOpComponentContext};
116 use std::io::Write;
117 use std::sync::atomic::{AtomicUsize, Ordering};
118 use tower::ServiceExt;
119
120 fn json_schema_file() -> tempfile::NamedTempFile {
121 let mut f = tempfile::Builder::new().suffix(".json").tempfile().unwrap();
122 f.write_all(br#"{"type":"object","required":["id"]}"#)
123 .unwrap();
124 f
125 }
126
127 fn xsd_file() -> tempfile::NamedTempFile {
128 let mut f = tempfile::Builder::new().suffix(".xsd").tempfile().unwrap();
129 f.write_all(
130 br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#,
131 ).unwrap();
132 f
133 }
134
135 #[derive(Debug)]
136 struct MockXsdBridge {
137 register_calls: AtomicUsize,
138 validate_calls: AtomicUsize,
139 register_error: Option<ValidatorError>,
140 validate_error: Option<ValidatorError>,
141 }
142
143 #[async_trait]
144 impl XsdBridge for MockXsdBridge {
145 async fn register(&self, _xsd_bytes: Vec<u8>) -> Result<String, ValidatorError> {
146 self.register_calls.fetch_add(1, Ordering::SeqCst);
147 if let Some(err) = &self.register_error {
148 return Err(err.clone());
149 }
150 Ok("xsd-mock-id".to_string())
151 }
152
153 async fn validate(
154 &self,
155 _schema_id: &str,
156 _doc_bytes: Vec<u8>,
157 ) -> Result<(), ValidatorError> {
158 self.validate_calls.fetch_add(1, Ordering::SeqCst);
159 if let Some(err) = &self.validate_error {
160 return Err(err.clone());
161 }
162 Ok(())
163 }
164 }
165
166 #[test]
167 fn scheme_is_validator() {
168 assert_eq!(ValidatorComponent::new().scheme(), "validator");
169 }
170
171 #[test]
172 fn consumer_not_supported() {
173 let f = json_schema_file();
174 let uri = format!("validator:{}", f.path().display());
175 let ep = ValidatorComponent::new()
176 .create_endpoint(&uri, &NoOpComponentContext)
177 .unwrap();
178 assert!(ep.create_consumer().is_err());
179 }
180
181 #[test]
182 fn endpoint_creation_fails_for_missing_schema() {
183 let result = ValidatorComponent::new()
184 .create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
185 assert!(result.is_err());
186 }
187
188 #[tokio::test]
189 async fn valid_json_body_passes_through() {
190 let f = json_schema_file();
191 let uri = format!("validator:{}", f.path().display());
192 let ep = ValidatorComponent::new()
193 .create_endpoint(&uri, &NoOpComponentContext)
194 .unwrap();
195 let producer = ep.create_producer(&ProducerContext::new()).unwrap();
196 let exchange = Exchange::new(Message::new(camel_component_api::Body::Json(
197 serde_json::json!({"id": "1"}),
198 )));
199 let result = producer.oneshot(exchange).await;
200 assert!(result.is_ok());
201 }
202
203 #[tokio::test]
204 async fn invalid_json_body_returns_err() {
205 let f = json_schema_file();
206 let uri = format!("validator:{}", f.path().display());
207 let ep = ValidatorComponent::new()
208 .create_endpoint(&uri, &NoOpComponentContext)
209 .unwrap();
210 let producer = ep.create_producer(&ProducerContext::new()).unwrap();
211 let exchange = Exchange::new(Message::new(camel_component_api::Body::Json(
212 serde_json::json!({"name": "x"}),
213 )));
214 let result = producer.oneshot(exchange).await;
215 assert!(result.is_err());
216 let msg = result.unwrap_err().to_string();
217 assert!(msg.contains("validation failed"), "got: {msg}");
218 }
219
220 #[tokio::test]
221 async fn valid_xml_body_passes() {
222 let backend = Arc::new(MockXsdBridge {
223 register_calls: AtomicUsize::new(0),
224 validate_calls: AtomicUsize::new(0),
225 register_error: None,
226 validate_error: None,
227 });
228 let f = xsd_file();
229 let uri = format!("validator:{}", f.path().display());
230 let ep = ValidatorComponent::with_xsd_bridge(backend)
231 .create_endpoint(&uri, &NoOpComponentContext)
232 .unwrap();
233 let producer = ep.create_producer(&ProducerContext::new()).unwrap();
234 let exchange = Exchange::new(Message::new(camel_component_api::Body::Xml(
235 "<order>hello</order>".to_string(),
236 )));
237 assert!(producer.oneshot(exchange).await.is_ok());
238 }
239
240 #[tokio::test]
241 async fn xsd_bridge_register_and_validate_mock() {
242 let backend = Arc::new(MockXsdBridge {
243 register_calls: AtomicUsize::new(0),
244 validate_calls: AtomicUsize::new(0),
245 register_error: None,
246 validate_error: None,
247 });
248
249 let f = xsd_file();
250 let uri = format!("validator:{}", f.path().display());
251 let ep = ValidatorComponent::with_xsd_bridge(Arc::clone(&backend) as Arc<dyn XsdBridge>)
252 .create_endpoint(&uri, &NoOpComponentContext)
253 .unwrap();
254
255 let producer = ep.create_producer(&ProducerContext::new()).unwrap();
256 let exchange = Exchange::new(Message::new(camel_component_api::Body::Xml(
257 "<order>ok</order>".to_string(),
258 )));
259 assert!(producer.oneshot(exchange).await.is_ok());
260 assert_eq!(backend.register_calls.load(Ordering::SeqCst), 1);
261 assert_eq!(backend.validate_calls.load(Ordering::SeqCst), 1);
262 }
263
264 #[tokio::test]
265 async fn xsd_bridge_register_error_propagates_on_validate() {
266 let backend = Arc::new(MockXsdBridge {
267 register_calls: AtomicUsize::new(0),
268 validate_calls: AtomicUsize::new(0),
269 register_error: Some(ValidatorError::CompilationFailed(
270 "COMPILATION_FAILED".to_string(),
271 )),
272 validate_error: None,
273 });
274 let f = xsd_file();
275 let uri = format!("validator:{}", f.path().display());
276 let ep = ValidatorComponent::with_xsd_bridge(backend)
278 .create_endpoint(&uri, &NoOpComponentContext)
279 .expect("endpoint creation should succeed");
280 let producer = ep.create_producer(&ProducerContext::new()).unwrap();
282 let exchange = Exchange::new(Message::new(camel_component_api::Body::Xml(
283 "<order/>".to_string(),
284 )));
285 let err = producer
286 .oneshot(exchange)
287 .await
288 .expect_err("expected validate to fail due to registration error");
289 assert!(err.to_string().contains("COMPILATION_FAILED"));
290 }
291}