Skip to main content

camel_component_validator/
xsd_bridge.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::time::{Duration, Instant};
6
7use async_trait::async_trait;
8use camel_bridge::channel::connect_channel;
9use camel_bridge::download::{default_cache_dir_for_spec, ensure_binary_for_spec};
10use camel_bridge::health::wait_for_health;
11use camel_bridge::process::{BridgeError, BridgeProcess, BridgeProcessConfig};
12use camel_bridge::reconnect::BridgeReconnectHandler;
13use camel_bridge::spec::XML_BRIDGE;
14use camel_component_api::RuntimeObservability;
15use dashmap::DashMap;
16use sha2::{Digest, Sha256};
17use std::sync::OnceLock;
18use tokio::sync::{Mutex, RwLock, watch};
19use tonic::Code;
20use tonic::transport::Channel;
21use tracing::error;
22
23use crate::error::ValidatorError;
24use crate::proto;
25use crate::proto::{
26    HealthCheckRequest, RegisterSchemaRequest, RegisterSchemaResponse, ValidateResponse,
27    ValidateWithRequest,
28};
29
/// Content-addressed identifier of a registered XSD schema.
///
/// Ids produced by `XsdBridgeBackend::schema_id_for` look like `xsd-<sha256 hex>`.
pub type SchemaId = std::string::String;
31
32#[derive(Debug, Clone)]
33pub enum BridgeState {
34    Starting,
35    Ready { channel: Channel },
36    Degraded(String),
37    Restarting { attempt: u32, next_at: Instant },
38    Stopped,
39}
40
41pub struct XmlBridgeSlot {
42    pub state_rx: watch::Receiver<BridgeState>,
43    pub(crate) state_tx: watch::Sender<BridgeState>,
44    pub process: Arc<Mutex<Option<BridgeProcess>>>,
45}
46
47impl std::fmt::Debug for XmlBridgeSlot {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("XmlBridgeSlot").finish()
50    }
51}
52
53type ConnectFuture = Pin<Box<dyn Future<Output = Result<Channel, BridgeError>> + Send>>;
54type ConnectFn = dyn Fn(u16) -> ConnectFuture + Send + Sync;
55
56#[async_trait]
57pub trait XsdBridge: Send + Sync {
58    async fn register(&self, xsd_bytes: Vec<u8>) -> Result<SchemaId, ValidatorError>;
59    async fn validate(&self, schema_id: &str, doc_bytes: Vec<u8>) -> Result<(), ValidatorError>;
60}
61
62#[async_trait]
63trait XsdBridgeRpc: Send + Sync {
64    async fn register_schema(
65        &self,
66        channel: Channel,
67        request: RegisterSchemaRequest,
68    ) -> Result<RegisterSchemaResponse, ValidatorError>;
69
70    async fn validate_with(
71        &self,
72        channel: Channel,
73        request: ValidateWithRequest,
74    ) -> Result<ValidateResponse, ValidatorError>;
75}
76
77#[derive(Debug)]
78struct GrpcXsdBridgeRpc;
79
80#[async_trait]
81impl XsdBridgeRpc for GrpcXsdBridgeRpc {
82    async fn register_schema(
83        &self,
84        channel: Channel,
85        request: RegisterSchemaRequest,
86    ) -> Result<RegisterSchemaResponse, ValidatorError> {
87        let mut client = proto::xsd_validator_client::XsdValidatorClient::new(channel);
88        let response = client.register_schema(request).await.map_err(|e| {
89            ValidatorError::transport_with_source("xml-bridge register_schema RPC failed", e)
90        })?;
91        Ok(response.into_inner())
92    }
93
94    async fn validate_with(
95        &self,
96        channel: Channel,
97        request: ValidateWithRequest,
98    ) -> Result<ValidateResponse, ValidatorError> {
99        let mut client = proto::xsd_validator_client::XsdValidatorClient::new(channel);
100        let response = client.validate_with(request).await.map_err(|e| {
101            ValidatorError::transport_with_source("xml-bridge validate_with RPC failed", e)
102        })?;
103        Ok(response.into_inner())
104    }
105}
106
107#[derive(Clone)]
108pub struct XsdBridgeBackend {
109    channel: Arc<RwLock<Option<Channel>>>,
110    schemas: Arc<DashMap<SchemaId, Vec<u8>>>,
111    schema_cache_max_entries: Arc<AtomicUsize>,
112    slot: Arc<XmlBridgeSlot>,
113    rpc: Arc<dyn XsdBridgeRpc>,
114    connect_fn: Arc<ConnectFn>,
115    start_lock: Arc<Mutex<()>>,
116    bridge_version: String,
117    bridge_cache_dir: std::path::PathBuf,
118    bridge_start_timeout_ms: u64,
119    observability: Arc<OnceLock<(Arc<dyn RuntimeObservability>, String)>>,
120}
121
122impl std::fmt::Debug for XsdBridgeBackend {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        f.debug_struct("XsdBridgeBackend")
125            .field("bridge_version", &self.bridge_version)
126            .field("bridge_cache_dir", &self.bridge_cache_dir)
127            .finish()
128    }
129}
130
131impl XsdBridgeBackend {
132    pub fn new() -> Self {
133        let (state_tx, state_rx) = watch::channel(BridgeState::Stopped);
134        let slot = Arc::new(XmlBridgeSlot {
135            state_rx,
136            state_tx,
137            process: Arc::new(Mutex::new(None)),
138        });
139
140        Self {
141            channel: Arc::new(RwLock::new(None)),
142            schemas: Arc::new(DashMap::new()),
143            schema_cache_max_entries: Arc::new(AtomicUsize::new(
144                crate::config::DEFAULT_SCHEMA_CACHE_MAX_ENTRIES,
145            )),
146            slot,
147            rpc: Arc::new(GrpcXsdBridgeRpc),
148            connect_fn: Arc::new(|port| Box::pin(connect_channel(port))),
149            start_lock: Arc::new(Mutex::new(())),
150            bridge_version: crate::BRIDGE_VERSION.to_string(),
151            bridge_cache_dir: default_cache_dir_for_spec(&XML_BRIDGE),
152            bridge_start_timeout_ms: 30_000,
153            observability: Arc::new(OnceLock::new()),
154        }
155    }
156
157    #[cfg(test)]
158    fn for_test(rpc: Arc<dyn XsdBridgeRpc>, connect_fn: Arc<ConnectFn>, channel: Channel) -> Self {
159        let (state_tx, state_rx) = watch::channel(BridgeState::Ready {
160            channel: channel.clone(),
161        });
162        let slot = Arc::new(XmlBridgeSlot {
163            state_rx,
164            state_tx,
165            process: Arc::new(Mutex::new(None)),
166        });
167        Self {
168            channel: Arc::new(RwLock::new(Some(channel))),
169            schemas: Arc::new(DashMap::new()),
170            schema_cache_max_entries: Arc::new(AtomicUsize::new(
171                crate::config::DEFAULT_SCHEMA_CACHE_MAX_ENTRIES,
172            )),
173            slot,
174            rpc,
175            connect_fn,
176            start_lock: Arc::new(Mutex::new(())),
177            bridge_version: crate::BRIDGE_VERSION.to_string(),
178            bridge_cache_dir: default_cache_dir_for_spec(&XML_BRIDGE),
179            bridge_start_timeout_ms: 30_000,
180            observability: Arc::new(OnceLock::new()),
181        }
182    }
183
184    pub fn set_observability(&self, runtime: Arc<dyn RuntimeObservability>, route_id: String) {
185        self.observability.set((runtime, route_id)).ok();
186    }
187
188    pub fn schema_id_for(xsd_bytes: &[u8]) -> SchemaId {
189        let mut hasher = Sha256::new();
190        hasher.update(xsd_bytes);
191        format!("xsd-{}", hex::encode(hasher.finalize()))
192    }
193
194    /// Update the maximum number of entries allowed in the schema cache.
195    /// If the cache already exceeds the new limit, it is cleared immediately.
196    pub fn set_schema_cache_max_entries(&self, max_entries: usize) {
197        if self.schemas.len() > max_entries {
198            self.schemas.clear();
199        }
200        self.schema_cache_max_entries
201            .store(max_entries, Ordering::Relaxed);
202    }
203
204    async fn ensure_bridge_ready(&self) -> Result<Channel, ValidatorError> {
205        if let Some(ch) = self.channel.read().await.clone() {
206            return Ok(ch);
207        }
208
209        let _guard = self.start_lock.lock().await;
210        if let Some(ch) = self.channel.read().await.clone() {
211            return Ok(ch);
212        }
213
214        let _ = self.slot.state_tx.send(BridgeState::Starting);
215        let (process, channel, port) = self.start_bridge_process().await?;
216        {
217            let mut process_guard = self.slot.process.lock().await;
218            *process_guard = Some(process);
219        }
220        {
221            let mut ch_guard = self.channel.write().await;
222            *ch_guard = Some(channel.clone());
223        }
224        let _ = self.slot.state_tx.send(BridgeState::Ready {
225            channel: channel.clone(),
226        });
227        self.on_reconnect(port).map_err(|e| {
228            ValidatorError::transport_with_source("xml-bridge reconnect handler failed", e)
229        })?;
230
231        Ok(channel)
232    }
233
234    async fn restart_bridge(&self) -> Result<Channel, ValidatorError> {
235        let _guard = self.start_lock.lock().await;
236
237        let _ = self.slot.state_tx.send(BridgeState::Restarting {
238            attempt: 0,
239            next_at: Instant::now(),
240        });
241
242        let old_process = {
243            let mut process_guard = self.slot.process.lock().await;
244            process_guard.take()
245        };
246        if let Some(p) = old_process {
247            let _ = p.stop().await;
248        }
249
250        let (process, channel, port) = self.start_bridge_process().await?;
251        {
252            let mut process_guard = self.slot.process.lock().await;
253            *process_guard = Some(process);
254        }
255        {
256            let mut ch_guard = self.channel.write().await;
257            *ch_guard = Some(channel.clone());
258        }
259
260        self.on_reconnect(port).map_err(|e| {
261            ValidatorError::transport_with_source("xml-bridge reconnect handler failed", e)
262        })?;
263        let _ = self.slot.state_tx.send(BridgeState::Ready {
264            channel: channel.clone(),
265        });
266
267        Ok(channel)
268    }
269
270    async fn start_bridge_process(&self) -> Result<(BridgeProcess, Channel, u16), ValidatorError> {
271        let binary_path =
272            ensure_binary_for_spec(&XML_BRIDGE, &self.bridge_version, &self.bridge_cache_dir)
273                .await
274                .map_err(|e| {
275                    ValidatorError::endpoint(format!("XML bridge binary unavailable: {e}"))
276                })?;
277
278        let process_config = BridgeProcessConfig::xml(binary_path, self.bridge_start_timeout_ms);
279        let process = BridgeProcess::start(&process_config)
280            .await
281            .map_err(|e| ValidatorError::endpoint(format!("XML bridge start failed: {e}")))?;
282        let port = process.grpc_port();
283        let channel = (self.connect_fn)(port).await.map_err(|e| {
284            ValidatorError::endpoint(format!("XML bridge channel connect failed: {e}"))
285        })?;
286
287        wait_for_health(&channel, Duration::from_secs(10), |ch| {
288            let mut client = proto::health_client::HealthClient::new(ch);
289            async move {
290                let resp = client.check(HealthCheckRequest {}).await?;
291                Ok(resp.into_inner().status == "SERVING")
292            }
293        })
294        .await
295        .map_err(|e| ValidatorError::endpoint(format!("XML bridge health check failed: {e}")))?;
296
297        Ok((process, channel, port))
298    }
299
300    fn is_transport_error(msg: &str) -> bool {
301        msg.contains(&Code::Unavailable.to_string())
302            || msg.contains(&Code::Unknown.to_string())
303            || msg.contains("transport")
304    }
305
306    pub async fn shutdown(&self) {
307        let mut guard = self.slot.process.lock().await;
308        if let Some(p) = guard.take()
309            && let Err(e) = p.stop().await
310        {
311            tracing::warn!("Failed to stop XSD bridge process: {}", e);
312        }
313    }
314
315    async fn register_with_channel(
316        &self,
317        channel: Channel,
318        schema_id: SchemaId,
319        xsd_bytes: Vec<u8>,
320    ) -> Result<SchemaId, ValidatorError> {
321        let response = self
322            .rpc
323            .register_schema(
324                channel,
325                RegisterSchemaRequest {
326                    schema_id: schema_id.clone(),
327                    schema: xsd_bytes.clone(),
328                },
329            )
330            .await?;
331
332        if let Some(err) = response.error {
333            return Err(ValidatorError::from_bridge_error(&err));
334        }
335
336        // Evict the entire cache if it has reached capacity. A full clear is
337        // acceptable because schemas are re-registered on demand and also
338        // re-seeded after bridge reconnects.
339        let max_entries = self.schema_cache_max_entries.load(Ordering::Relaxed);
340        if self.schemas.len() >= max_entries && !self.schemas.contains_key(&schema_id) {
341            self.schemas.clear();
342        }
343
344        self.schemas.insert(schema_id.clone(), xsd_bytes);
345        Ok(schema_id)
346    }
347}
348
349impl BridgeReconnectHandler for XsdBridgeBackend {
350    fn on_reconnect(&self, _port: u16) -> Result<(), BridgeError> {
351        let this = self.clone();
352        tokio::spawn(async move {
353            let Some(channel) = this.channel.read().await.clone() else {
354                return;
355            };
356
357            let schemas: Vec<(SchemaId, Vec<u8>)> = this
358                .schemas
359                .iter()
360                .map(|entry| (entry.key().clone(), entry.value().clone()))
361                .collect();
362
363            let observability = this
364                .observability
365                .get()
366                .map(|(rt, rid)| (Arc::clone(rt), rid.clone()));
367
368            for (schema_id, schema_bytes) in schemas {
369                if let Err(e) = this
370                    .rpc
371                    .register_schema(
372                        channel.clone(),
373                        RegisterSchemaRequest {
374                            schema_id: schema_id.clone(),
375                            schema: schema_bytes,
376                        },
377                    )
378                    .await
379                {
380                    if let Some((ref rt, ref rid)) = observability {
381                        rt.metrics()
382                            .increment_errors(rid, "e:validator:reconnect-reseed");
383                    }
384                    // log-policy: outside-contract
385                    error!(schema_id = %schema_id, error = %e, "re-seed schema failed after reconnect");
386                }
387            }
388        });
389        Ok(())
390    }
391}
392
393#[async_trait]
394impl XsdBridge for XsdBridgeBackend {
395    async fn register(&self, xsd_bytes: Vec<u8>) -> Result<SchemaId, ValidatorError> {
396        let schema_id = Self::schema_id_for(&xsd_bytes);
397        if self.schemas.contains_key(&schema_id) {
398            return Ok(schema_id);
399        }
400
401        let channel = self.ensure_bridge_ready().await?;
402        match self
403            .register_with_channel(channel.clone(), schema_id.clone(), xsd_bytes.clone())
404            .await
405        {
406            Ok(id) => Ok(id),
407            Err(e) if Self::is_transport_error(&e.to_string()) => {
408                let restarted = self.restart_bridge().await?;
409                self.register_with_channel(restarted, schema_id, xsd_bytes)
410                    .await
411            }
412            Err(e) => Err(e),
413        }
414    }
415
416    async fn validate(&self, schema_id: &str, doc_bytes: Vec<u8>) -> Result<(), ValidatorError> {
417        let channel = self.ensure_bridge_ready().await?;
418        let req = ValidateWithRequest {
419            schema_id: schema_id.to_string(),
420            document: doc_bytes.clone(),
421        };
422
423        let response = match self.rpc.validate_with(channel.clone(), req).await {
424            Ok(resp) => resp,
425            Err(e) if Self::is_transport_error(&e.to_string()) => {
426                let restarted = self.restart_bridge().await?;
427                self.rpc
428                    .validate_with(
429                        restarted,
430                        ValidateWithRequest {
431                            schema_id: schema_id.to_string(),
432                            document: doc_bytes,
433                        },
434                    )
435                    .await?
436            }
437            Err(e) => return Err(e),
438        };
439
440        if let Some(err) = response.error {
441            return Err(ValidatorError::from_bridge_error(&err));
442        }
443        if response.valid {
444            return Ok(());
445        }
446
447        let details = response
448            .errors
449            .iter()
450            .map(|e| format!("{}:{} {}", e.line, e.column, e.message))
451            .collect::<Vec<_>>()
452            .join("\n");
453        Err(ValidatorError::validation(format!(
454            "XSD validation failed:\n{details}"
455        )))
456    }
457}
458
459impl Default for XsdBridgeBackend {
460    fn default() -> Self {
461        Self::new()
462    }
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tonic::transport::Endpoint;

    /// In-memory RPC double: counts registrations and returns a fixed validity verdict.
    #[derive(Debug)]
    struct MockRpc {
        register_calls: Arc<AtomicUsize>,
        validate_ok: bool,
    }

    #[async_trait]
    impl XsdBridgeRpc for MockRpc {
        async fn register_schema(
            &self,
            _channel: Channel,
            request: RegisterSchemaRequest,
        ) -> Result<RegisterSchemaResponse, ValidatorError> {
            self.register_calls.fetch_add(1, Ordering::SeqCst);
            Ok(RegisterSchemaResponse {
                schema_id: request.schema_id,
                error: None,
            })
        }

        async fn validate_with(
            &self,
            _channel: Channel,
            _request: ValidateWithRequest,
        ) -> Result<ValidateResponse, ValidatorError> {
            Ok(ValidateResponse {
                valid: self.validate_ok,
                errors: Vec::new(),
                error: None,
            })
        }
    }

    /// A channel that never dials until used; the mock RPC never uses it.
    fn lazy_channel() -> Channel {
        Endpoint::from_static("http://127.0.0.1:65535").connect_lazy()
    }

    /// Builds a `Ready` backend around a fresh `MockRpc`, returning its call counter too.
    fn mock_backend(validate_ok: bool) -> (Arc<AtomicUsize>, XsdBridgeBackend) {
        let calls = Arc::new(AtomicUsize::new(0));
        let rpc = Arc::new(MockRpc {
            register_calls: Arc::clone(&calls),
            validate_ok,
        });
        let connector =
            Arc::new(|_port| Box::pin(async move { Ok(lazy_channel()) }) as ConnectFuture);
        let backend = XsdBridgeBackend::for_test(rpc, connector, lazy_channel());
        (calls, backend)
    }

    #[tokio::test]
    async fn xsd_bridge_reconnect_reseeds() {
        let (calls, backend) = mock_backend(true);

        let _id_a = backend.register(b"<xsd:a/>".to_vec()).await.unwrap();
        let _id_b = backend.register(b"<xsd:b/>".to_vec()).await.unwrap();

        backend.on_reconnect(50051).unwrap();

        // Re-seeding runs on a detached task. Poll with a deadline instead of relying on
        // one fixed sleep, which was timing-sensitive under load.
        let deadline = Instant::now() + Duration::from_secs(2);
        while calls.load(Ordering::SeqCst) < 4 && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }

        assert!(calls.load(Ordering::SeqCst) >= 4);
    }

    #[tokio::test]
    async fn register_short_circuits_cached_schema() {
        let (calls, backend) = mock_backend(true);

        let first = backend.register(b"<xsd:a/>".to_vec()).await.unwrap();
        let second = backend.register(b"<xsd:a/>".to_vec()).await.unwrap();

        assert_eq!(first, second);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn validate_reflects_bridge_verdict() {
        let (_, ok_backend) = mock_backend(true);
        assert!(ok_backend.validate("xsd-any", b"<doc/>".to_vec()).await.is_ok());

        let (_, bad_backend) = mock_backend(false);
        assert!(bad_backend.validate("xsd-any", b"<doc/>".to_vec()).await.is_err());
    }

    #[tokio::test]
    async fn schema_cache_clears_when_full() {
        let (_, backend) = mock_backend(true);
        backend.set_schema_cache_max_entries(1);

        let _id_a = backend.register(b"<xsd:a/>".to_vec()).await.unwrap();
        let id_b = backend.register(b"<xsd:b/>".to_vec()).await.unwrap();

        assert_eq!(backend.schemas.len(), 1);
        assert!(backend.schemas.contains_key(&id_b));
    }

    #[test]
    fn schema_id_is_content_addressed() {
        let a = XsdBridgeBackend::schema_id_for(b"<xsd:a/>");
        let again = XsdBridgeBackend::schema_id_for(b"<xsd:a/>");
        let b = XsdBridgeBackend::schema_id_for(b"<xsd:b/>");

        assert_eq!(a, again);
        assert_ne!(a, b);
        assert!(a.starts_with("xsd-"));
        assert_eq!(a.len(), "xsd-".len() + 64);
    }
}