camel_component_validator/
compiled.rs1use std::path::Path;
2use std::sync::Arc;
3use std::sync::Mutex;
4
5use camel_component_api::{Body, CamelError};
6use libxml::parser::Parser as XmlParser;
7use libxml::schemas::{SchemaParserContext, SchemaValidationContext};
8use serde_yml::Value as YamlValue;
9
10use crate::config::{SchemaType, ValidatorConfig};
11
12pub(crate) struct SendSchemaValidationContext(SchemaValidationContext);
13
14unsafe impl Send for SendSchemaValidationContext {}
22
23pub(crate) enum CompiledValidator {
24 Xml(Arc<Mutex<SendSchemaValidationContext>>),
25 Json(Arc<jsonschema::Validator>),
26 Yaml(Arc<jsonschema::Validator>),
27}
28
29impl CompiledValidator {
30 pub fn compile(config: &ValidatorConfig) -> Result<Self, CamelError> {
31 let path = &config.schema_path;
32
33 let content = std::fs::read(path).map_err(|e| {
34 CamelError::EndpointCreationFailed(format!(
35 "failed to read schema file '{}': {e}",
36 path.display()
37 ))
38 })?;
39
40 match config.schema_type {
41 SchemaType::Xml => Self::compile_xsd(path, &content),
42 SchemaType::Json => Self::compile_json(&content, path),
43 SchemaType::Yaml => Self::compile_yaml_schema(&content, path),
44 }
45 }
46
47 fn compile_xsd(path: &Path, content: &[u8]) -> Result<Self, CamelError> {
48 let mut parser_ctx = SchemaParserContext::from_buffer(content);
49 let ctx = SchemaValidationContext::from_parser(&mut parser_ctx).map_err(|errors| {
50 let msgs: Vec<String> = errors
51 .iter()
52 .map(|e| e.message.as_deref().unwrap_or("").to_string())
53 .collect();
54 CamelError::EndpointCreationFailed(format!(
55 "invalid XSD schema '{}': {}",
56 path.display(),
57 msgs.join("; ")
58 ))
59 })?;
60
61 Ok(CompiledValidator::Xml(Arc::new(Mutex::new(
62 SendSchemaValidationContext(ctx),
63 ))))
64 }
65
66 fn compile_json(content: &[u8], path: &Path) -> Result<Self, CamelError> {
67 let schema_value: serde_json::Value = serde_json::from_slice(content).map_err(|e| {
68 CamelError::EndpointCreationFailed(format!(
69 "invalid JSON schema '{}': {e}",
70 path.display()
71 ))
72 })?;
73
74 let validator = jsonschema::validator_for(&schema_value).map_err(|e| {
75 CamelError::EndpointCreationFailed(format!(
76 "failed to compile JSON schema '{}': {e}",
77 path.display()
78 ))
79 })?;
80
81 Ok(CompiledValidator::Json(Arc::new(validator)))
82 }
83
84 fn compile_yaml_schema(content: &[u8], path: &Path) -> Result<Self, CamelError> {
85 let yaml_str = std::str::from_utf8(content).map_err(|e| {
86 CamelError::EndpointCreationFailed(format!(
87 "YAML schema '{}' is not valid UTF-8: {e}",
88 path.display()
89 ))
90 })?;
91
92 let yaml_value: YamlValue = serde_yml::from_str(yaml_str).map_err(|e| {
93 CamelError::EndpointCreationFailed(format!(
94 "invalid YAML schema '{}': {e}",
95 path.display()
96 ))
97 })?;
98
99 let schema_value: serde_json::Value = serde_json::to_value(&yaml_value).map_err(|e| {
100 CamelError::EndpointCreationFailed(format!(
101 "failed to convert YAML schema to JSON '{}': {e}",
102 path.display()
103 ))
104 })?;
105
106 let validator = jsonschema::validator_for(&schema_value).map_err(|e| {
107 CamelError::EndpointCreationFailed(format!(
108 "failed to compile YAML schema '{}': {e}",
109 path.display()
110 ))
111 })?;
112
113 Ok(CompiledValidator::Yaml(Arc::new(validator)))
114 }
115
116 pub fn validate(&self, body: &Body) -> Result<(), CamelError> {
117 match self {
118 CompiledValidator::Xml(ctx) => Self::validate_xml(ctx, body),
119 CompiledValidator::Json(validator) => Self::validate_json(validator, body),
120 CompiledValidator::Yaml(validator) => Self::validate_yaml(validator, body),
121 }
122 }
123
124 fn validate_xml(
125 ctx: &Arc<Mutex<SendSchemaValidationContext>>,
126 body: &Body,
127 ) -> Result<(), CamelError> {
128 let xml_str: std::borrow::Cow<str> = match body {
129 Body::Xml(s) => std::borrow::Cow::Borrowed(s.as_str()),
130 Body::Text(s) => std::borrow::Cow::Borrowed(s.as_str()),
131 Body::Bytes(b) => {
132 std::borrow::Cow::Owned(String::from_utf8(b.to_vec()).map_err(|e| {
133 CamelError::ProcessorError(format!("XSD validator: invalid UTF-8 in body: {e}"))
134 })?)
135 }
136 _ => {
137 return Err(CamelError::ProcessorError(
138 "XSD validator requires Body::Xml, Body::Text, or Body::Bytes".to_string(),
139 ));
140 }
141 };
142
143 let parser = XmlParser::default();
144 let doc = parser.parse_string(xml_str.as_bytes()).map_err(|e| {
145 CamelError::ProcessorError(format!("XML parse error during validation: {e}"))
146 })?;
147
148 let mut guard = ctx
149 .lock()
150 .map_err(|e| CamelError::ProcessorError(format!("XSD validator lock poisoned: {e}")))?;
151 let result = guard.0.validate_document(&doc);
152 match result {
153 Ok(()) => Ok(()),
154 Err(errors) => {
155 let messages: Vec<String> = errors
156 .iter()
157 .map(|e| e.message.as_deref().unwrap_or("").to_string())
158 .collect();
159 Err(CamelError::ProcessorError(format!(
160 "XSD validation failed:\n{}",
161 messages.join("\n")
162 )))
163 }
164 }
165 }
166
167 fn validate_json(validator: &jsonschema::Validator, body: &Body) -> Result<(), CamelError> {
168 let json_value = match body {
169 Body::Json(v) => v.clone(),
170 Body::Text(s) => serde_json::from_str(s)
171 .map_err(|e| CamelError::ProcessorError(format!("body is not valid JSON: {e}")))?,
172 Body::Bytes(b) => serde_json::from_slice(b).map_err(|e| {
173 CamelError::ProcessorError(format!("body bytes are not valid JSON: {e}"))
174 })?,
175 _ => {
176 return Err(CamelError::ProcessorError(
177 "JSON Schema validator requires Body::Json, Body::Text, or Body::Bytes"
178 .to_string(),
179 ));
180 }
181 };
182
183 let messages: Vec<String> = validator
184 .iter_errors(&json_value)
185 .map(|e| format!("{e} at {}", e.instance_path()))
186 .collect();
187
188 if messages.is_empty() {
189 Ok(())
190 } else {
191 Err(CamelError::ProcessorError(format!(
192 "JSON Schema validation failed:\n{}",
193 messages.join("\n")
194 )))
195 }
196 }
197
198 fn validate_yaml(validator: &jsonschema::Validator, body: &Body) -> Result<(), CamelError> {
199 let yaml_str = match body {
200 Body::Text(s) => s.as_str(),
201 _ => {
202 return Err(CamelError::ProcessorError(
203 "YAML validator requires a text body (Body::Text)".to_string(),
204 ));
205 }
206 };
207
208 let yaml_value: serde_yml::Value = serde_yml::from_str(yaml_str)
209 .map_err(|e| CamelError::ProcessorError(format!("body is not valid YAML: {e}")))?;
210
211 let json_value: serde_json::Value = serde_json::to_value(&yaml_value)
212 .map_err(|e| CamelError::ProcessorError(format!("YAML→JSON conversion failed: {e}")))?;
213
214 let messages: Vec<String> = validator
215 .iter_errors(&json_value)
216 .map(|e| format!("{e} at {}", e.instance_path()))
217 .collect();
218
219 if messages.is_empty() {
220 Ok(())
221 } else {
222 Err(CamelError::ProcessorError(format!(
223 "YAML Schema validation failed:\n{}",
224 messages.join("\n")
225 )))
226 }
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233 use crate::config::{SchemaType, ValidatorConfig};
234 use std::io::Write;
235 use tempfile::NamedTempFile;
236
237 fn write_temp(content: &str, suffix: &str) -> NamedTempFile {
238 let mut f = tempfile::Builder::new().suffix(suffix).tempfile().unwrap();
239 f.write_all(content.as_bytes()).unwrap();
240 f
241 }
242
243 #[test]
244 fn compiled_validator_is_send_sync() {
245 fn assert_send_sync<T: Send + Sync>() {}
246 assert_send_sync::<CompiledValidator>();
247 }
248
249 #[test]
250 fn xsd_compile_missing_file_errors() {
251 let config = ValidatorConfig {
252 schema_path: "/nonexistent/schema.xsd".into(),
253 schema_type: SchemaType::Xml,
254 };
255 assert!(CompiledValidator::compile(&config).is_err());
256 }
257
258 #[test]
259 fn xsd_valid_xml_passes() {
260 let xsd = r#"<?xml version="1.0"?>
261<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
262 <xs:element name="order">
263 <xs:complexType>
264 <xs:sequence>
265 <xs:element name="id" type="xs:string"/>
266 </xs:sequence>
267 </xs:complexType>
268 </xs:element>
269</xs:schema>"#;
270 let f = write_temp(xsd, ".xsd");
271 let config = ValidatorConfig {
272 schema_path: f.path().to_path_buf(),
273 schema_type: SchemaType::Xml,
274 };
275 let compiled = CompiledValidator::compile(&config).unwrap();
276 let body = Body::Xml("<order><id>123</id></order>".to_string());
277 assert!(compiled.validate(&body).is_ok());
278 }
279
280 #[test]
281 fn xsd_invalid_xml_returns_all_errors() {
282 let xsd = r#"<?xml version="1.0"?>
283<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
284 <xs:element name="order">
285 <xs:complexType>
286 <xs:sequence>
287 <xs:element name="id" type="xs:string"/>
288 <xs:element name="amount" type="xs:integer"/>
289 </xs:sequence>
290 </xs:complexType>
291 </xs:element>
292</xs:schema>"#;
293 let f = write_temp(xsd, ".xsd");
294 let config = ValidatorConfig {
295 schema_path: f.path().to_path_buf(),
296 schema_type: SchemaType::Xml,
297 };
298 let compiled = CompiledValidator::compile(&config).unwrap();
299 let body = Body::Xml("<order/>".to_string());
300 let err = compiled.validate(&body).unwrap_err();
301 let msg = err.to_string();
302 assert!(msg.contains("validation failed"), "got: {msg}");
303 }
304
305 #[test]
306 fn xsd_detects_missing_required_attribute() {
307 let xsd = r#"<?xml version="1.0"?>
308<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
309 <xs:element name="order">
310 <xs:complexType>
311 <xs:sequence>
312 <xs:element name="item">
313 <xs:complexType>
314 <xs:attribute name="id" type="xs:string" use="required"/>
315 </xs:complexType>
316 </xs:element>
317 </xs:sequence>
318 </xs:complexType>
319 </xs:element>
320</xs:schema>"#;
321 let f = write_temp(xsd, ".xsd");
322 let config = ValidatorConfig {
323 schema_path: f.path().to_path_buf(),
324 schema_type: SchemaType::Xml,
325 };
326 let compiled = CompiledValidator::compile(&config).unwrap();
327 let body = Body::Xml("<order><item/></order>".to_string());
328 let err = compiled.validate(&body).unwrap_err();
329 assert!(
330 err.to_string().contains("validation failed"),
331 "libxml2 should detect missing required attr"
332 );
333 }
334
335 #[test]
336 fn xsd_detects_wrong_simple_type() {
337 let xsd = r#"<?xml version="1.0"?>
338<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
339 <xs:element name="root">
340 <xs:complexType>
341 <xs:sequence>
342 <xs:element name="count" type="xs:positiveInteger"/>
343 </xs:sequence>
344 </xs:complexType>
345 </xs:element>
346</xs:schema>"#;
347 let f = write_temp(xsd, ".xsd");
348 let config = ValidatorConfig {
349 schema_path: f.path().to_path_buf(),
350 schema_type: SchemaType::Xml,
351 };
352 let compiled = CompiledValidator::compile(&config).unwrap();
353 let body = Body::Xml("<root><count>-5</count></root>".to_string());
354 assert!(
355 compiled.validate(&body).is_err(),
356 "libxml2 should detect negative positiveInteger"
357 );
358 }
359
360 #[test]
361 fn json_valid_passes() {
362 let schema =
363 r#"{"type":"object","required":["name"],"properties":{"name":{"type":"string"}}}"#;
364 let f = write_temp(schema, ".json");
365 let config = ValidatorConfig {
366 schema_path: f.path().to_path_buf(),
367 schema_type: SchemaType::Json,
368 };
369 let compiled = CompiledValidator::compile(&config).unwrap();
370 let body = Body::Json(serde_json::json!({"name": "Alice"}));
371 assert!(compiled.validate(&body).is_ok());
372 }
373
374 #[test]
375 fn json_invalid_returns_errors() {
376 let schema = r#"{"type":"object","required":["name"]}"#;
377 let f = write_temp(schema, ".json");
378 let config = ValidatorConfig {
379 schema_path: f.path().to_path_buf(),
380 schema_type: SchemaType::Json,
381 };
382 let compiled = CompiledValidator::compile(&config).unwrap();
383 let body = Body::Json(serde_json::json!({"age": 30}));
384 let err = compiled.validate(&body).unwrap_err();
385 assert!(err.to_string().contains("validation failed"));
386 }
387
388 #[test]
389 fn json_text_body_parses_and_validates() {
390 let schema = r#"{"type":"string"}"#;
391 let f = write_temp(schema, ".json");
392 let config = ValidatorConfig {
393 schema_path: f.path().to_path_buf(),
394 schema_type: SchemaType::Json,
395 };
396 let compiled = CompiledValidator::compile(&config).unwrap();
397 let body = Body::Text(r#""hello""#.to_string());
398 assert!(compiled.validate(&body).is_ok());
399 }
400
401 #[test]
402 fn json_empty_body_errors() {
403 let schema = r#"{"type":"object"}"#;
404 let f = write_temp(schema, ".json");
405 let config = ValidatorConfig {
406 schema_path: f.path().to_path_buf(),
407 schema_type: SchemaType::Json,
408 };
409 let compiled = CompiledValidator::compile(&config).unwrap();
410 let body = Body::Empty;
411 assert!(compiled.validate(&body).is_err());
412 }
413
414 #[test]
415 fn yaml_valid_passes() {
416 let schema = "type: object\nrequired: [host]\nproperties:\n host:\n type: string\n";
417 let f = write_temp(schema, ".yaml");
418 let config = ValidatorConfig {
419 schema_path: f.path().to_path_buf(),
420 schema_type: SchemaType::Yaml,
421 };
422 let compiled = CompiledValidator::compile(&config).unwrap();
423 let body = Body::Text("host: localhost\n".to_string());
424 assert!(compiled.validate(&body).is_ok());
425 }
426
427 #[test]
428 fn yaml_invalid_returns_errors() {
429 let schema = "type: object\nrequired: [host]\n";
430 let f = write_temp(schema, ".yaml");
431 let config = ValidatorConfig {
432 schema_path: f.path().to_path_buf(),
433 schema_type: SchemaType::Yaml,
434 };
435 let compiled = CompiledValidator::compile(&config).unwrap();
436 let body = Body::Text("port: 8080\n".to_string());
437 let err = compiled.validate(&body).unwrap_err();
438 assert!(err.to_string().contains("validation failed"));
439 }
440
441 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
442 async fn concurrent_xsd_validation_stress_test() {
443 let xsd = r#"<?xml version="1.0"?>
444<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
445 <xs:element name="order">
446 <xs:complexType>
447 <xs:sequence>
448 <xs:element name="id" type="xs:string"/>
449 </xs:sequence>
450 </xs:complexType>
451 </xs:element>
452</xs:schema>"#;
453 let f = write_temp(xsd, ".xsd");
454 let config = ValidatorConfig {
455 schema_path: f.path().to_path_buf(),
456 schema_type: SchemaType::Xml,
457 };
458 let compiled = Arc::new(CompiledValidator::compile(&config).unwrap());
459
460 let mut handles = Vec::new();
461 for i in 0..20 {
462 let c = Arc::clone(&compiled);
463 handles.push(tokio::spawn(async move {
464 let body = if i % 2 == 0 {
465 Body::Xml("<order><id>123</id></order>".to_string())
466 } else {
467 Body::Xml("<order/>".to_string())
468 };
469 c.validate(&body)
470 }));
471 }
472
473 let mut valid = 0;
474 let mut invalid = 0;
475 for h in handles {
476 match h.await.unwrap() {
477 Ok(()) => valid += 1,
478 Err(_) => invalid += 1,
479 }
480 }
481 assert_eq!(valid, 10);
482 assert_eq!(invalid, 10);
483 }
484}