Skip to main content

camel_component_validator/
compiled.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::sync::Mutex;
4
5use camel_component_api::{Body, CamelError};
6use libxml::parser::Parser as XmlParser;
7use libxml::schemas::{SchemaParserContext, SchemaValidationContext};
8use serde_yml::Value as YamlValue;
9
10use crate::config::{SchemaType, ValidatorConfig};
11
/// Newtype around libxml's `SchemaValidationContext` whose only purpose is to
/// let this crate assert `Send` for it (see the `SAFETY` note on the impl).
pub(crate) struct SendSchemaValidationContext(SchemaValidationContext);

// SAFETY: SchemaValidationContext wraps libxml2's xmlSchemaValidCtxt which is
// NOT thread-safe (requires &mut self for validate_document). However, it is
// safe to move between threads because it owns its heap allocation and has no
// thread-affinity (no thread-local state, no TLS, no OS handles tied to a thread).
// Concurrent access is prevented by wrapping in Arc<Mutex<...>>, so only one
// thread can call validate_document at a time. This matches the pattern used by
// other Rust XML libraries wrapping libxml2 (e.g., the libxml crate's own tests).
//
// NOTE(review): the "no thread-affinity" claim is a property of libxml2 itself
// and is not enforced by anything in this file — confirm against the libxml2
// threading documentation when upgrading the dependency.
unsafe impl Send for SendSchemaValidationContext {}
22
/// A schema that has been read from disk and compiled, ready to validate
/// message bodies via [`CompiledValidator::validate`].
///
/// Every variant keeps its validator behind an `Arc`.
pub(crate) enum CompiledValidator {
    /// XSD validation context. It sits behind a `Mutex` because validation
    /// goes through a mutable guard (`validate_document` on the locked value).
    Xml(Arc<Mutex<SendSchemaValidationContext>>),
    /// JSON Schema validator compiled from a schema file written in JSON.
    Json(Arc<jsonschema::Validator>),
    /// JSON Schema validator compiled from a schema file written in YAML;
    /// bodies are parsed as YAML and converted to JSON before being checked.
    Yaml(Arc<jsonschema::Validator>),
}
28
29impl CompiledValidator {
30    pub fn compile(config: &ValidatorConfig) -> Result<Self, CamelError> {
31        let path = &config.schema_path;
32
33        let content = std::fs::read(path).map_err(|e| {
34            CamelError::EndpointCreationFailed(format!(
35                "failed to read schema file '{}': {e}",
36                path.display()
37            ))
38        })?;
39
40        match config.schema_type {
41            SchemaType::Xml => Self::compile_xsd(path, &content),
42            SchemaType::Json => Self::compile_json(&content, path),
43            SchemaType::Yaml => Self::compile_yaml_schema(&content, path),
44        }
45    }
46
47    fn compile_xsd(path: &Path, content: &[u8]) -> Result<Self, CamelError> {
48        let mut parser_ctx = SchemaParserContext::from_buffer(content);
49        let ctx = SchemaValidationContext::from_parser(&mut parser_ctx).map_err(|errors| {
50            let msgs: Vec<String> = errors
51                .iter()
52                .map(|e| e.message.as_deref().unwrap_or("").to_string())
53                .collect();
54            CamelError::EndpointCreationFailed(format!(
55                "invalid XSD schema '{}': {}",
56                path.display(),
57                msgs.join("; ")
58            ))
59        })?;
60
61        Ok(CompiledValidator::Xml(Arc::new(Mutex::new(
62            SendSchemaValidationContext(ctx),
63        ))))
64    }
65
66    fn compile_json(content: &[u8], path: &Path) -> Result<Self, CamelError> {
67        let schema_value: serde_json::Value = serde_json::from_slice(content).map_err(|e| {
68            CamelError::EndpointCreationFailed(format!(
69                "invalid JSON schema '{}': {e}",
70                path.display()
71            ))
72        })?;
73
74        let validator = jsonschema::validator_for(&schema_value).map_err(|e| {
75            CamelError::EndpointCreationFailed(format!(
76                "failed to compile JSON schema '{}': {e}",
77                path.display()
78            ))
79        })?;
80
81        Ok(CompiledValidator::Json(Arc::new(validator)))
82    }
83
84    fn compile_yaml_schema(content: &[u8], path: &Path) -> Result<Self, CamelError> {
85        let yaml_str = std::str::from_utf8(content).map_err(|e| {
86            CamelError::EndpointCreationFailed(format!(
87                "YAML schema '{}' is not valid UTF-8: {e}",
88                path.display()
89            ))
90        })?;
91
92        let yaml_value: YamlValue = serde_yml::from_str(yaml_str).map_err(|e| {
93            CamelError::EndpointCreationFailed(format!(
94                "invalid YAML schema '{}': {e}",
95                path.display()
96            ))
97        })?;
98
99        let schema_value: serde_json::Value = serde_json::to_value(&yaml_value).map_err(|e| {
100            CamelError::EndpointCreationFailed(format!(
101                "failed to convert YAML schema to JSON '{}': {e}",
102                path.display()
103            ))
104        })?;
105
106        let validator = jsonschema::validator_for(&schema_value).map_err(|e| {
107            CamelError::EndpointCreationFailed(format!(
108                "failed to compile YAML schema '{}': {e}",
109                path.display()
110            ))
111        })?;
112
113        Ok(CompiledValidator::Yaml(Arc::new(validator)))
114    }
115
116    pub fn validate(&self, body: &Body) -> Result<(), CamelError> {
117        match self {
118            CompiledValidator::Xml(ctx) => Self::validate_xml(ctx, body),
119            CompiledValidator::Json(validator) => Self::validate_json(validator, body),
120            CompiledValidator::Yaml(validator) => Self::validate_yaml(validator, body),
121        }
122    }
123
124    fn validate_xml(
125        ctx: &Arc<Mutex<SendSchemaValidationContext>>,
126        body: &Body,
127    ) -> Result<(), CamelError> {
128        let xml_str: std::borrow::Cow<str> = match body {
129            Body::Xml(s) => std::borrow::Cow::Borrowed(s.as_str()),
130            Body::Text(s) => std::borrow::Cow::Borrowed(s.as_str()),
131            Body::Bytes(b) => {
132                std::borrow::Cow::Owned(String::from_utf8(b.to_vec()).map_err(|e| {
133                    CamelError::ProcessorError(format!("XSD validator: invalid UTF-8 in body: {e}"))
134                })?)
135            }
136            _ => {
137                return Err(CamelError::ProcessorError(
138                    "XSD validator requires Body::Xml, Body::Text, or Body::Bytes".to_string(),
139                ));
140            }
141        };
142
143        let parser = XmlParser::default();
144        let doc = parser.parse_string(xml_str.as_bytes()).map_err(|e| {
145            CamelError::ProcessorError(format!("XML parse error during validation: {e}"))
146        })?;
147
148        let mut guard = ctx
149            .lock()
150            .map_err(|e| CamelError::ProcessorError(format!("XSD validator lock poisoned: {e}")))?;
151        let result = guard.0.validate_document(&doc);
152        match result {
153            Ok(()) => Ok(()),
154            Err(errors) => {
155                let messages: Vec<String> = errors
156                    .iter()
157                    .map(|e| e.message.as_deref().unwrap_or("").to_string())
158                    .collect();
159                Err(CamelError::ProcessorError(format!(
160                    "XSD validation failed:\n{}",
161                    messages.join("\n")
162                )))
163            }
164        }
165    }
166
167    fn validate_json(validator: &jsonschema::Validator, body: &Body) -> Result<(), CamelError> {
168        let json_value = match body {
169            Body::Json(v) => v.clone(),
170            Body::Text(s) => serde_json::from_str(s)
171                .map_err(|e| CamelError::ProcessorError(format!("body is not valid JSON: {e}")))?,
172            Body::Bytes(b) => serde_json::from_slice(b).map_err(|e| {
173                CamelError::ProcessorError(format!("body bytes are not valid JSON: {e}"))
174            })?,
175            _ => {
176                return Err(CamelError::ProcessorError(
177                    "JSON Schema validator requires Body::Json, Body::Text, or Body::Bytes"
178                        .to_string(),
179                ));
180            }
181        };
182
183        let messages: Vec<String> = validator
184            .iter_errors(&json_value)
185            .map(|e| format!("{e} at {}", e.instance_path()))
186            .collect();
187
188        if messages.is_empty() {
189            Ok(())
190        } else {
191            Err(CamelError::ProcessorError(format!(
192                "JSON Schema validation failed:\n{}",
193                messages.join("\n")
194            )))
195        }
196    }
197
198    fn validate_yaml(validator: &jsonschema::Validator, body: &Body) -> Result<(), CamelError> {
199        let yaml_str = match body {
200            Body::Text(s) => s.as_str(),
201            _ => {
202                return Err(CamelError::ProcessorError(
203                    "YAML validator requires a text body (Body::Text)".to_string(),
204                ));
205            }
206        };
207
208        let yaml_value: serde_yml::Value = serde_yml::from_str(yaml_str)
209            .map_err(|e| CamelError::ProcessorError(format!("body is not valid YAML: {e}")))?;
210
211        let json_value: serde_json::Value = serde_json::to_value(&yaml_value)
212            .map_err(|e| CamelError::ProcessorError(format!("YAML→JSON conversion failed: {e}")))?;
213
214        let messages: Vec<String> = validator
215            .iter_errors(&json_value)
216            .map(|e| format!("{e} at {}", e.instance_path()))
217            .collect();
218
219        if messages.is_empty() {
220            Ok(())
221        } else {
222            Err(CamelError::ProcessorError(format!(
223                "YAML Schema validation failed:\n{}",
224                messages.join("\n")
225            )))
226        }
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{SchemaType, ValidatorConfig};
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// XSD declaring an `order` element with a single string `id` child.
    const ORDER_ID_XSD: &str = r#"<?xml version="1.0"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="order">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="id" type="xs:string"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>"#;

    /// Writes `content` to a fresh temporary file whose name ends in `suffix`.
    fn write_temp(content: &str, suffix: &str) -> NamedTempFile {
        let mut file = tempfile::Builder::new().suffix(suffix).tempfile().unwrap();
        file.write_all(content.as_bytes()).unwrap();
        file
    }

    /// Stores `schema` in a temp file and compiles it as `schema_type`.
    ///
    /// The temp file is returned next to the validator so that it stays on
    /// disk until the calling test finishes.
    fn compile_schema(
        schema: &str,
        suffix: &str,
        schema_type: SchemaType,
    ) -> (NamedTempFile, CompiledValidator) {
        let schema_file = write_temp(schema, suffix);
        let config = ValidatorConfig {
            schema_path: schema_file.path().to_path_buf(),
            schema_type,
        };
        let validator = CompiledValidator::compile(&config).unwrap();
        (schema_file, validator)
    }

    /// The enum must be shareable across threads to be usable from async routes.
    #[test]
    fn compiled_validator_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<CompiledValidator>();
    }

    /// A schema path that does not exist is reported as an error, not a panic.
    #[test]
    fn xsd_compile_missing_file_errors() {
        let config = ValidatorConfig {
            schema_path: "/nonexistent/schema.xsd".into(),
            schema_type: SchemaType::Xml,
        };
        assert!(CompiledValidator::compile(&config).is_err());
    }

    /// A document matching the XSD validates cleanly.
    #[test]
    fn xsd_valid_xml_passes() {
        let (_schema_file, validator) = compile_schema(ORDER_ID_XSD, ".xsd", SchemaType::Xml);
        let outcome = validator.validate(&Body::Xml("<order><id>123</id></order>".to_string()));
        assert!(outcome.is_ok());
    }

    /// An `order` without its required children is rejected with a validation error.
    #[test]
    fn xsd_invalid_xml_returns_all_errors() {
        let xsd = r#"<?xml version="1.0"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="order">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="id" type="xs:string"/>
        <xs:element name="amount" type="xs:integer"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>"#;
        let (_schema_file, validator) = compile_schema(xsd, ".xsd", SchemaType::Xml);
        let msg = validator
            .validate(&Body::Xml("<order/>".to_string()))
            .unwrap_err()
            .to_string();
        assert!(msg.contains("validation failed"), "got: {msg}");
    }

    /// An element missing a `use="required"` attribute is rejected.
    #[test]
    fn xsd_detects_missing_required_attribute() {
        let xsd = r#"<?xml version="1.0"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="order">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="item">
          <xs:complexType>
            <xs:attribute name="id" type="xs:string" use="required"/>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>"#;
        let (_schema_file, validator) = compile_schema(xsd, ".xsd", SchemaType::Xml);
        let failure = validator
            .validate(&Body::Xml("<order><item/></order>".to_string()))
            .unwrap_err();
        assert!(
            failure.to_string().contains("validation failed"),
            "libxml2 should detect missing required attr"
        );
    }

    /// A negative value in an `xs:positiveInteger` element is rejected.
    #[test]
    fn xsd_detects_wrong_simple_type() {
        let xsd = r#"<?xml version="1.0"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="root">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="count" type="xs:positiveInteger"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>"#;
        let (_schema_file, validator) = compile_schema(xsd, ".xsd", SchemaType::Xml);
        let outcome = validator.validate(&Body::Xml("<root><count>-5</count></root>".to_string()));
        assert!(
            outcome.is_err(),
            "libxml2 should detect negative positiveInteger"
        );
    }

    /// A JSON object carrying the required string property validates cleanly.
    #[test]
    fn json_valid_passes() {
        let schema =
            r#"{"type":"object","required":["name"],"properties":{"name":{"type":"string"}}}"#;
        let (_schema_file, validator) = compile_schema(schema, ".json", SchemaType::Json);
        let outcome = validator.validate(&Body::Json(serde_json::json!({"name": "Alice"})));
        assert!(outcome.is_ok());
    }

    /// A JSON object missing a required property is rejected.
    #[test]
    fn json_invalid_returns_errors() {
        let (_schema_file, validator) = compile_schema(
            r#"{"type":"object","required":["name"]}"#,
            ".json",
            SchemaType::Json,
        );
        let failure = validator
            .validate(&Body::Json(serde_json::json!({"age": 30})))
            .unwrap_err();
        assert!(failure.to_string().contains("validation failed"));
    }

    /// A text body is parsed as JSON before being checked.
    #[test]
    fn json_text_body_parses_and_validates() {
        let (_schema_file, validator) =
            compile_schema(r#"{"type":"string"}"#, ".json", SchemaType::Json);
        let outcome = validator.validate(&Body::Text(r#""hello""#.to_string()));
        assert!(outcome.is_ok());
    }

    /// An empty body is not an acceptable input for the JSON validator.
    #[test]
    fn json_empty_body_errors() {
        let (_schema_file, validator) =
            compile_schema(r#"{"type":"object"}"#, ".json", SchemaType::Json);
        assert!(validator.validate(&Body::Empty).is_err());
    }

    /// A YAML document carrying the required key validates cleanly.
    #[test]
    fn yaml_valid_passes() {
        let schema = "type: object\nrequired: [host]\nproperties:\n  host:\n    type: string\n";
        let (_schema_file, validator) = compile_schema(schema, ".yaml", SchemaType::Yaml);
        let outcome = validator.validate(&Body::Text("host: localhost\n".to_string()));
        assert!(outcome.is_ok());
    }

    /// A YAML document missing the required key is rejected.
    #[test]
    fn yaml_invalid_returns_errors() {
        let (_schema_file, validator) = compile_schema(
            "type: object\nrequired: [host]\n",
            ".yaml",
            SchemaType::Yaml,
        );
        let failure = validator
            .validate(&Body::Text("port: 8080\n".to_string()))
            .unwrap_err();
        assert!(failure.to_string().contains("validation failed"));
    }

    /// Twenty tasks share one XSD validator; half submit a valid document and
    /// half an invalid one, and each must get the verdict for its own input.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_xsd_validation_stress_test() {
        let (_schema_file, validator) = compile_schema(ORDER_ID_XSD, ".xsd", SchemaType::Xml);
        let shared = Arc::new(validator);

        // Spawn every task up front so they actually contend for the validator.
        let handles: Vec<_> = (0..20)
            .map(|i| {
                let c = Arc::clone(&shared);
                tokio::spawn(async move {
                    let xml = if i % 2 == 0 {
                        "<order><id>123</id></order>"
                    } else {
                        "<order/>"
                    };
                    c.validate(&Body::Xml(xml.to_string()))
                })
            })
            .collect();

        let mut valid = 0;
        let mut invalid = 0;
        for handle in handles {
            if handle.await.unwrap().is_ok() {
                valid += 1;
            } else {
                invalid += 1;
            }
        }
        assert_eq!(valid, 10);
        assert_eq!(invalid, 10);
    }
}