1use std::path::PathBuf;
2use thiserror::Error;
3
4use crate::spec::{BridgeSpec, CXF_BRIDGE, JMS_BRIDGE, XML_BRIDGE};
5
6#[derive(Debug, Error)]
7pub enum BridgeError {
8 #[error("IO error: {0}")]
9 Io(#[from] std::io::Error),
10 #[error("Bridge timed out: {0}")]
11 Timeout(String),
12 #[error("Bridge stdout closed before ready message")]
13 StdoutClosed,
14 #[error("Bridge ready message malformed: {0}")]
15 BadReadyMessage(String),
16 #[error("Download failed: {0}")]
17 Download(String),
18 #[error("Checksum mismatch: expected {expected}, got {actual}")]
19 ChecksumMismatch { expected: String, actual: String },
20 #[error("URL not allowed: {0}")]
21 UrlNotAllowed(String),
22 #[error("Transport error: {0}")]
23 Transport(String),
24}
25
26#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
27#[serde(rename_all = "lowercase")]
28pub enum BrokerType {
29 #[serde(alias = "active_mq")]
30 ActiveMq,
31 Artemis,
32 Generic,
33}
34
35impl BrokerType {
36 pub fn as_env_str(&self) -> &'static str {
37 match self {
38 BrokerType::ActiveMq => "activemq",
39 BrokerType::Artemis => "artemis",
40 BrokerType::Generic => "generic",
41 }
42 }
43}
44
45impl std::str::FromStr for BrokerType {
46 type Err = std::convert::Infallible;
47
48 fn from_str(s: &str) -> Result<Self, Self::Err> {
49 Ok(match s.to_lowercase().as_str() {
50 "activemq" => BrokerType::ActiveMq,
51 "artemis" => BrokerType::Artemis,
52 _ => BrokerType::Generic,
53 })
54 }
55}
56
57pub struct CxfProfileEnvVars {
59 pub name: String,
60 pub wsdl_path: String,
61 pub service_name: String,
62 pub port_name: String,
63 pub address: Option<String>,
64 pub keystore_path: Option<String>,
65 pub keystore_password: Option<String>,
66 pub truststore_path: Option<String>,
67 pub truststore_password: Option<String>,
68 pub sig_username: Option<String>,
69 pub sig_password: Option<String>,
70 pub enc_username: Option<String>,
71 pub security_actions_out: Option<String>,
72 pub security_actions_in: Option<String>,
73 pub signature_algorithm: Option<String>,
74 pub signature_digest_algorithm: Option<String>,
75 pub signature_c14n_algorithm: Option<String>,
76 pub signature_parts: Option<String>,
77}
78
79impl CxfProfileEnvVars {
80 pub fn to_env_vars(&self) -> Vec<(String, String)> {
81 let prefix = format!("CXF_PROFILE_{}_", self.name.to_uppercase());
82 let mut vars = vec![
83 (format!("{}WSDL_PATH", prefix), self.wsdl_path.clone()),
84 (format!("{}SERVICE_NAME", prefix), self.service_name.clone()),
85 (format!("{}PORT_NAME", prefix), self.port_name.clone()),
86 ];
87
88 if let Some(ref v) = self.address {
89 vars.push((format!("{}ADDRESS", prefix), v.clone()));
90 }
91 if let Some(ref v) = self.keystore_path {
92 vars.push((format!("{}KEYSTORE_PATH", prefix), v.clone()));
93 }
94 if let Some(ref v) = self.keystore_password {
95 vars.push((format!("{}KEYSTORE_PASSWORD", prefix), v.clone()));
96 }
97 if let Some(ref v) = self.truststore_path {
98 vars.push((format!("{}TRUSTSTORE_PATH", prefix), v.clone()));
99 }
100 if let Some(ref v) = self.truststore_password {
101 vars.push((format!("{}TRUSTSTORE_PASSWORD", prefix), v.clone()));
102 }
103 if let Some(ref v) = self.sig_username {
104 vars.push((format!("{}SIG_USERNAME", prefix), v.clone()));
105 }
106 if let Some(ref v) = self.sig_password {
107 vars.push((format!("{}SIG_PASSWORD", prefix), v.clone()));
108 }
109 if let Some(ref v) = self.enc_username {
110 vars.push((format!("{}ENC_USERNAME", prefix), v.clone()));
111 }
112 if let Some(ref v) = self.security_actions_out {
113 vars.push((format!("{}SECURITY_ACTIONS_OUT", prefix), v.clone()));
114 }
115 if let Some(ref v) = self.security_actions_in {
116 vars.push((format!("{}SECURITY_ACTIONS_IN", prefix), v.clone()));
117 }
118 if let Some(ref v) = self.signature_algorithm {
119 vars.push((format!("{}SIGNATURE_ALGORITHM", prefix), v.clone()));
120 }
121 if let Some(ref v) = self.signature_digest_algorithm {
122 vars.push((format!("{}SIGNATURE_DIGEST_ALGORITHM", prefix), v.clone()));
123 }
124 if let Some(ref v) = self.signature_c14n_algorithm {
125 vars.push((format!("{}SIGNATURE_C14N_ALGORITHM", prefix), v.clone()));
126 }
127 if let Some(ref v) = self.signature_parts {
128 vars.push((format!("{}SIGNATURE_PARTS", prefix), v.clone()));
129 }
130
131 vars
132 }
133}
134
135pub struct BridgeProcessConfig {
136 pub spec: &'static BridgeSpec,
137 pub binary_path: PathBuf,
138 pub broker_url: String,
139 pub broker_type: BrokerType,
140 pub username: Option<String>,
141 pub password: Option<String>,
142 pub start_timeout_ms: u64,
143 pub env_vars: Vec<(String, String)>,
144}
145
146impl BridgeProcessConfig {
147 pub fn jms(
149 binary_path: PathBuf,
150 broker_url: String,
151 broker_type: BrokerType,
152 username: Option<String>,
153 password: Option<String>,
154 start_timeout_ms: u64,
155 ) -> Self {
156 let mut env_vars = vec![
157 ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
158 (
159 "BRIDGE_BROKER_TYPE".to_string(),
160 broker_type.as_env_str().to_string(),
161 ),
162 ];
163 if let Some(u) = &username {
164 env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
165 }
166 if let Some(p) = &password {
167 env_vars.push(("BRIDGE_PASSWORD".to_string(), p.clone()));
168 }
169 Self {
170 spec: &JMS_BRIDGE,
171 binary_path,
172 broker_url,
173 broker_type,
174 username,
175 password,
176 start_timeout_ms,
177 env_vars,
178 }
179 }
180
181 pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
183 Self {
184 spec: &XML_BRIDGE,
185 binary_path,
186 broker_url: String::new(),
187 broker_type: BrokerType::Generic,
188 username: None,
189 password: None,
190 start_timeout_ms,
191 env_vars: vec![],
192 }
193 }
194
195 pub fn cxf_profiles(
198 binary_path: PathBuf,
199 profiles: &[CxfProfileEnvVars],
200 start_timeout_ms: u64,
201 ) -> Self {
202 let profile_names: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
203 let mut env_vars = vec![("CXF_PROFILES".to_string(), profile_names.join(","))];
204
205 for profile in profiles {
206 env_vars.extend(profile.to_env_vars());
207 }
208
209 Self {
210 spec: &CXF_BRIDGE,
211 binary_path,
212 broker_url: String::new(),
213 broker_type: BrokerType::Generic,
214 username: None,
215 password: None,
216 start_timeout_ms,
217 env_vars,
218 }
219 }
220}
221
222pub struct BridgeProcess {
223 child: tokio::process::Child,
224 grpc_port: u16,
225}
226
227impl BridgeProcess {
228 pub fn grpc_port(&self) -> u16 {
229 self.grpc_port
230 }
231
232 pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
238 use tokio::io::AsyncBufReadExt;
239 use tokio::process::Command;
240 use tokio::time::{Duration, timeout};
241
242 let free_port = {
244 let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
245 listener.local_addr()?.port()
246 };
247
248 let stderr_stdio: std::process::Stdio =
250 if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
251 let log_filename = config
252 .spec
253 .log_file_template
254 .replace("{pid}", &std::process::id().to_string());
255 let log_path = if log_dir.is_empty() {
256 format!("/tmp/{log_filename}")
257 } else {
258 format!("{log_dir}/{log_filename}")
259 };
260 match std::fs::File::create(&log_path) {
261 Ok(f) => {
262 eprintln!("[camel-bridge] stderr → {}", log_path);
263 f.into()
264 }
265 Err(e) => {
266 eprintln!(
267 "[camel-bridge] failed to create log file {}: {}",
268 log_path, e
269 );
270 std::process::Stdio::inherit()
271 }
272 }
273 } else {
274 std::process::Stdio::inherit()
275 };
276
277 let mut command = Command::new(&config.binary_path);
278 command
279 .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
280 .env("QUARKUS_HTTP_PORT", "0")
283 .stdout(std::process::Stdio::piped())
284 .stderr(stderr_stdio);
285
286 for (key, value) in &config.env_vars {
288 command.env(key, value);
289 }
290
291 let mut child = command.spawn()?;
292
293 let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
294 let mut reader = tokio::io::BufReader::new(stdout).lines();
295
296 let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
297 while let Some(line) = reader.next_line().await? {
298 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
299 && v.get("status").and_then(|s| s.as_str()) == Some("ready")
300 {
301 if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
302 return Ok(p as u16);
303 }
304 return Err(BridgeError::BadReadyMessage(line));
305 }
306 }
307 Err(BridgeError::StdoutClosed)
308 })
309 .await
310 .map_err(|_| {
311 BridgeError::Timeout(format!(
312 "{} failed to start: health check timeout after {}ms",
313 config.spec.name, config.start_timeout_ms
314 ))
315 })??;
316
317 tokio::spawn(async move {
321 while let Ok(Some(line)) = reader.next_line().await {
322 tracing::debug!(target: "camel_bridge::child", "{}", line);
323 }
324 });
325
326 Ok(BridgeProcess {
327 child,
328 grpc_port: port,
329 })
330 }
331
332 pub async fn stop(mut self) -> Result<(), BridgeError> {
334 use tokio::time::{Duration, sleep};
335
336 #[cfg(unix)]
338 {
339 let pid = self.child.id().unwrap_or(0);
340 if pid > 0 {
341 unsafe {
343 libc::kill(pid as i32, libc::SIGTERM);
344 }
345 }
346 }
347
348 #[cfg(not(unix))]
350 let _ = self.child.start_kill();
351
352 tokio::select! {
354 result = self.child.wait() => {
355 result?;
356 }
357 _ = sleep(Duration::from_secs(5)) => {
358 let _ = self.child.start_kill();
359 self.child.wait().await?;
360 }
361 }
362 Ok(())
363 }
364}
365
366impl Drop for BridgeProcess {
367 fn drop(&mut self) {
368 let _ = self.child.start_kill();
370 }
371}
372
373#[cfg(test)]
374mod tests {
375 use super::*;
376
377 #[test]
378 fn broker_type_from_str_activemq() {
379 assert_eq!(
380 "activemq".parse::<BrokerType>().unwrap(),
381 BrokerType::ActiveMq
382 );
383 assert_eq!(
384 "ACTIVEMQ".parse::<BrokerType>().unwrap(),
385 BrokerType::ActiveMq
386 );
387 }
388
389 #[test]
390 fn broker_type_from_str_artemis() {
391 assert_eq!(
392 "artemis".parse::<BrokerType>().unwrap(),
393 BrokerType::Artemis
394 );
395 }
396
397 #[test]
398 fn broker_type_from_str_unknown_is_generic() {
399 assert_eq!("ibmmq".parse::<BrokerType>().unwrap(), BrokerType::Generic);
400 }
401
402 #[test]
403 fn broker_type_env_str() {
404 assert_eq!(BrokerType::ActiveMq.as_env_str(), "activemq");
405 assert_eq!(BrokerType::Artemis.as_env_str(), "artemis");
406 assert_eq!(BrokerType::Generic.as_env_str(), "generic");
407 }
408
409 #[test]
410 fn jms_constructor_uses_jms_spec() {
411 let cfg = BridgeProcessConfig::jms(
412 PathBuf::from("/tmp/jms-bridge"),
413 "tcp://localhost:61616".to_string(),
414 BrokerType::ActiveMq,
415 Some("user".to_string()),
416 Some("pass".to_string()),
417 1000,
418 );
419 assert_eq!(cfg.spec.name, "jms-bridge");
420 }
421
422 #[test]
423 fn xml_constructor_uses_xml_spec() {
424 let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
425 assert_eq!(cfg.spec.name, "xml-bridge");
426 }
427
428 #[test]
429 fn cxf_profiles_generates_cxf_profiles_env_var() {
430 let profiles = vec![
431 CxfProfileEnvVars {
432 name: "baleares".to_string(),
433 wsdl_path: "/a.wsdl".to_string(),
434 service_name: "Svc".to_string(),
435 port_name: "Port".to_string(),
436 address: None,
437 keystore_path: None,
438 keystore_password: None,
439 truststore_path: None,
440 truststore_password: None,
441 sig_username: None,
442 sig_password: None,
443 enc_username: None,
444 security_actions_out: None,
445 security_actions_in: None,
446 signature_algorithm: None,
447 signature_digest_algorithm: None,
448 signature_c14n_algorithm: None,
449 signature_parts: None,
450 },
451 CxfProfileEnvVars {
452 name: "extremadura".to_string(),
453 wsdl_path: "/b.wsdl".to_string(),
454 service_name: "Svc2".to_string(),
455 port_name: "Port2".to_string(),
456 address: Some("http://host:9090/ws".to_string()),
457 keystore_path: Some("/b.jks".to_string()),
458 keystore_password: Some("pass".to_string()),
459 truststore_path: None,
460 truststore_password: None,
461 sig_username: Some("cert".to_string()),
462 sig_password: Some("sig_pass".to_string()),
463 enc_username: None,
464 security_actions_out: Some("Timestamp Signature".to_string()),
465 security_actions_in: Some("Timestamp Signature".to_string()),
466 signature_algorithm: None,
467 signature_digest_algorithm: None,
468 signature_c14n_algorithm: None,
469 signature_parts: None,
470 },
471 ];
472
473 let cfg =
474 BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
475
476 assert_eq!(cfg.spec.name, "cxf-bridge");
477 assert!(cfg.broker_url.is_empty());
478 assert_eq!(cfg.broker_type, BrokerType::Generic);
479 assert!(cfg.username.is_none());
480 assert!(cfg.password.is_none());
481
482 let profiles_var = cfg
484 .env_vars
485 .iter()
486 .find(|(k, _)| k == "CXF_PROFILES")
487 .expect("CXF_PROFILES env var must exist");
488 assert_eq!(profiles_var.1, "baleares,extremadura");
489
490 assert!(
492 cfg.env_vars
493 .iter()
494 .any(|(k, v)| k == "CXF_PROFILE_BALEARES_WSDL_PATH" && v == "/a.wsdl")
495 );
496 assert!(
497 cfg.env_vars
498 .iter()
499 .any(|(k, v)| k == "CXF_PROFILE_BALEARES_SERVICE_NAME" && v == "Svc")
500 );
501 assert!(
502 cfg.env_vars
503 .iter()
504 .any(|(k, v)| k == "CXF_PROFILE_BALEARES_PORT_NAME" && v == "Port")
505 );
506 assert!(
507 !cfg.env_vars
508 .iter()
509 .any(|(k, _)| k == "CXF_PROFILE_BALEARES_ADDRESS")
510 );
511
512 assert!(
514 cfg.env_vars
515 .iter()
516 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_WSDL_PATH" && v == "/b.wsdl")
517 );
518 assert!(
519 cfg.env_vars
520 .iter()
521 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_ADDRESS" && v == "http://host:9090/ws")
522 );
523 assert!(
524 cfg.env_vars
525 .iter()
526 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PATH" && v == "/b.jks")
527 );
528 assert!(
529 cfg.env_vars
530 .iter()
531 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PASSWORD" && v == "pass")
532 );
533 assert!(
534 cfg.env_vars
535 .iter()
536 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_USERNAME" && v == "cert")
537 );
538 assert!(
539 cfg.env_vars
540 .iter()
541 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_PASSWORD" && v == "sig_pass")
542 );
543 assert!(
544 cfg.env_vars
545 .iter()
546 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SECURITY_ACTIONS_OUT"
547 && v == "Timestamp Signature")
548 );
549 }
550
551 #[test]
552 fn cxf_profiles_single_profile_no_security() {
553 let profiles = vec![CxfProfileEnvVars {
554 name: "test".to_string(),
555 wsdl_path: "service.wsdl".to_string(),
556 service_name: "{http://example.com}Service".to_string(),
557 port_name: "{http://example.com}Port".to_string(),
558 address: None,
559 keystore_path: None,
560 keystore_password: None,
561 truststore_path: None,
562 truststore_password: None,
563 sig_username: None,
564 sig_password: None,
565 enc_username: None,
566 security_actions_out: None,
567 security_actions_in: None,
568 signature_algorithm: None,
569 signature_digest_algorithm: None,
570 signature_c14n_algorithm: None,
571 signature_parts: None,
572 }];
573
574 let cfg =
575 BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
576
577 assert_eq!(cfg.spec.name, "cxf-bridge");
578 assert_eq!(cfg.env_vars.len(), 4);
580 assert_eq!(cfg.env_vars[0].0, "CXF_PROFILES");
581 assert_eq!(cfg.env_vars[0].1, "test");
582 assert_eq!(cfg.env_vars[1].0, "CXF_PROFILE_TEST_WSDL_PATH");
583 assert_eq!(cfg.env_vars[1].1, "service.wsdl");
584 assert_eq!(cfg.env_vars[2].0, "CXF_PROFILE_TEST_SERVICE_NAME");
585 assert_eq!(cfg.env_vars[2].1, "{http://example.com}Service");
586 assert_eq!(cfg.env_vars[3].0, "CXF_PROFILE_TEST_PORT_NAME");
587 assert_eq!(cfg.env_vars[3].1, "{http://example.com}Port");
588 }
589
590 #[test]
591 fn profile_env_vars_to_env_vars_includes_all_fields() {
592 let vars = CxfProfileEnvVars {
593 name: "full".to_string(),
594 wsdl_path: "/wsdl".to_string(),
595 service_name: "Svc".to_string(),
596 port_name: "Port".to_string(),
597 address: Some("http://host:8080".to_string()),
598 keystore_path: Some("/ks.jks".to_string()),
599 keystore_password: Some("ks_pass".to_string()),
600 truststore_path: Some("/ts.jks".to_string()),
601 truststore_password: Some("ts_pass".to_string()),
602 sig_username: Some("user".to_string()),
603 sig_password: Some("sig_pass".to_string()),
604 enc_username: None,
605 security_actions_out: Some("Timestamp Signature".to_string()),
606 security_actions_in: Some("Timestamp".to_string()),
607 signature_algorithm: None,
608 signature_digest_algorithm: None,
609 signature_c14n_algorithm: None,
610 signature_parts: None,
611 };
612
613 let env = vars.to_env_vars();
614 assert_eq!(env.len(), 12);
616
617 let keys: Vec<&str> = env.iter().map(|(k, _)| k.as_str()).collect();
618 assert!(keys.contains(&"CXF_PROFILE_FULL_WSDL_PATH"));
619 assert!(keys.contains(&"CXF_PROFILE_FULL_SERVICE_NAME"));
620 assert!(keys.contains(&"CXF_PROFILE_FULL_PORT_NAME"));
621 assert!(keys.contains(&"CXF_PROFILE_FULL_ADDRESS"));
622 assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PATH"));
623 assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PASSWORD"));
624 assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PATH"));
625 assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PASSWORD"));
626 assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_USERNAME"));
627 assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_PASSWORD"));
628 assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_OUT"));
629 assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_IN"));
630 }
631}