Skip to main content

camel_component_validator/
compiled.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use camel_component_api::{Body, CamelError};
5use serde_yml::Value as YamlValue;
6
7use crate::config::{SchemaType, ValidatorConfig};
8use crate::error::ValidatorError;
9use crate::resolver::{FilesystemResolver, ResourceResolver};
10use crate::xsd_bridge::XsdBridge;
11
12/// Returns an approximate byte-length for a [`Body`] variant.
13///
14/// This is a rough estimate used only for backpressure (rejecting obviously
15/// oversized payloads). `Body::Stream` is not measurable and returns `None`.
16pub(crate) fn approx_body_byte_len(body: &Body) -> Option<usize> {
17    match body {
18        Body::Empty => Some(0),
19        Body::Bytes(b) => Some(b.len()),
20        Body::Text(s) => Some(s.len()),
21        Body::Xml(s) => Some(s.len()),
22        Body::Json(v) => serde_json::to_vec(v).ok().map(|v| v.len()),
23        Body::Stream(_) => None,
24    }
25}
26
27pub(crate) enum CompiledValidator {
28    Xml {
29        /// Raw XSD bytes. `register` is called lazily on first validation so
30        /// bridge startup happens in an async context (avoiding runtime issues
31        /// caused by block_on's temporary runtime).
32        xsd_bytes: Vec<u8>,
33        backend: Arc<dyn XsdBridge>,
34        max_payload_bytes: Option<usize>,
35    },
36    Json {
37        validator: Arc<jsonschema::Validator>,
38        max_payload_bytes: Option<usize>,
39    },
40    Yaml {
41        validator: Arc<jsonschema::Validator>,
42        max_payload_bytes: Option<usize>,
43    },
44}
45
46impl std::fmt::Debug for CompiledValidator {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("CompiledValidator").finish_non_exhaustive()
49    }
50}
51
52impl CompiledValidator {
53    pub fn compile(
54        config: &ValidatorConfig,
55        xsd_backend: Arc<dyn XsdBridge>,
56    ) -> Result<Self, CamelError> {
57        let path = &config.schema_path;
58
59        let resolver = FilesystemResolver;
60        let path_str = path.to_str().ok_or_else(|| {
61            CamelError::EndpointCreationFailed("schema path contains non-UTF-8 characters".into())
62        })?;
63        let content = resolver.resolve(path_str)?;
64
65        match config.schema_type {
66            SchemaType::Xml => Ok(Self::compile_xsd(
67                &content,
68                xsd_backend,
69                config.max_payload_bytes,
70            )),
71            SchemaType::Json => Self::compile_json(&content, path, config.max_payload_bytes),
72            SchemaType::Yaml => Self::compile_yaml_schema(&content, path, config.max_payload_bytes),
73            SchemaType::RelaxNg | SchemaType::Schematron => Err(CamelError::Config(format!(
74                "schema type {:?} is not yet supported; supported: Xml, Json, Yaml",
75                config.schema_type
76            ))),
77        }
78    }
79
80    fn compile_xsd(
81        content: &[u8],
82        backend: Arc<dyn XsdBridge>,
83        max_payload_bytes: Option<usize>,
84    ) -> Self {
85        // Bridge startup and schema registration are deferred to the first validate()
86        // call, which runs in a proper async context. This avoids runtime issues
87        // caused by block_on's temporary runtime (channel I/O tasks would die with it).
88        CompiledValidator::Xml {
89            xsd_bytes: content.to_vec(),
90            backend,
91            max_payload_bytes,
92        }
93    }
94
95    fn compile_json(
96        content: &[u8],
97        path: &Path,
98        max_payload_bytes: Option<usize>,
99    ) -> Result<Self, CamelError> {
100        let schema_value: serde_json::Value = serde_json::from_slice(content).map_err(|e| {
101            CamelError::EndpointCreationFailed(format!(
102                "invalid JSON schema '{}': {e}",
103                path.display()
104            ))
105        })?;
106
107        let validator = jsonschema::validator_for(&schema_value).map_err(|e| {
108            CamelError::EndpointCreationFailed(format!(
109                "failed to compile JSON schema '{}': {e}",
110                path.display()
111            ))
112        })?;
113
114        Ok(CompiledValidator::Json {
115            validator: Arc::new(validator),
116            max_payload_bytes,
117        })
118    }
119
120    fn compile_yaml_schema(
121        content: &[u8],
122        path: &Path,
123        max_payload_bytes: Option<usize>,
124    ) -> Result<Self, CamelError> {
125        let yaml_str = std::str::from_utf8(content).map_err(|e| {
126            CamelError::EndpointCreationFailed(format!(
127                "YAML schema '{}' is not valid UTF-8: {e}",
128                path.display()
129            ))
130        })?;
131
132        let yaml_value: YamlValue = serde_yml::from_str(yaml_str).map_err(|e| {
133            CamelError::EndpointCreationFailed(format!(
134                "invalid YAML schema '{}': {e}",
135                path.display()
136            ))
137        })?;
138
139        let schema_value: serde_json::Value = serde_json::to_value(&yaml_value).map_err(|e| {
140            CamelError::EndpointCreationFailed(format!(
141                "failed to convert YAML schema to JSON '{}': {e}",
142                path.display()
143            ))
144        })?;
145
146        let validator = jsonschema::validator_for(&schema_value).map_err(|e| {
147            CamelError::EndpointCreationFailed(format!(
148                "failed to compile YAML schema '{}': {e}",
149                path.display()
150            ))
151        })?;
152
153        Ok(CompiledValidator::Yaml {
154            validator: Arc::new(validator),
155            max_payload_bytes,
156        })
157    }
158
159    pub async fn validate(&self, body: &Body) -> Result<(), CamelError> {
160        if let Some(limit) = self.max_payload_bytes()
161            && let Some(actual) = approx_body_byte_len(body)
162            && actual > limit
163        {
164            return Err(ValidatorError::PayloadTooLarge { actual, limit }.to_processor_error());
165        }
166
167        match self {
168            CompiledValidator::Xml {
169                xsd_bytes, backend, ..
170            } => Self::validate_xml(xsd_bytes, backend, body).await,
171            CompiledValidator::Json { validator, .. } => Self::validate_json(validator, body),
172            CompiledValidator::Yaml { validator, .. } => Self::validate_yaml(validator, body),
173        }
174    }
175
176    fn max_payload_bytes(&self) -> Option<usize> {
177        match self {
178            CompiledValidator::Xml {
179                max_payload_bytes, ..
180            } => *max_payload_bytes,
181            CompiledValidator::Json {
182                max_payload_bytes, ..
183            } => *max_payload_bytes,
184            CompiledValidator::Yaml {
185                max_payload_bytes, ..
186            } => *max_payload_bytes,
187        }
188    }
189
190    async fn validate_xml(
191        xsd_bytes: &[u8],
192        backend: &Arc<dyn XsdBridge>,
193        body: &Body,
194    ) -> Result<(), CamelError> {
195        // Register lazily (idempotent: no-op if already registered).
196        let schema_id = backend
197            .register(xsd_bytes.to_vec())
198            .await
199            .map_err(|e| CamelError::Config(format!("XSD registration failed: {e}")))?;
200
201        let xml_bytes = match body {
202            Body::Xml(s) => s.as_bytes().to_vec(),
203            Body::Text(s) => s.as_bytes().to_vec(),
204            Body::Bytes(b) => b.to_vec(),
205            _ => {
206                return Err(CamelError::ProcessorError(
207                    "XSD validator requires Body::Xml, Body::Text, or Body::Bytes".to_string(),
208                ));
209            }
210        };
211
212        backend
213            .validate(&schema_id, xml_bytes)
214            .await
215            .map_err(|e| e.to_processor_error())
216    }
217
218    fn validate_json(validator: &jsonschema::Validator, body: &Body) -> Result<(), CamelError> {
219        let json_value = match body {
220            Body::Json(v) => v.clone(),
221            Body::Text(s) => serde_json::from_str(s)
222                .map_err(|e| CamelError::ProcessorError(format!("body is not valid JSON: {e}")))?,
223            Body::Bytes(b) => serde_json::from_slice(b).map_err(|e| {
224                CamelError::ProcessorError(format!("body bytes are not valid JSON: {e}"))
225            })?,
226            _ => {
227                return Err(CamelError::ProcessorError(
228                    "JSON Schema validator requires Body::Json, Body::Text, or Body::Bytes"
229                        .to_string(),
230                ));
231            }
232        };
233
234        let messages: Vec<String> = validator
235            .iter_errors(&json_value)
236            .map(|e| format!("{e} at {}", e.instance_path()))
237            .collect();
238
239        if messages.is_empty() {
240            Ok(())
241        } else {
242            Err(CamelError::ProcessorError(format!(
243                "JSON Schema validation failed:\n{}",
244                messages.join("\n")
245            )))
246        }
247    }
248
249    fn validate_yaml(validator: &jsonschema::Validator, body: &Body) -> Result<(), CamelError> {
250        let yaml_str = match body {
251            Body::Text(s) => s.as_str(),
252            _ => {
253                return Err(CamelError::ProcessorError(
254                    "YAML validator requires a text body (Body::Text)".to_string(),
255                ));
256            }
257        };
258
259        let yaml_value: serde_yml::Value = serde_yml::from_str(yaml_str)
260            .map_err(|e| CamelError::ProcessorError(format!("body is not valid YAML: {e}")))?;
261
262        let json_value: serde_json::Value = serde_json::to_value(&yaml_value)
263            .map_err(|e| CamelError::ProcessorError(format!("YAML→JSON conversion failed: {e}")))?;
264
265        let messages: Vec<String> = validator
266            .iter_errors(&json_value)
267            .map(|e| format!("{e} at {}", e.instance_path()))
268            .collect();
269
270        if messages.is_empty() {
271            Ok(())
272        } else {
273            Err(CamelError::ProcessorError(format!(
274                "YAML Schema validation failed:\n{}",
275                messages.join("\n")
276            )))
277        }
278    }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DEFAULT_SCHEMA_CACHE_MAX_ENTRIES, SchemaType, ValidatorConfig};
    use async_trait::async_trait;

    /// Test double for [`XsdBridge`]: `register` fails with the configured
    /// error (if any) or returns a fixed id; `validate` always succeeds.
    #[derive(Debug, Clone)]
    struct MockBridge {
        register_err: Option<ValidatorError>,
    }

    #[async_trait]
    impl XsdBridge for MockBridge {
        async fn register(&self, _xsd_bytes: Vec<u8>) -> Result<String, ValidatorError> {
            match &self.register_err {
                Some(err) => Err(err.clone()),
                None => Ok("xsd-mock".to_string()),
            }
        }

        async fn validate(
            &self,
            _schema_id: &str,
            _doc_bytes: Vec<u8>,
        ) -> Result<(), ValidatorError> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn xsd_bridge_register_error_propagates_on_validate() {
        use std::io::Write;

        // Minimal schema file on disk; compile() only reads it for XSD.
        let mut xsd_file = tempfile::Builder::new().suffix(".xsd").tempfile().unwrap();
        xsd_file.write_all(b"<xs:schema/>").unwrap();

        let config = ValidatorConfig {
            schema_path: xsd_file.path().to_path_buf(),
            schema_type: SchemaType::Xml,
            max_payload_bytes: None,
            schema_cache_max_entries: DEFAULT_SCHEMA_CACHE_MAX_ENTRIES,
            fail_on_null_body: true,
            header_name: None,
            fail_on_null_header: true,
        };

        let registration_failure = ValidatorError::CompilationFailed {
            message: "COMPILATION_FAILED".to_string(),
            source: None,
        };
        let failing_bridge = Arc::new(MockBridge {
            register_err: Some(registration_failure),
        });

        // compile() is sync and always succeeds for XSD (deferred registration).
        let compiled =
            CompiledValidator::compile(&config, failing_bridge).expect("compile should succeed");

        // The failure only shows up once validate() attempts register().
        let document = Body::Xml("<order/>".to_string());
        let err = compiled
            .validate(&document)
            .await
            .expect_err("expected validate to fail due to registration error");
        assert!(matches!(err, CamelError::Config(_)));
        assert!(err.to_string().contains("COMPILATION_FAILED"));
    }

    #[test]
    fn approx_body_byte_len_variants() {
        assert_eq!(approx_body_byte_len(&Body::Empty), Some(0));

        let text = Body::Text("hello".to_string());
        assert_eq!(approx_body_byte_len(&text), Some(5));

        let xml = Body::Xml("<a/>".to_string());
        assert_eq!(approx_body_byte_len(&xml), Some(4));

        // Json is measured via its serialized form, so only check it is non-empty.
        let json = Body::Json(serde_json::json!({"id": 1}));
        let json_len = approx_body_byte_len(&json);
        assert!(matches!(json_len, Some(len) if len > 0));
    }
}