1use std::path::PathBuf;
2use thiserror::Error;
3
4use crate::spec::{BridgeSpec, CXF_BRIDGE, JMS_BRIDGE, XML_BRIDGE};
5
6#[derive(Debug, Error)]
7pub enum BridgeError {
8 #[error("IO error: {0}")]
9 Io(#[from] std::io::Error),
10 #[error("Bridge timed out: {0}")]
11 Timeout(String),
12 #[error("Bridge stdout closed before ready message")]
13 StdoutClosed,
14 #[error("Bridge ready message malformed: {0}")]
15 BadReadyMessage(String),
16 #[error("Download failed: {0}")]
17 Download(String),
18 #[error("Checksum mismatch: expected {expected}, got {actual}")]
19 ChecksumMismatch { expected: String, actual: String },
20 #[error("URL not allowed: {0}")]
21 UrlNotAllowed(String),
22 #[error("Transport error: {0}")]
23 Transport(String),
24}
25
26#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
27#[serde(rename_all = "lowercase")]
28pub enum BrokerType {
29 #[serde(alias = "active_mq")]
30 ActiveMq,
31 Artemis,
32 Generic,
33}
34
35impl BrokerType {
36 pub fn as_env_str(&self) -> &'static str {
37 match self {
38 BrokerType::ActiveMq => "activemq",
39 BrokerType::Artemis => "artemis",
40 BrokerType::Generic => "generic",
41 }
42 }
43}
44
45impl std::str::FromStr for BrokerType {
46 type Err = std::convert::Infallible;
47
48 fn from_str(s: &str) -> Result<Self, Self::Err> {
49 Ok(match s.to_lowercase().as_str() {
50 "activemq" => BrokerType::ActiveMq,
51 "artemis" => BrokerType::Artemis,
52 _ => BrokerType::Generic,
53 })
54 }
55}
56
57pub struct CxfProfileEnvVars {
59 pub name: String,
60 pub wsdl_path: String,
61 pub service_name: String,
62 pub port_name: String,
63 pub address: Option<String>,
64 pub keystore_path: Option<String>,
65 pub keystore_password: Option<String>,
66 pub truststore_path: Option<String>,
67 pub truststore_password: Option<String>,
68 pub sig_username: Option<String>,
69 pub sig_password: Option<String>,
70 pub enc_username: Option<String>,
71 pub security_actions_out: Option<String>,
72 pub security_actions_in: Option<String>,
73 pub signature_algorithm: Option<String>,
74 pub signature_digest_algorithm: Option<String>,
75 pub signature_c14n_algorithm: Option<String>,
76 pub signature_parts: Option<String>,
77}
78
79impl CxfProfileEnvVars {
80 pub fn to_env_vars(&self) -> Vec<(String, String)> {
81 let prefix = format!("CXF_PROFILE_{}_", self.name.to_uppercase());
82 let mut vars = vec![
83 (format!("{}WSDL_PATH", prefix), self.wsdl_path.clone()),
84 (
85 format!("{}SERVICE_NAME", prefix),
86 self.service_name.clone(),
87 ),
88 (format!("{}PORT_NAME", prefix), self.port_name.clone()),
89 ];
90
91 if let Some(ref v) = self.address {
92 vars.push((format!("{}ADDRESS", prefix), v.clone()));
93 }
94 if let Some(ref v) = self.keystore_path {
95 vars.push((format!("{}KEYSTORE_PATH", prefix), v.clone()));
96 }
97 if let Some(ref v) = self.keystore_password {
98 vars.push((format!("{}KEYSTORE_PASSWORD", prefix), v.clone()));
99 }
100 if let Some(ref v) = self.truststore_path {
101 vars.push((format!("{}TRUSTSTORE_PATH", prefix), v.clone()));
102 }
103 if let Some(ref v) = self.truststore_password {
104 vars.push((format!("{}TRUSTSTORE_PASSWORD", prefix), v.clone()));
105 }
106 if let Some(ref v) = self.sig_username {
107 vars.push((format!("{}SIG_USERNAME", prefix), v.clone()));
108 }
109 if let Some(ref v) = self.sig_password {
110 vars.push((format!("{}SIG_PASSWORD", prefix), v.clone()));
111 }
112 if let Some(ref v) = self.enc_username {
113 vars.push((format!("{}ENC_USERNAME", prefix), v.clone()));
114 }
115 if let Some(ref v) = self.security_actions_out {
116 vars.push((format!("{}SECURITY_ACTIONS_OUT", prefix), v.clone()));
117 }
118 if let Some(ref v) = self.security_actions_in {
119 vars.push((format!("{}SECURITY_ACTIONS_IN", prefix), v.clone()));
120 }
121 if let Some(ref v) = self.signature_algorithm {
122 vars.push((format!("{}SIGNATURE_ALGORITHM", prefix), v.clone()));
123 }
124 if let Some(ref v) = self.signature_digest_algorithm {
125 vars.push((format!("{}SIGNATURE_DIGEST_ALGORITHM", prefix), v.clone()));
126 }
127 if let Some(ref v) = self.signature_c14n_algorithm {
128 vars.push((format!("{}SIGNATURE_C14N_ALGORITHM", prefix), v.clone()));
129 }
130 if let Some(ref v) = self.signature_parts {
131 vars.push((format!("{}SIGNATURE_PARTS", prefix), v.clone()));
132 }
133
134 vars
135 }
136}
137
138pub struct BridgeProcessConfig {
139 pub spec: &'static BridgeSpec,
140 pub binary_path: PathBuf,
141 pub broker_url: String,
142 pub broker_type: BrokerType,
143 pub username: Option<String>,
144 pub password: Option<String>,
145 pub start_timeout_ms: u64,
146 pub env_vars: Vec<(String, String)>,
147}
148
149impl BridgeProcessConfig {
150 pub fn jms(
152 binary_path: PathBuf,
153 broker_url: String,
154 broker_type: BrokerType,
155 username: Option<String>,
156 password: Option<String>,
157 start_timeout_ms: u64,
158 ) -> Self {
159 let mut env_vars = vec![
160 ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
161 (
162 "BRIDGE_BROKER_TYPE".to_string(),
163 broker_type.as_env_str().to_string(),
164 ),
165 ];
166 if let Some(u) = &username {
167 env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
168 }
169 if let Some(p) = &password {
170 env_vars.push(("BRIDGE_PASSWORD".to_string(), p.clone()));
171 }
172 Self {
173 spec: &JMS_BRIDGE,
174 binary_path,
175 broker_url,
176 broker_type,
177 username,
178 password,
179 start_timeout_ms,
180 env_vars,
181 }
182 }
183
184 pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
186 Self {
187 spec: &XML_BRIDGE,
188 binary_path,
189 broker_url: String::new(),
190 broker_type: BrokerType::Generic,
191 username: None,
192 password: None,
193 start_timeout_ms,
194 env_vars: vec![],
195 }
196 }
197
198 pub fn cxf_profiles(
201 binary_path: PathBuf,
202 profiles: &[CxfProfileEnvVars],
203 start_timeout_ms: u64,
204 ) -> Self {
205 let profile_names: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
206 let mut env_vars = vec![(
207 "CXF_PROFILES".to_string(),
208 profile_names.join(","),
209 )];
210
211 for profile in profiles {
212 env_vars.extend(profile.to_env_vars());
213 }
214
215 Self {
216 spec: &CXF_BRIDGE,
217 binary_path,
218 broker_url: String::new(),
219 broker_type: BrokerType::Generic,
220 username: None,
221 password: None,
222 start_timeout_ms,
223 env_vars,
224 }
225 }
226}
227
228pub struct BridgeProcess {
229 child: tokio::process::Child,
230 grpc_port: u16,
231}
232
233impl BridgeProcess {
234 pub fn grpc_port(&self) -> u16 {
235 self.grpc_port
236 }
237
238 pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
244 use tokio::io::AsyncBufReadExt;
245 use tokio::process::Command;
246 use tokio::time::{Duration, timeout};
247
248 let free_port = {
250 let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
251 listener.local_addr()?.port()
252 };
253
254 let stderr_stdio: std::process::Stdio =
256 if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
257 let log_filename = config
258 .spec
259 .log_file_template
260 .replace("{pid}", &std::process::id().to_string());
261 let log_path = if log_dir.is_empty() {
262 format!("/tmp/{log_filename}")
263 } else {
264 format!("{log_dir}/{log_filename}")
265 };
266 match std::fs::File::create(&log_path) {
267 Ok(f) => {
268 eprintln!("[camel-bridge] stderr → {}", log_path);
269 f.into()
270 }
271 Err(e) => {
272 eprintln!(
273 "[camel-bridge] failed to create log file {}: {}",
274 log_path, e
275 );
276 std::process::Stdio::inherit()
277 }
278 }
279 } else {
280 std::process::Stdio::inherit()
281 };
282
283 let mut command = Command::new(&config.binary_path);
284 command
285 .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
286 .env("QUARKUS_HTTP_PORT", "0")
289 .stdout(std::process::Stdio::piped())
290 .stderr(stderr_stdio);
291
292 for (key, value) in &config.env_vars {
294 command.env(key, value);
295 }
296
297 let mut child = command.spawn()?;
298
299 let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
300 let mut reader = tokio::io::BufReader::new(stdout).lines();
301
302 let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
303 while let Some(line) = reader.next_line().await? {
304 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
305 && v.get("status").and_then(|s| s.as_str()) == Some("ready")
306 {
307 if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
308 return Ok(p as u16);
309 }
310 return Err(BridgeError::BadReadyMessage(line));
311 }
312 }
313 Err(BridgeError::StdoutClosed)
314 })
315 .await
316 .map_err(|_| {
317 BridgeError::Timeout(format!(
318 "{} failed to start: health check timeout after {}ms",
319 config.spec.name, config.start_timeout_ms
320 ))
321 })??;
322
323 tokio::spawn(async move {
327 while let Ok(Some(line)) = reader.next_line().await {
328 tracing::debug!(target: "camel_bridge::child", "{}", line);
329 }
330 });
331
332 Ok(BridgeProcess {
333 child,
334 grpc_port: port,
335 })
336 }
337
338 pub async fn stop(mut self) -> Result<(), BridgeError> {
340 use tokio::time::{Duration, sleep};
341
342 #[cfg(unix)]
344 {
345 let pid = self.child.id().unwrap_or(0);
346 if pid > 0 {
347 unsafe {
349 libc::kill(pid as i32, libc::SIGTERM);
350 }
351 }
352 }
353
354 #[cfg(not(unix))]
356 let _ = self.child.start_kill();
357
358 tokio::select! {
360 result = self.child.wait() => {
361 result?;
362 }
363 _ = sleep(Duration::from_secs(5)) => {
364 let _ = self.child.start_kill();
365 self.child.wait().await?;
366 }
367 }
368 Ok(())
369 }
370}
371
372impl Drop for BridgeProcess {
373 fn drop(&mut self) {
374 let _ = self.child.start_kill();
376 }
377}
378
379#[cfg(test)]
380mod tests {
381 use super::*;
382
383 #[test]
384 fn broker_type_from_str_activemq() {
385 assert_eq!(
386 "activemq".parse::<BrokerType>().unwrap(),
387 BrokerType::ActiveMq
388 );
389 assert_eq!(
390 "ACTIVEMQ".parse::<BrokerType>().unwrap(),
391 BrokerType::ActiveMq
392 );
393 }
394
395 #[test]
396 fn broker_type_from_str_artemis() {
397 assert_eq!(
398 "artemis".parse::<BrokerType>().unwrap(),
399 BrokerType::Artemis
400 );
401 }
402
403 #[test]
404 fn broker_type_from_str_unknown_is_generic() {
405 assert_eq!("ibmmq".parse::<BrokerType>().unwrap(), BrokerType::Generic);
406 }
407
408 #[test]
409 fn broker_type_env_str() {
410 assert_eq!(BrokerType::ActiveMq.as_env_str(), "activemq");
411 assert_eq!(BrokerType::Artemis.as_env_str(), "artemis");
412 assert_eq!(BrokerType::Generic.as_env_str(), "generic");
413 }
414
415 #[test]
416 fn jms_constructor_uses_jms_spec() {
417 let cfg = BridgeProcessConfig::jms(
418 PathBuf::from("/tmp/jms-bridge"),
419 "tcp://localhost:61616".to_string(),
420 BrokerType::ActiveMq,
421 Some("user".to_string()),
422 Some("pass".to_string()),
423 1000,
424 );
425 assert_eq!(cfg.spec.name, "jms-bridge");
426 }
427
428 #[test]
429 fn xml_constructor_uses_xml_spec() {
430 let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
431 assert_eq!(cfg.spec.name, "xml-bridge");
432 }
433
434 #[test]
435 fn cxf_profiles_generates_cxf_profiles_env_var() {
436 let profiles = vec![
437 CxfProfileEnvVars {
438 name: "baleares".to_string(),
439 wsdl_path: "/a.wsdl".to_string(),
440 service_name: "Svc".to_string(),
441 port_name: "Port".to_string(),
442 address: None,
443 keystore_path: None,
444 keystore_password: None,
445 truststore_path: None,
446 truststore_password: None,
447 sig_username: None,
448 sig_password: None,
449 enc_username: None,
450 security_actions_out: None,
451 security_actions_in: None,
452 signature_algorithm: None,
453 signature_digest_algorithm: None,
454 signature_c14n_algorithm: None,
455 signature_parts: None,
456 },
457 CxfProfileEnvVars {
458 name: "extremadura".to_string(),
459 wsdl_path: "/b.wsdl".to_string(),
460 service_name: "Svc2".to_string(),
461 port_name: "Port2".to_string(),
462 address: Some("http://host:9090/ws".to_string()),
463 keystore_path: Some("/b.jks".to_string()),
464 keystore_password: Some("pass".to_string()),
465 truststore_path: None,
466 truststore_password: None,
467 sig_username: Some("cert".to_string()),
468 sig_password: Some("sig_pass".to_string()),
469 enc_username: None,
470 security_actions_out: Some("Timestamp Signature".to_string()),
471 security_actions_in: Some("Timestamp Signature".to_string()),
472 signature_algorithm: None,
473 signature_digest_algorithm: None,
474 signature_c14n_algorithm: None,
475 signature_parts: None,
476 },
477 ];
478
479 let cfg =
480 BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
481
482 assert_eq!(cfg.spec.name, "cxf-bridge");
483 assert!(cfg.broker_url.is_empty());
484 assert_eq!(cfg.broker_type, BrokerType::Generic);
485 assert!(cfg.username.is_none());
486 assert!(cfg.password.is_none());
487
488 let profiles_var = cfg
490 .env_vars
491 .iter()
492 .find(|(k, _)| k == "CXF_PROFILES")
493 .expect("CXF_PROFILES env var must exist");
494 assert_eq!(profiles_var.1, "baleares,extremadura");
495
496 assert!(cfg
498 .env_vars
499 .iter()
500 .any(|(k, v)| k == "CXF_PROFILE_BALEARES_WSDL_PATH" && v == "/a.wsdl"));
501 assert!(cfg
502 .env_vars
503 .iter()
504 .any(|(k, v)| k == "CXF_PROFILE_BALEARES_SERVICE_NAME" && v == "Svc"));
505 assert!(cfg
506 .env_vars
507 .iter()
508 .any(|(k, v)| k == "CXF_PROFILE_BALEARES_PORT_NAME" && v == "Port"));
509 assert!(!cfg
510 .env_vars
511 .iter()
512 .any(|(k, _)| k == "CXF_PROFILE_BALEARES_ADDRESS"));
513
514 assert!(cfg
516 .env_vars
517 .iter()
518 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_WSDL_PATH" && v == "/b.wsdl"));
519 assert!(cfg
520 .env_vars
521 .iter()
522 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_ADDRESS" && v == "http://host:9090/ws"));
523 assert!(cfg
524 .env_vars
525 .iter()
526 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PATH" && v == "/b.jks"));
527 assert!(cfg
528 .env_vars
529 .iter()
530 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PASSWORD" && v == "pass"));
531 assert!(cfg
532 .env_vars
533 .iter()
534 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_USERNAME" && v == "cert"));
535 assert!(cfg
536 .env_vars
537 .iter()
538 .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_PASSWORD" && v == "sig_pass"));
539 assert!(cfg.env_vars.iter().any(
540 |(k, v)| k == "CXF_PROFILE_EXTREMADURA_SECURITY_ACTIONS_OUT"
541 && v == "Timestamp Signature"
542 ));
543 }
544
545 #[test]
546 fn cxf_profiles_single_profile_no_security() {
547 let profiles = vec![CxfProfileEnvVars {
548 name: "test".to_string(),
549 wsdl_path: "service.wsdl".to_string(),
550 service_name: "{http://example.com}Service".to_string(),
551 port_name: "{http://example.com}Port".to_string(),
552 address: None,
553 keystore_path: None,
554 keystore_password: None,
555 truststore_path: None,
556 truststore_password: None,
557 sig_username: None,
558 sig_password: None,
559 enc_username: None,
560 security_actions_out: None,
561 security_actions_in: None,
562 signature_algorithm: None,
563 signature_digest_algorithm: None,
564 signature_c14n_algorithm: None,
565 signature_parts: None,
566 }];
567
568 let cfg =
569 BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
570
571 assert_eq!(cfg.spec.name, "cxf-bridge");
572 assert_eq!(cfg.env_vars.len(), 4);
574 assert_eq!(cfg.env_vars[0].0, "CXF_PROFILES");
575 assert_eq!(cfg.env_vars[0].1, "test");
576 assert_eq!(cfg.env_vars[1].0, "CXF_PROFILE_TEST_WSDL_PATH");
577 assert_eq!(cfg.env_vars[1].1, "service.wsdl");
578 assert_eq!(cfg.env_vars[2].0, "CXF_PROFILE_TEST_SERVICE_NAME");
579 assert_eq!(cfg.env_vars[2].1, "{http://example.com}Service");
580 assert_eq!(cfg.env_vars[3].0, "CXF_PROFILE_TEST_PORT_NAME");
581 assert_eq!(cfg.env_vars[3].1, "{http://example.com}Port");
582 }
583
584 #[test]
585 fn profile_env_vars_to_env_vars_includes_all_fields() {
586 let vars = CxfProfileEnvVars {
587 name: "full".to_string(),
588 wsdl_path: "/wsdl".to_string(),
589 service_name: "Svc".to_string(),
590 port_name: "Port".to_string(),
591 address: Some("http://host:8080".to_string()),
592 keystore_path: Some("/ks.jks".to_string()),
593 keystore_password: Some("ks_pass".to_string()),
594 truststore_path: Some("/ts.jks".to_string()),
595 truststore_password: Some("ts_pass".to_string()),
596 sig_username: Some("user".to_string()),
597 sig_password: Some("sig_pass".to_string()),
598 enc_username: None,
599 security_actions_out: Some("Timestamp Signature".to_string()),
600 security_actions_in: Some("Timestamp".to_string()),
601 signature_algorithm: None,
602 signature_digest_algorithm: None,
603 signature_c14n_algorithm: None,
604 signature_parts: None,
605 };
606
607 let env = vars.to_env_vars();
608 assert_eq!(env.len(), 12);
610
611 let keys: Vec<&str> = env.iter().map(|(k, _)| k.as_str()).collect();
612 assert!(keys.contains(&"CXF_PROFILE_FULL_WSDL_PATH"));
613 assert!(keys.contains(&"CXF_PROFILE_FULL_SERVICE_NAME"));
614 assert!(keys.contains(&"CXF_PROFILE_FULL_PORT_NAME"));
615 assert!(keys.contains(&"CXF_PROFILE_FULL_ADDRESS"));
616 assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PATH"));
617 assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PASSWORD"));
618 assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PATH"));
619 assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PASSWORD"));
620 assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_USERNAME"));
621 assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_PASSWORD"));
622 assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_OUT"));
623 assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_IN"));
624 }
625}