1use std::path::PathBuf;
2use thiserror::Error;
3
4use crate::spec::{BridgeSpec, CXF_BRIDGE, JMS_BRIDGE, XML_BRIDGE};
5
6#[derive(Debug, Error)]
7pub enum BridgeError {
8 #[error("IO error: {0}")]
9 Io(#[from] std::io::Error),
10 #[error("Bridge timed out: {0}")]
11 Timeout(String),
12 #[error("Bridge stdout closed before ready message")]
13 StdoutClosed,
14 #[error("Bridge ready message malformed: {0}")]
15 BadReadyMessage(String),
16 #[error("Download failed: {0}")]
17 Download(String),
18 #[error("Checksum mismatch: expected {expected}, got {actual}")]
19 ChecksumMismatch { expected: String, actual: String },
20 #[error("URL not allowed: {0}")]
21 UrlNotAllowed(String),
22 #[error("Transport error: {0}")]
23 Transport(String),
24 #[error("Config error: {0}")]
25 Config(String),
26}
27
28#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
29#[serde(rename_all = "lowercase")]
30pub enum BrokerType {
31 #[serde(alias = "active_mq")]
32 ActiveMq,
33 Artemis,
34 Generic,
35}
36
37impl BrokerType {
38 pub fn as_env_str(&self) -> &'static str {
39 match self {
40 BrokerType::ActiveMq => "activemq",
41 BrokerType::Artemis => "artemis",
42 BrokerType::Generic => "generic",
43 }
44 }
45}
46
47impl std::str::FromStr for BrokerType {
48 type Err = std::convert::Infallible;
49
50 fn from_str(s: &str) -> Result<Self, Self::Err> {
51 Ok(match s.to_lowercase().as_str() {
52 "activemq" => BrokerType::ActiveMq,
53 "artemis" => BrokerType::Artemis,
54 _ => BrokerType::Generic,
55 })
56 }
57}
58
/// Settings of one CXF profile, exported to the bridge as environment
/// variables by `CxfProfileEnvVars::to_env_vars`.
///
/// Every field maps to one variable named `CXF_PROFILE_<NAME>_<SUFFIX>`,
/// where `<NAME>` is the upper-cased `name`. Optional fields that are `None`
/// produce no variable.
// NOTE(review): `Debug` is deliberately not derived — presumably to keep the
// password fields out of logs; confirm before adding a derive.
#[allow(missing_debug_implementations)]
pub struct CxfProfileEnvVars {
    /// Profile name; upper-cased to build the variable prefix.
    pub name: String,
    /// Exported as `..._WSDL_PATH` (always present).
    pub wsdl_path: String,
    /// Exported as `..._SERVICE_NAME` (always present).
    pub service_name: String,
    /// Exported as `..._PORT_NAME` (always present).
    pub port_name: String,
    /// Exported as `..._ADDRESS`.
    pub address: Option<String>,
    /// Exported as `..._KEYSTORE_PATH`.
    pub keystore_path: Option<String>,
    /// Exported as `..._KEYSTORE_PASSWORD`.
    pub keystore_password: Option<String>,
    /// Exported as `..._TRUSTSTORE_PATH`.
    pub truststore_path: Option<String>,
    /// Exported as `..._TRUSTSTORE_PASSWORD`.
    pub truststore_password: Option<String>,
    /// Exported as `..._SIG_USERNAME`.
    pub sig_username: Option<String>,
    /// Exported as `..._SIG_PASSWORD`.
    pub sig_password: Option<String>,
    /// Exported as `..._ENC_USERNAME`.
    pub enc_username: Option<String>,
    /// Exported as `..._SECURITY_ACTIONS_OUT`.
    pub security_actions_out: Option<String>,
    /// Exported as `..._SECURITY_ACTIONS_IN`.
    pub security_actions_in: Option<String>,
    /// Exported as `..._SIGNATURE_ALGORITHM`.
    pub signature_algorithm: Option<String>,
    /// Exported as `..._SIGNATURE_DIGEST_ALGORITHM`.
    pub signature_digest_algorithm: Option<String>,
    /// Exported as `..._SIGNATURE_C14N_ALGORITHM`.
    pub signature_c14n_algorithm: Option<String>,
    /// Exported as `..._SIGNATURE_PARTS`.
    pub signature_parts: Option<String>,
}
82
83impl CxfProfileEnvVars {
84 pub fn to_env_vars(&self) -> Vec<(String, String)> {
85 let prefix = format!("CXF_PROFILE_{}_", self.name.to_uppercase());
86 let mut vars = vec![
87 (format!("{}WSDL_PATH", prefix), self.wsdl_path.clone()),
88 (format!("{}SERVICE_NAME", prefix), self.service_name.clone()),
89 (format!("{}PORT_NAME", prefix), self.port_name.clone()),
90 ];
91
92 if let Some(ref v) = self.address {
93 vars.push((format!("{}ADDRESS", prefix), v.clone()));
94 }
95 if let Some(ref v) = self.keystore_path {
96 vars.push((format!("{}KEYSTORE_PATH", prefix), v.clone()));
97 }
98 if let Some(ref v) = self.keystore_password {
99 vars.push((format!("{}KEYSTORE_PASSWORD", prefix), v.clone()));
100 }
101 if let Some(ref v) = self.truststore_path {
102 vars.push((format!("{}TRUSTSTORE_PATH", prefix), v.clone()));
103 }
104 if let Some(ref v) = self.truststore_password {
105 vars.push((format!("{}TRUSTSTORE_PASSWORD", prefix), v.clone()));
106 }
107 if let Some(ref v) = self.sig_username {
108 vars.push((format!("{}SIG_USERNAME", prefix), v.clone()));
109 }
110 if let Some(ref v) = self.sig_password {
111 vars.push((format!("{}SIG_PASSWORD", prefix), v.clone()));
112 }
113 if let Some(ref v) = self.enc_username {
114 vars.push((format!("{}ENC_USERNAME", prefix), v.clone()));
115 }
116 if let Some(ref v) = self.security_actions_out {
117 vars.push((format!("{}SECURITY_ACTIONS_OUT", prefix), v.clone()));
118 }
119 if let Some(ref v) = self.security_actions_in {
120 vars.push((format!("{}SECURITY_ACTIONS_IN", prefix), v.clone()));
121 }
122 if let Some(ref v) = self.signature_algorithm {
123 vars.push((format!("{}SIGNATURE_ALGORITHM", prefix), v.clone()));
124 }
125 if let Some(ref v) = self.signature_digest_algorithm {
126 vars.push((format!("{}SIGNATURE_DIGEST_ALGORITHM", prefix), v.clone()));
127 }
128 if let Some(ref v) = self.signature_c14n_algorithm {
129 vars.push((format!("{}SIGNATURE_C14N_ALGORITHM", prefix), v.clone()));
130 }
131 if let Some(ref v) = self.signature_parts {
132 vars.push((format!("{}SIGNATURE_PARTS", prefix), v.clone()));
133 }
134
135 vars
136 }
137}
138
139#[allow(missing_debug_implementations)]
141pub struct BridgeProcessConfig {
142 pub spec: &'static BridgeSpec,
143 pub binary_path: PathBuf,
144 pub broker_url: String,
145 pub broker_type: BrokerType,
146 pub username: Option<String>,
147 pub password: Option<String>,
148 pub start_timeout_ms: u64,
149 pub env_vars: Vec<(String, String)>,
150}
151
152impl BridgeProcessConfig {
153 pub fn jms(
155 binary_path: PathBuf,
156 broker_url: String,
157 broker_type: BrokerType,
158 username: Option<String>,
159 password: Option<String>,
160 start_timeout_ms: u64,
161 ) -> Self {
162 let mut env_vars = vec![
163 ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
164 (
165 "BRIDGE_BROKER_TYPE".to_string(),
166 broker_type.as_env_str().to_string(),
167 ),
168 ];
169 if let Some(u) = &username {
170 env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
171 }
172 if let Some(p) = &password {
173 env_vars.push(("BRIDGE_PASSWORD".to_string(), p.clone()));
174 }
175 Self {
176 spec: &JMS_BRIDGE,
177 binary_path,
178 broker_url,
179 broker_type,
180 username,
181 password,
182 start_timeout_ms,
183 env_vars,
184 }
185 }
186
187 pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
189 Self {
190 spec: &XML_BRIDGE,
191 binary_path,
192 broker_url: String::new(),
193 broker_type: BrokerType::Generic,
194 username: None,
195 password: None,
196 start_timeout_ms,
197 env_vars: vec![],
198 }
199 }
200
201 pub fn cxf_profiles(
204 binary_path: PathBuf,
205 profiles: &[CxfProfileEnvVars],
206 start_timeout_ms: u64,
207 ) -> Self {
208 let profile_names: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
209 let mut env_vars = vec![("CXF_PROFILES".to_string(), profile_names.join(","))];
210
211 for profile in profiles {
212 env_vars.extend(profile.to_env_vars());
213 }
214
215 Self {
216 spec: &CXF_BRIDGE,
217 binary_path,
218 broker_url: String::new(),
219 broker_type: BrokerType::Generic,
220 username: None,
221 password: None,
222 start_timeout_ms,
223 env_vars,
224 }
225 }
226
227 pub fn validate(&self) -> Result<(), String> {
228 if self.start_timeout_ms == 0 {
229 return Err("start_timeout_ms must be > 0".to_string());
230 }
231 Ok(())
232 }
233}
234
235pub struct BridgeProcess {
236 child: tokio::process::Child,
237 grpc_port: u16,
238}
239
240impl BridgeProcess {
241 pub fn grpc_port(&self) -> u16 {
242 self.grpc_port
243 }
244
245 pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
251 use tokio::io::AsyncBufReadExt;
252 use tokio::process::Command;
253 use tokio::time::{Duration, timeout};
254
255 config.validate().map_err(BridgeError::Config)?;
256
257 let free_port = {
259 let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
260 listener.local_addr()?.port()
261 };
262
263 let stderr_stdio: std::process::Stdio =
265 if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
266 let log_filename = config
267 .spec
268 .log_file_template
269 .replace("{pid}", &std::process::id().to_string());
270 let log_path = if log_dir.is_empty() {
271 format!("/tmp/{log_filename}")
272 } else {
273 format!("{log_dir}/{log_filename}")
274 };
275 match std::fs::File::create(&log_path) {
276 Ok(f) => {
277 eprintln!("[camel-bridge] stderr → {}", log_path);
278 f.into()
279 }
280 Err(e) => {
281 eprintln!(
282 "[camel-bridge] failed to create log file {}: {}",
283 log_path, e
284 );
285 std::process::Stdio::inherit()
286 }
287 }
288 } else {
289 std::process::Stdio::inherit()
290 };
291
292 let mut command = Command::new(&config.binary_path);
293 command
294 .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
295 .env("QUARKUS_HTTP_PORT", "0")
298 .stdout(std::process::Stdio::piped())
299 .stderr(stderr_stdio);
300
301 for (key, value) in &config.env_vars {
303 command.env(key, value);
304 }
305
306 let mut child = command.spawn()?;
307
308 let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
309 let mut reader = tokio::io::BufReader::new(stdout).lines();
310
311 let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
312 while let Some(line) = reader.next_line().await? {
313 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
314 && v.get("status").and_then(|s| s.as_str()) == Some("ready")
315 {
316 if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
317 return Ok(p as u16);
318 }
319 return Err(BridgeError::BadReadyMessage(line));
320 }
321 }
322 Err(BridgeError::StdoutClosed)
323 })
324 .await
325 .map_err(|_| {
326 BridgeError::Timeout(format!(
327 "{} failed to start: health check timeout after {}ms",
328 config.spec.name, config.start_timeout_ms
329 ))
330 })??;
331
332 tokio::spawn(async move {
336 while let Ok(Some(line)) = reader.next_line().await {
337 tracing::debug!(target: "camel_bridge::child", "{}", line);
338 }
339 });
340
341 Ok(BridgeProcess {
342 child,
343 grpc_port: port,
344 })
345 }
346
347 pub async fn stop(mut self) -> Result<(), BridgeError> {
349 use tokio::time::{Duration, sleep};
350
351 #[cfg(unix)]
353 {
354 let pid = self.child.id().unwrap_or(0);
355 if pid > 0 {
356 unsafe {
358 libc::kill(pid as i32, libc::SIGTERM);
359 }
360 }
361 }
362
363 #[cfg(not(unix))]
365 let _ = self.child.start_kill();
366
367 tokio::select! {
369 result = self.child.wait() => {
370 result?;
371 }
372 _ = sleep(Duration::from_secs(5)) => {
373 let _ = self.child.start_kill();
374 self.child.wait().await?;
375 }
376 }
377 Ok(())
378 }
379}
380
381impl Drop for BridgeProcess {
382 fn drop(&mut self) {
383 let _ = self.child.start_kill();
385 }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an owned optional string.
    fn some(text: &str) -> Option<String> {
        Some(text.to_string())
    }

    /// Profile with only the mandatory fields populated.
    fn bare_profile(
        name: &str,
        wsdl_path: &str,
        service_name: &str,
        port_name: &str,
    ) -> CxfProfileEnvVars {
        CxfProfileEnvVars {
            name: name.to_string(),
            wsdl_path: wsdl_path.to_string(),
            service_name: service_name.to_string(),
            port_name: port_name.to_string(),
            address: None,
            keystore_path: None,
            keystore_password: None,
            truststore_path: None,
            truststore_password: None,
            sig_username: None,
            sig_password: None,
            enc_username: None,
            security_actions_out: None,
            security_actions_in: None,
            signature_algorithm: None,
            signature_digest_algorithm: None,
            signature_c14n_algorithm: None,
            signature_parts: None,
        }
    }

    /// True when `vars` contains exactly this key/value pair.
    fn has_pair(vars: &[(String, String)], key: &str, value: &str) -> bool {
        vars.iter().any(|(k, v)| k == key && v == value)
    }

    /// True when `vars` contains the key, whatever its value.
    fn has_key(vars: &[(String, String)], key: &str) -> bool {
        vars.iter().any(|(k, _)| k == key)
    }

    /// Parses a broker type; the conversion is infallible.
    fn parsed(text: &str) -> BrokerType {
        text.parse::<BrokerType>().unwrap()
    }

    #[test]
    fn broker_type_from_str_activemq() {
        for spelling in ["activemq", "ACTIVEMQ"].iter() {
            assert_eq!(parsed(spelling), BrokerType::ActiveMq);
        }
    }

    #[test]
    fn broker_type_from_str_artemis() {
        assert_eq!(parsed("artemis"), BrokerType::Artemis);
    }

    #[test]
    fn broker_type_from_str_unknown_is_generic() {
        assert_eq!(parsed("ibmmq"), BrokerType::Generic);
    }

    #[test]
    fn broker_type_env_str() {
        let expectations = [
            (BrokerType::ActiveMq, "activemq"),
            (BrokerType::Artemis, "artemis"),
            (BrokerType::Generic, "generic"),
        ];
        for (broker, expected) in expectations.iter() {
            assert_eq!(broker.as_env_str(), *expected);
        }
    }

    #[test]
    fn jms_constructor_uses_jms_spec() {
        let cfg = BridgeProcessConfig::jms(
            PathBuf::from("/tmp/jms-bridge"),
            "tcp://localhost:61616".to_string(),
            BrokerType::ActiveMq,
            some("user"),
            some("pass"),
            1000,
        );
        assert_eq!(cfg.spec.name, "jms-bridge");
    }

    #[test]
    fn xml_constructor_uses_xml_spec() {
        let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
        assert_eq!(cfg.spec.name, "xml-bridge");
    }

    #[test]
    fn cxf_profiles_generates_cxf_profiles_env_var() {
        let profiles = vec![
            bare_profile("baleares", "/a.wsdl", "Svc", "Port"),
            CxfProfileEnvVars {
                address: some("http://host:9090/ws"),
                keystore_path: some("/b.jks"),
                keystore_password: some("pass"),
                sig_username: some("cert"),
                sig_password: some("sig_pass"),
                security_actions_out: some("Timestamp Signature"),
                security_actions_in: some("Timestamp Signature"),
                ..bare_profile("extremadura", "/b.wsdl", "Svc2", "Port2")
            },
        ];

        let cfg =
            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);

        // The CXF bridge carries no broker settings.
        assert_eq!(cfg.spec.name, "cxf-bridge");
        assert!(cfg.broker_url.is_empty());
        assert_eq!(cfg.broker_type, BrokerType::Generic);
        assert!(cfg.username.is_none());
        assert!(cfg.password.is_none());

        let (_, profile_list) = cfg
            .env_vars
            .iter()
            .find(|(k, _)| k == "CXF_PROFILES")
            .expect("CXF_PROFILES env var must exist");
        assert_eq!(profile_list, "baleares,extremadura");

        // First profile: mandatory variables only.
        let baleares = [
            ("CXF_PROFILE_BALEARES_WSDL_PATH", "/a.wsdl"),
            ("CXF_PROFILE_BALEARES_SERVICE_NAME", "Svc"),
            ("CXF_PROFILE_BALEARES_PORT_NAME", "Port"),
        ];
        for (key, value) in baleares.iter() {
            assert!(has_pair(&cfg.env_vars, key, value), "missing {key}={value}");
        }
        assert!(!has_key(&cfg.env_vars, "CXF_PROFILE_BALEARES_ADDRESS"));

        // Second profile: optional variables are exported as well.
        let extremadura = [
            ("CXF_PROFILE_EXTREMADURA_WSDL_PATH", "/b.wsdl"),
            ("CXF_PROFILE_EXTREMADURA_ADDRESS", "http://host:9090/ws"),
            ("CXF_PROFILE_EXTREMADURA_KEYSTORE_PATH", "/b.jks"),
            ("CXF_PROFILE_EXTREMADURA_KEYSTORE_PASSWORD", "pass"),
            ("CXF_PROFILE_EXTREMADURA_SIG_USERNAME", "cert"),
            ("CXF_PROFILE_EXTREMADURA_SIG_PASSWORD", "sig_pass"),
            (
                "CXF_PROFILE_EXTREMADURA_SECURITY_ACTIONS_OUT",
                "Timestamp Signature",
            ),
        ];
        for (key, value) in extremadura.iter() {
            assert!(has_pair(&cfg.env_vars, key, value), "missing {key}={value}");
        }
    }

    #[test]
    fn cxf_profiles_single_profile_no_security() {
        let profiles = vec![bare_profile(
            "test",
            "service.wsdl",
            "{http://example.com}Service",
            "{http://example.com}Port",
        )];

        let cfg =
            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);

        assert_eq!(cfg.spec.name, "cxf-bridge");

        // Exactly four variables, in this order.
        let actual: Vec<(&str, &str)> = cfg
            .env_vars
            .iter()
            .map(|(k, v)| (k.as_str(), v.as_str()))
            .collect();
        assert_eq!(
            actual,
            vec![
                ("CXF_PROFILES", "test"),
                ("CXF_PROFILE_TEST_WSDL_PATH", "service.wsdl"),
                ("CXF_PROFILE_TEST_SERVICE_NAME", "{http://example.com}Service"),
                ("CXF_PROFILE_TEST_PORT_NAME", "{http://example.com}Port"),
            ]
        );
    }

    #[test]
    fn profile_env_vars_to_env_vars_includes_all_fields() {
        let vars = CxfProfileEnvVars {
            address: some("http://host:8080"),
            keystore_path: some("/ks.jks"),
            keystore_password: some("ks_pass"),
            truststore_path: some("/ts.jks"),
            truststore_password: some("ts_pass"),
            sig_username: some("user"),
            sig_password: some("sig_pass"),
            security_actions_out: some("Timestamp Signature"),
            security_actions_in: some("Timestamp"),
            ..bare_profile("full", "/wsdl", "Svc", "Port")
        };

        let env = vars.to_env_vars();
        assert_eq!(env.len(), 12);

        let expected_keys = [
            "CXF_PROFILE_FULL_WSDL_PATH",
            "CXF_PROFILE_FULL_SERVICE_NAME",
            "CXF_PROFILE_FULL_PORT_NAME",
            "CXF_PROFILE_FULL_ADDRESS",
            "CXF_PROFILE_FULL_KEYSTORE_PATH",
            "CXF_PROFILE_FULL_KEYSTORE_PASSWORD",
            "CXF_PROFILE_FULL_TRUSTSTORE_PATH",
            "CXF_PROFILE_FULL_TRUSTSTORE_PASSWORD",
            "CXF_PROFILE_FULL_SIG_USERNAME",
            "CXF_PROFILE_FULL_SIG_PASSWORD",
            "CXF_PROFILE_FULL_SECURITY_ACTIONS_OUT",
            "CXF_PROFILE_FULL_SECURITY_ACTIONS_IN",
        ];
        for key in expected_keys.iter() {
            assert!(has_key(&env, key), "missing {key}");
        }
    }

    #[test]
    fn test_start_timeout_zero_rejected() {
        let config = BridgeProcessConfig::jms(
            PathBuf::from("/usr/bin/echo"),
            "tcp://localhost:61616".to_string(),
            BrokerType::ActiveMq,
            None,
            None,
            0,
        );
        assert!(config.validate().is_err());
    }
}