1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::time::{Duration, Instant};
6
7use async_trait::async_trait;
8use camel_bridge::channel::connect_channel;
9use camel_bridge::download::{default_cache_dir_for_spec, ensure_binary_for_spec};
10use camel_bridge::health::wait_for_health;
11use camel_bridge::process::{BridgeError, BridgeProcess, BridgeProcessConfig};
12use camel_bridge::reconnect::BridgeReconnectHandler;
13use camel_bridge::spec::XML_BRIDGE;
14use camel_component_api::RuntimeObservability;
15use dashmap::DashMap;
16use sha2::{Digest, Sha256};
17use std::sync::OnceLock;
18use tokio::sync::{Mutex, RwLock, watch};
19use tonic::Code;
20use tonic::transport::Channel;
21use tracing::error;
22
23use crate::error::ValidatorError;
24use crate::proto;
25use crate::proto::{
26 HealthCheckRequest, RegisterSchemaRequest, RegisterSchemaResponse, ValidateResponse,
27 ValidateWithRequest,
28};
29
/// Identifier under which an XSD schema is registered with the bridge.
///
/// [`XsdBridgeBackend::schema_id_for`] derives it as `xsd-` followed by the
/// hex-encoded SHA-256 digest of the schema bytes.
pub type SchemaId = String;
31
/// Lifecycle state of the XML bridge, published through the watch channel
/// held by [`XmlBridgeSlot`].
#[derive(Debug, Clone)]
pub enum BridgeState {
    /// A start has been requested and the bridge process is being launched.
    Starting,
    /// The bridge is up; `channel` is the gRPC channel connected to it.
    Ready { channel: Channel },
    /// Carries a free-form message; presumably explains why the bridge is
    /// impaired — TODO confirm the intended semantics with the owners.
    Degraded(String),
    /// A restart is in progress. `attempt` and `next_at` look like retry
    /// bookkeeping (attempt counter and earliest next try) — TODO confirm.
    Restarting { attempt: u32, next_at: Instant },
    /// No bridge is running; this is the state a backend created with
    /// [`XsdBridgeBackend::new`] starts in.
    Stopped,
}
40
/// Shared handle bundling the bridge process with its published lifecycle state.
pub struct XmlBridgeSlot {
    /// Receiving half of the state watch channel; read the current
    /// [`BridgeState`] from here.
    pub state_rx: watch::Receiver<BridgeState>,
    /// Sending half of the same watch channel, used inside the crate to
    /// publish state transitions.
    pub(crate) state_tx: watch::Sender<BridgeState>,
    /// The bridge process currently held, or `None` when there is none.
    pub process: Arc<Mutex<Option<BridgeProcess>>>,
}
46
47impl std::fmt::Debug for XmlBridgeSlot {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("XmlBridgeSlot").finish()
50 }
51}
52
/// Boxed, sendable future that resolves to a gRPC [`Channel`] or a [`BridgeError`].
type ConnectFuture = Pin<Box<dyn Future<Output = Result<Channel, BridgeError>> + Send>>;
/// Connector callback: takes the bridge's gRPC port and returns a [`ConnectFuture`].
/// Kept behind a trait object so tests can inject their own connector.
type ConnectFn = dyn Fn(u16) -> ConnectFuture + Send + Sync;
55
/// Public contract for registering XSD schemas and validating documents
/// against them.
#[async_trait]
pub trait XsdBridge: Send + Sync {
    /// Registers the schema given as raw bytes and returns the [`SchemaId`]
    /// to pass to [`XsdBridge::validate`].
    async fn register(&self, xsd_bytes: Vec<u8>) -> Result<SchemaId, ValidatorError>;
    /// Validates `doc_bytes` against the schema identified by `schema_id`.
    ///
    /// Returns `Ok(())` when the document is valid and an error otherwise.
    async fn validate(&self, schema_id: &str, doc_bytes: Vec<u8>) -> Result<(), ValidatorError>;
}
61
/// Seam over the two bridge RPCs, so the transport can be swapped for a mock
/// in tests.
#[async_trait]
trait XsdBridgeRpc: Send + Sync {
    /// Sends `request` over `channel` and returns the bridge's reply.
    ///
    /// An `Err` is a failure of the call itself; the reply additionally
    /// carries an optional `error` field that callers must inspect.
    async fn register_schema(
        &self,
        channel: Channel,
        request: RegisterSchemaRequest,
    ) -> Result<RegisterSchemaResponse, ValidatorError>;

    /// Sends a validation `request` over `channel` and returns the bridge's
    /// reply, including its `valid` flag, `errors` list and optional `error`.
    async fn validate_with(
        &self,
        channel: Channel,
        request: ValidateWithRequest,
    ) -> Result<ValidateResponse, ValidatorError>;
}
76
/// Production [`XsdBridgeRpc`] implementation that talks to the bridge through
/// the generated `XsdValidatorClient`.
#[derive(Debug)]
struct GrpcXsdBridgeRpc;
79
80#[async_trait]
81impl XsdBridgeRpc for GrpcXsdBridgeRpc {
82 async fn register_schema(
83 &self,
84 channel: Channel,
85 request: RegisterSchemaRequest,
86 ) -> Result<RegisterSchemaResponse, ValidatorError> {
87 let mut client = proto::xsd_validator_client::XsdValidatorClient::new(channel);
88 let response = client.register_schema(request).await.map_err(|e| {
89 ValidatorError::transport_with_source("xml-bridge register_schema RPC failed", e)
90 })?;
91 Ok(response.into_inner())
92 }
93
94 async fn validate_with(
95 &self,
96 channel: Channel,
97 request: ValidateWithRequest,
98 ) -> Result<ValidateResponse, ValidatorError> {
99 let mut client = proto::xsd_validator_client::XsdValidatorClient::new(channel);
100 let response = client.validate_with(request).await.map_err(|e| {
101 ValidatorError::transport_with_source("xml-bridge validate_with RPC failed", e)
102 })?;
103 Ok(response.into_inner())
104 }
105}
106
/// [`XsdBridge`] implementation backed by an external XML bridge process
/// reached over gRPC.
///
/// Shared state lives behind `Arc`s, so clones of a backend operate on the
/// same channel, schema cache and process slot.
#[derive(Clone)]
pub struct XsdBridgeBackend {
    /// gRPC channel to the bridge, or `None` while no channel has been stored.
    channel: Arc<RwLock<Option<Channel>>>,
    /// Successfully registered schemas (id → raw XSD bytes); replayed to the
    /// bridge by the reconnect handler.
    schemas: Arc<DashMap<SchemaId, Vec<u8>>>,
    /// Entry limit for `schemas`; the cache is cleared when it would be exceeded.
    schema_cache_max_entries: Arc<AtomicUsize>,
    /// Bridge process handle plus its published [`BridgeState`].
    slot: Arc<XmlBridgeSlot>,
    /// RPC transport; a mock can be injected in tests.
    rpc: Arc<dyn XsdBridgeRpc>,
    /// Callback that opens a gRPC channel to the given port.
    connect_fn: Arc<ConnectFn>,
    /// Held while starting or restarting the bridge so those run one at a time.
    start_lock: Arc<Mutex<()>>,
    /// Bridge version passed to `ensure_binary_for_spec` when resolving the binary.
    bridge_version: String,
    /// Directory passed to `ensure_binary_for_spec` as the binary cache location.
    bridge_cache_dir: std::path::PathBuf,
    /// Start timeout in milliseconds handed to `BridgeProcessConfig::xml`.
    bridge_start_timeout_ms: u64,
    /// Optional observability runtime and route id, settable once; used to
    /// count re-seed failures after a reconnect.
    observability: Arc<OnceLock<(Arc<dyn RuntimeObservability>, String)>>,
}
121
122impl std::fmt::Debug for XsdBridgeBackend {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 f.debug_struct("XsdBridgeBackend")
125 .field("bridge_version", &self.bridge_version)
126 .field("bridge_cache_dir", &self.bridge_cache_dir)
127 .finish()
128 }
129}
130
131impl XsdBridgeBackend {
132 pub fn new() -> Self {
133 let (state_tx, state_rx) = watch::channel(BridgeState::Stopped);
134 let slot = Arc::new(XmlBridgeSlot {
135 state_rx,
136 state_tx,
137 process: Arc::new(Mutex::new(None)),
138 });
139
140 Self {
141 channel: Arc::new(RwLock::new(None)),
142 schemas: Arc::new(DashMap::new()),
143 schema_cache_max_entries: Arc::new(AtomicUsize::new(
144 crate::config::DEFAULT_SCHEMA_CACHE_MAX_ENTRIES,
145 )),
146 slot,
147 rpc: Arc::new(GrpcXsdBridgeRpc),
148 connect_fn: Arc::new(|port| Box::pin(connect_channel(port))),
149 start_lock: Arc::new(Mutex::new(())),
150 bridge_version: crate::BRIDGE_VERSION.to_string(),
151 bridge_cache_dir: default_cache_dir_for_spec(&XML_BRIDGE),
152 bridge_start_timeout_ms: 30_000,
153 observability: Arc::new(OnceLock::new()),
154 }
155 }
156
157 #[cfg(test)]
158 fn for_test(rpc: Arc<dyn XsdBridgeRpc>, connect_fn: Arc<ConnectFn>, channel: Channel) -> Self {
159 let (state_tx, state_rx) = watch::channel(BridgeState::Ready {
160 channel: channel.clone(),
161 });
162 let slot = Arc::new(XmlBridgeSlot {
163 state_rx,
164 state_tx,
165 process: Arc::new(Mutex::new(None)),
166 });
167 Self {
168 channel: Arc::new(RwLock::new(Some(channel))),
169 schemas: Arc::new(DashMap::new()),
170 schema_cache_max_entries: Arc::new(AtomicUsize::new(
171 crate::config::DEFAULT_SCHEMA_CACHE_MAX_ENTRIES,
172 )),
173 slot,
174 rpc,
175 connect_fn,
176 start_lock: Arc::new(Mutex::new(())),
177 bridge_version: crate::BRIDGE_VERSION.to_string(),
178 bridge_cache_dir: default_cache_dir_for_spec(&XML_BRIDGE),
179 bridge_start_timeout_ms: 30_000,
180 observability: Arc::new(OnceLock::new()),
181 }
182 }
183
184 pub fn set_observability(&self, runtime: Arc<dyn RuntimeObservability>, route_id: String) {
185 self.observability.set((runtime, route_id)).ok();
186 }
187
188 pub fn schema_id_for(xsd_bytes: &[u8]) -> SchemaId {
189 let mut hasher = Sha256::new();
190 hasher.update(xsd_bytes);
191 format!("xsd-{}", hex::encode(hasher.finalize()))
192 }
193
194 pub fn set_schema_cache_max_entries(&self, max_entries: usize) {
197 if self.schemas.len() > max_entries {
198 self.schemas.clear();
199 }
200 self.schema_cache_max_entries
201 .store(max_entries, Ordering::Relaxed);
202 }
203
204 async fn ensure_bridge_ready(&self) -> Result<Channel, ValidatorError> {
205 if let Some(ch) = self.channel.read().await.clone() {
206 return Ok(ch);
207 }
208
209 let _guard = self.start_lock.lock().await;
210 if let Some(ch) = self.channel.read().await.clone() {
211 return Ok(ch);
212 }
213
214 let _ = self.slot.state_tx.send(BridgeState::Starting);
215 let (process, channel, port) = self.start_bridge_process().await?;
216 {
217 let mut process_guard = self.slot.process.lock().await;
218 *process_guard = Some(process);
219 }
220 {
221 let mut ch_guard = self.channel.write().await;
222 *ch_guard = Some(channel.clone());
223 }
224 let _ = self.slot.state_tx.send(BridgeState::Ready {
225 channel: channel.clone(),
226 });
227 self.on_reconnect(port).map_err(|e| {
228 ValidatorError::transport_with_source("xml-bridge reconnect handler failed", e)
229 })?;
230
231 Ok(channel)
232 }
233
234 async fn restart_bridge(&self) -> Result<Channel, ValidatorError> {
235 let _guard = self.start_lock.lock().await;
236
237 let _ = self.slot.state_tx.send(BridgeState::Restarting {
238 attempt: 0,
239 next_at: Instant::now(),
240 });
241
242 let old_process = {
243 let mut process_guard = self.slot.process.lock().await;
244 process_guard.take()
245 };
246 if let Some(p) = old_process {
247 let _ = p.stop().await;
248 }
249
250 let (process, channel, port) = self.start_bridge_process().await?;
251 {
252 let mut process_guard = self.slot.process.lock().await;
253 *process_guard = Some(process);
254 }
255 {
256 let mut ch_guard = self.channel.write().await;
257 *ch_guard = Some(channel.clone());
258 }
259
260 self.on_reconnect(port).map_err(|e| {
261 ValidatorError::transport_with_source("xml-bridge reconnect handler failed", e)
262 })?;
263 let _ = self.slot.state_tx.send(BridgeState::Ready {
264 channel: channel.clone(),
265 });
266
267 Ok(channel)
268 }
269
270 async fn start_bridge_process(&self) -> Result<(BridgeProcess, Channel, u16), ValidatorError> {
271 let binary_path =
272 ensure_binary_for_spec(&XML_BRIDGE, &self.bridge_version, &self.bridge_cache_dir)
273 .await
274 .map_err(|e| {
275 ValidatorError::endpoint(format!("XML bridge binary unavailable: {e}"))
276 })?;
277
278 let process_config = BridgeProcessConfig::xml(binary_path, self.bridge_start_timeout_ms);
279 let process = BridgeProcess::start(&process_config)
280 .await
281 .map_err(|e| ValidatorError::endpoint(format!("XML bridge start failed: {e}")))?;
282 let port = process.grpc_port();
283 let channel = (self.connect_fn)(port).await.map_err(|e| {
284 ValidatorError::endpoint(format!("XML bridge channel connect failed: {e}"))
285 })?;
286
287 wait_for_health(&channel, Duration::from_secs(10), |ch| {
288 let mut client = proto::health_client::HealthClient::new(ch);
289 async move {
290 let resp = client.check(HealthCheckRequest {}).await?;
291 Ok(resp.into_inner().status == "SERVING")
292 }
293 })
294 .await
295 .map_err(|e| ValidatorError::endpoint(format!("XML bridge health check failed: {e}")))?;
296
297 Ok((process, channel, port))
298 }
299
300 fn is_transport_error(msg: &str) -> bool {
301 msg.contains(&Code::Unavailable.to_string())
302 || msg.contains(&Code::Unknown.to_string())
303 || msg.contains("transport")
304 }
305
306 pub async fn shutdown(&self) {
307 let mut guard = self.slot.process.lock().await;
308 if let Some(p) = guard.take()
309 && let Err(e) = p.stop().await
310 {
311 tracing::warn!("Failed to stop XSD bridge process: {}", e);
312 }
313 }
314
315 async fn register_with_channel(
316 &self,
317 channel: Channel,
318 schema_id: SchemaId,
319 xsd_bytes: Vec<u8>,
320 ) -> Result<SchemaId, ValidatorError> {
321 let response = self
322 .rpc
323 .register_schema(
324 channel,
325 RegisterSchemaRequest {
326 schema_id: schema_id.clone(),
327 schema: xsd_bytes.clone(),
328 },
329 )
330 .await?;
331
332 if let Some(err) = response.error {
333 return Err(ValidatorError::from_bridge_error(&err));
334 }
335
336 let max_entries = self.schema_cache_max_entries.load(Ordering::Relaxed);
340 if self.schemas.len() >= max_entries && !self.schemas.contains_key(&schema_id) {
341 self.schemas.clear();
342 }
343
344 self.schemas.insert(schema_id.clone(), xsd_bytes);
345 Ok(schema_id)
346 }
347}
348
349impl BridgeReconnectHandler for XsdBridgeBackend {
350 fn on_reconnect(&self, _port: u16) -> Result<(), BridgeError> {
351 let this = self.clone();
352 tokio::spawn(async move {
353 let Some(channel) = this.channel.read().await.clone() else {
354 return;
355 };
356
357 let schemas: Vec<(SchemaId, Vec<u8>)> = this
358 .schemas
359 .iter()
360 .map(|entry| (entry.key().clone(), entry.value().clone()))
361 .collect();
362
363 let observability = this
364 .observability
365 .get()
366 .map(|(rt, rid)| (Arc::clone(rt), rid.clone()));
367
368 for (schema_id, schema_bytes) in schemas {
369 if let Err(e) = this
370 .rpc
371 .register_schema(
372 channel.clone(),
373 RegisterSchemaRequest {
374 schema_id: schema_id.clone(),
375 schema: schema_bytes,
376 },
377 )
378 .await
379 {
380 if let Some((ref rt, ref rid)) = observability {
381 rt.metrics()
382 .increment_errors(rid, "e:validator:reconnect-reseed");
383 }
384 error!(schema_id = %schema_id, error = %e, "re-seed schema failed after reconnect");
386 }
387 }
388 });
389 Ok(())
390 }
391}
392
393#[async_trait]
394impl XsdBridge for XsdBridgeBackend {
395 async fn register(&self, xsd_bytes: Vec<u8>) -> Result<SchemaId, ValidatorError> {
396 let schema_id = Self::schema_id_for(&xsd_bytes);
397 if self.schemas.contains_key(&schema_id) {
398 return Ok(schema_id);
399 }
400
401 let channel = self.ensure_bridge_ready().await?;
402 match self
403 .register_with_channel(channel.clone(), schema_id.clone(), xsd_bytes.clone())
404 .await
405 {
406 Ok(id) => Ok(id),
407 Err(e) if Self::is_transport_error(&e.to_string()) => {
408 let restarted = self.restart_bridge().await?;
409 self.register_with_channel(restarted, schema_id, xsd_bytes)
410 .await
411 }
412 Err(e) => Err(e),
413 }
414 }
415
416 async fn validate(&self, schema_id: &str, doc_bytes: Vec<u8>) -> Result<(), ValidatorError> {
417 let channel = self.ensure_bridge_ready().await?;
418 let req = ValidateWithRequest {
419 schema_id: schema_id.to_string(),
420 document: doc_bytes.clone(),
421 };
422
423 let response = match self.rpc.validate_with(channel.clone(), req).await {
424 Ok(resp) => resp,
425 Err(e) if Self::is_transport_error(&e.to_string()) => {
426 let restarted = self.restart_bridge().await?;
427 self.rpc
428 .validate_with(
429 restarted,
430 ValidateWithRequest {
431 schema_id: schema_id.to_string(),
432 document: doc_bytes,
433 },
434 )
435 .await?
436 }
437 Err(e) => return Err(e),
438 };
439
440 if let Some(err) = response.error {
441 return Err(ValidatorError::from_bridge_error(&err));
442 }
443 if response.valid {
444 return Ok(());
445 }
446
447 let details = response
448 .errors
449 .iter()
450 .map(|e| format!("{}:{} {}", e.line, e.column, e.message))
451 .collect::<Vec<_>>()
452 .join("\n");
453 Err(ValidatorError::validation(format!(
454 "XSD validation failed:\n{details}"
455 )))
456 }
457}
458
459impl Default for XsdBridgeBackend {
460 fn default() -> Self {
461 Self::new()
462 }
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tonic::transport::Endpoint;

    /// RPC double that counts `register_schema` calls and always succeeds.
    #[derive(Debug)]
    struct MockRpc {
        register_calls: Arc<AtomicUsize>,
        validate_ok: bool,
    }

    #[async_trait]
    impl XsdBridgeRpc for MockRpc {
        async fn register_schema(
            &self,
            _channel: Channel,
            request: RegisterSchemaRequest,
        ) -> Result<RegisterSchemaResponse, ValidatorError> {
            self.register_calls.fetch_add(1, Ordering::SeqCst);
            Ok(RegisterSchemaResponse {
                schema_id: request.schema_id,
                error: None,
            })
        }

        async fn validate_with(
            &self,
            _channel: Channel,
            _request: ValidateWithRequest,
        ) -> Result<ValidateResponse, ValidatorError> {
            Ok(ValidateResponse {
                valid: self.validate_ok,
                errors: Vec::new(),
                error: None,
            })
        }
    }

    /// Channel that never actually connects; the mock RPC ignores it.
    fn lazy_channel() -> Channel {
        Endpoint::from_static("http://127.0.0.1:65535").connect_lazy()
    }

    /// Two registrations followed by a reconnect must produce exactly four
    /// `register_schema` calls: two initial ones plus one replay per schema.
    #[tokio::test]
    async fn xsd_bridge_reconnect_reseeds() {
        let calls = Arc::new(AtomicUsize::new(0));
        let rpc = Arc::new(MockRpc {
            register_calls: Arc::clone(&calls),
            validate_ok: true,
        });
        let connector =
            Arc::new(|_port| Box::pin(async move { Ok(lazy_channel()) }) as ConnectFuture);
        let backend = XsdBridgeBackend::for_test(rpc, connector, lazy_channel());

        let _id_a = backend.register(b"<xsd:a/>".to_vec()).await.unwrap();
        let _id_b = backend.register(b"<xsd:b/>".to_vec()).await.unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 2);

        backend.on_reconnect(50051).unwrap();

        // The re-seed runs in a detached task. Poll with an upper bound
        // instead of sleeping a fixed amount, so the test neither flakes on a
        // slow machine nor waits longer than necessary.
        let reseeded = tokio::time::timeout(Duration::from_secs(2), async {
            while calls.load(Ordering::SeqCst) < 4 {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
        })
        .await;

        assert!(
            reseeded.is_ok(),
            "re-seed did not finish in time; register calls seen: {}",
            calls.load(Ordering::SeqCst)
        );
        assert_eq!(calls.load(Ordering::SeqCst), 4);
    }
}