camel_component_validator/
component.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7use tracing::debug;
8
9use crate::compiled::CompiledValidator;
10use crate::config::ValidatorConfig;
11use camel_component_api::ComponentContext;
12use camel_component_api::{
13 BoxProcessor, CamelError, Component, Consumer, Endpoint, Exchange, ProducerContext,
14};
15
/// Stateless component whose [`Component::scheme`] is `"validator"`.
///
/// Endpoints created by this component compile a validator from the endpoint
/// URI and use it to check the body of each exchange sent to a producer.
///
/// `Debug` and `Clone` are derived so the public type can be logged and
/// duplicated like any other unit value.
#[derive(Debug, Clone)]
pub struct ValidatorComponent;
17
18impl ValidatorComponent {
19 pub fn new() -> Self {
20 Self
21 }
22}
23
24impl Default for ValidatorComponent {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl Component for ValidatorComponent {
31 fn scheme(&self) -> &str {
32 "validator"
33 }
34
35 fn create_endpoint(
36 &self,
37 uri: &str,
38 _ctx: &dyn ComponentContext,
39 ) -> Result<Box<dyn Endpoint>, CamelError> {
40 let config = ValidatorConfig::from_uri(uri)?;
41 let compiled = CompiledValidator::compile(&config)?;
42 Ok(Box::new(ValidatorEndpoint {
43 uri: uri.to_string(),
44 compiled: Arc::new(compiled),
45 }))
46 }
47}
48
/// Endpoint created by [`ValidatorComponent`] for a single endpoint URI.
struct ValidatorEndpoint {
    /// The URI string the endpoint was created from; returned by `uri()`.
    uri: String,
    /// Validator compiled at endpoint creation; each producer receives a
    /// clone of this `Arc` rather than recompiling.
    compiled: Arc<CompiledValidator>,
}
53
54impl Endpoint for ValidatorEndpoint {
55 fn uri(&self) -> &str {
56 &self.uri
57 }
58
59 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
60 Err(CamelError::EndpointCreationFailed(
61 "validator endpoint does not support consumers".to_string(),
62 ))
63 }
64
65 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
66 Ok(BoxProcessor::new(ValidatorProducer {
67 uri: self.uri.clone(),
68 compiled: Arc::clone(&self.compiled),
69 }))
70 }
71}
72
/// Producer-side service built by `ValidatorEndpoint::create_producer`.
///
/// Cloning copies the URI string and bumps the `Arc` reference count; the
/// compiled validator itself is shared, not duplicated.
#[derive(Clone)]
struct ValidatorProducer {
    /// Endpoint URI, used as a field on the debug log emitted per exchange.
    uri: String,
    /// Compiled validator shared with the originating endpoint.
    compiled: Arc<CompiledValidator>,
}
78
79impl Service<Exchange> for ValidatorProducer {
80 type Response = Exchange;
81 type Error = CamelError;
82 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
83
84 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85 Poll::Ready(Ok(()))
86 }
87
88 fn call(&mut self, exchange: Exchange) -> Self::Future {
89 let compiled = Arc::clone(&self.compiled);
90 let uri = self.uri.clone();
91 Box::pin(async move {
92 debug!(uri = uri, "validating exchange body");
93 compiled.validate(&exchange.input.body)?;
94 Ok(exchange)
95 })
96 }
97}
98
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::{Body, Message, NoOpComponentContext};
    use std::io::Write;
    use tower::ServiceExt;

    /// Writes `contents` into a fresh temporary file whose name ends in `suffix`.
    fn schema_file(suffix: &str, contents: &[u8]) -> tempfile::NamedTempFile {
        let mut file = tempfile::Builder::new().suffix(suffix).tempfile().unwrap();
        file.write_all(contents).unwrap();
        file
    }

    /// JSON Schema fixture: an object that must contain an `id` property.
    fn json_schema_file() -> tempfile::NamedTempFile {
        schema_file(".json", br#"{"type":"object","required":["id"]}"#)
    }

    /// XSD fixture: a single `order` element of type `xs:string`.
    fn xsd_file() -> tempfile::NamedTempFile {
        schema_file(
            ".xsd",
            br#"<?xml version="1.0"?><xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"><xs:element name="order" type="xs:string"/></xs:schema>"#,
        )
    }

    /// Creates a validator endpoint pointing at the given schema file.
    ///
    /// The caller keeps ownership of the temp file so it stays on disk for
    /// the whole test.
    fn endpoint_for(schema: &tempfile::NamedTempFile) -> Box<dyn Endpoint> {
        let uri = format!("validator:{}", schema.path().display());
        ValidatorComponent::new()
            .create_endpoint(&uri, &NoOpComponentContext)
            .unwrap()
    }

    #[test]
    fn scheme_is_validator() {
        let component = ValidatorComponent::new();
        assert_eq!(component.scheme(), "validator");
    }

    #[test]
    fn consumer_not_supported() {
        let schema = json_schema_file();
        let endpoint = endpoint_for(&schema);
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn endpoint_creation_fails_for_missing_schema() {
        let outcome = ValidatorComponent::new()
            .create_endpoint("validator:/nonexistent/schema.json", &NoOpComponentContext);
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn valid_json_body_passes_through() {
        let schema = json_schema_file();
        let endpoint = endpoint_for(&schema);
        let producer = endpoint.create_producer(&ProducerContext::new()).unwrap();

        let body = Body::Json(serde_json::json!({"id": "1"}));
        let exchange = Exchange::new(Message::new(body));

        let result = producer.oneshot(exchange).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn invalid_json_body_returns_err() {
        let schema = json_schema_file();
        let endpoint = endpoint_for(&schema);
        let producer = endpoint.create_producer(&ProducerContext::new()).unwrap();

        let body = Body::Json(serde_json::json!({"name": "x"}));
        let exchange = Exchange::new(Message::new(body));

        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("validation failed"), "got: {msg}");
    }

    #[tokio::test]
    async fn valid_xml_body_passes() {
        let schema = xsd_file();
        let endpoint = endpoint_for(&schema);
        let producer = endpoint.create_producer(&ProducerContext::new()).unwrap();

        let body = Body::Xml("<order>hello</order>".to_string());
        let exchange = Exchange::new(Message::new(body));

        assert!(producer.oneshot(exchange).await.is_ok());
    }
}