Skip to main content

camel_bridge/
process.rs

1use std::path::PathBuf;
2use thiserror::Error;
3
4use crate::spec::{BridgeSpec, CXF_BRIDGE, JMS_BRIDGE, XML_BRIDGE};
5
6#[derive(Debug, Error)]
7pub enum BridgeError {
8    #[error("IO error: {0}")]
9    Io(#[from] std::io::Error),
10    #[error("Bridge timed out: {0}")]
11    Timeout(String),
12    #[error("Bridge stdout closed before ready message")]
13    StdoutClosed,
14    #[error("Bridge ready message malformed: {0}")]
15    BadReadyMessage(String),
16    #[error("Download failed: {0}")]
17    Download(String),
18    #[error("Checksum mismatch: expected {expected}, got {actual}")]
19    ChecksumMismatch { expected: String, actual: String },
20    #[error("URL not allowed: {0}")]
21    UrlNotAllowed(String),
22    #[error("Transport error: {0}")]
23    Transport(String),
24}
25
26#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
27#[serde(rename_all = "lowercase")]
28pub enum BrokerType {
29    #[serde(alias = "active_mq")]
30    ActiveMq,
31    Artemis,
32    Generic,
33}
34
35impl BrokerType {
36    pub fn as_env_str(&self) -> &'static str {
37        match self {
38            BrokerType::ActiveMq => "activemq",
39            BrokerType::Artemis => "artemis",
40            BrokerType::Generic => "generic",
41        }
42    }
43}
44
45impl std::str::FromStr for BrokerType {
46    type Err = std::convert::Infallible;
47
48    fn from_str(s: &str) -> Result<Self, Self::Err> {
49        Ok(match s.to_lowercase().as_str() {
50            "activemq" => BrokerType::ActiveMq,
51            "artemis" => BrokerType::Artemis,
52            _ => BrokerType::Generic,
53        })
54    }
55}
56
57/// Environment variables for a single CXF profile, used by the bridge Java side.
58pub struct CxfProfileEnvVars {
59    pub name: String,
60    pub wsdl_path: String,
61    pub service_name: String,
62    pub port_name: String,
63    pub address: Option<String>,
64    pub keystore_path: Option<String>,
65    pub keystore_password: Option<String>,
66    pub truststore_path: Option<String>,
67    pub truststore_password: Option<String>,
68    pub sig_username: Option<String>,
69    pub sig_password: Option<String>,
70    pub enc_username: Option<String>,
71    pub security_actions_out: Option<String>,
72    pub security_actions_in: Option<String>,
73    pub signature_algorithm: Option<String>,
74    pub signature_digest_algorithm: Option<String>,
75    pub signature_c14n_algorithm: Option<String>,
76    pub signature_parts: Option<String>,
77}
78
79impl CxfProfileEnvVars {
80    pub fn to_env_vars(&self) -> Vec<(String, String)> {
81        let prefix = format!("CXF_PROFILE_{}_", self.name.to_uppercase());
82        let mut vars = vec![
83            (format!("{}WSDL_PATH", prefix), self.wsdl_path.clone()),
84            (format!("{}SERVICE_NAME", prefix), self.service_name.clone()),
85            (format!("{}PORT_NAME", prefix), self.port_name.clone()),
86        ];
87
88        if let Some(ref v) = self.address {
89            vars.push((format!("{}ADDRESS", prefix), v.clone()));
90        }
91        if let Some(ref v) = self.keystore_path {
92            vars.push((format!("{}KEYSTORE_PATH", prefix), v.clone()));
93        }
94        if let Some(ref v) = self.keystore_password {
95            vars.push((format!("{}KEYSTORE_PASSWORD", prefix), v.clone()));
96        }
97        if let Some(ref v) = self.truststore_path {
98            vars.push((format!("{}TRUSTSTORE_PATH", prefix), v.clone()));
99        }
100        if let Some(ref v) = self.truststore_password {
101            vars.push((format!("{}TRUSTSTORE_PASSWORD", prefix), v.clone()));
102        }
103        if let Some(ref v) = self.sig_username {
104            vars.push((format!("{}SIG_USERNAME", prefix), v.clone()));
105        }
106        if let Some(ref v) = self.sig_password {
107            vars.push((format!("{}SIG_PASSWORD", prefix), v.clone()));
108        }
109        if let Some(ref v) = self.enc_username {
110            vars.push((format!("{}ENC_USERNAME", prefix), v.clone()));
111        }
112        if let Some(ref v) = self.security_actions_out {
113            vars.push((format!("{}SECURITY_ACTIONS_OUT", prefix), v.clone()));
114        }
115        if let Some(ref v) = self.security_actions_in {
116            vars.push((format!("{}SECURITY_ACTIONS_IN", prefix), v.clone()));
117        }
118        if let Some(ref v) = self.signature_algorithm {
119            vars.push((format!("{}SIGNATURE_ALGORITHM", prefix), v.clone()));
120        }
121        if let Some(ref v) = self.signature_digest_algorithm {
122            vars.push((format!("{}SIGNATURE_DIGEST_ALGORITHM", prefix), v.clone()));
123        }
124        if let Some(ref v) = self.signature_c14n_algorithm {
125            vars.push((format!("{}SIGNATURE_C14N_ALGORITHM", prefix), v.clone()));
126        }
127        if let Some(ref v) = self.signature_parts {
128            vars.push((format!("{}SIGNATURE_PARTS", prefix), v.clone()));
129        }
130
131        vars
132    }
133}
134
135pub struct BridgeProcessConfig {
136    pub spec: &'static BridgeSpec,
137    pub binary_path: PathBuf,
138    pub broker_url: String,
139    pub broker_type: BrokerType,
140    pub username: Option<String>,
141    pub password: Option<String>,
142    pub start_timeout_ms: u64,
143    pub env_vars: Vec<(String, String)>,
144}
145
146impl BridgeProcessConfig {
147    /// Constructor for the JMS bridge.
148    pub fn jms(
149        binary_path: PathBuf,
150        broker_url: String,
151        broker_type: BrokerType,
152        username: Option<String>,
153        password: Option<String>,
154        start_timeout_ms: u64,
155    ) -> Self {
156        let mut env_vars = vec![
157            ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
158            (
159                "BRIDGE_BROKER_TYPE".to_string(),
160                broker_type.as_env_str().to_string(),
161            ),
162        ];
163        if let Some(u) = &username {
164            env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
165        }
166        if let Some(p) = &password {
167            env_vars.push(("BRIDGE_PASSWORD".to_string(), p.clone()));
168        }
169        Self {
170            spec: &JMS_BRIDGE,
171            binary_path,
172            broker_url,
173            broker_type,
174            username,
175            password,
176            start_timeout_ms,
177            env_vars,
178        }
179    }
180
181    /// Constructor for the XML bridge.
182    pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
183        Self {
184            spec: &XML_BRIDGE,
185            binary_path,
186            broker_url: String::new(),
187            broker_type: BrokerType::Generic,
188            username: None,
189            password: None,
190            start_timeout_ms,
191            env_vars: vec![],
192        }
193    }
194
195    /// Constructor for the CXF bridge with multi-profile support.
196    /// Generates `CXF_PROFILES=list` env var plus per-profile env vars.
197    pub fn cxf_profiles(
198        binary_path: PathBuf,
199        profiles: &[CxfProfileEnvVars],
200        start_timeout_ms: u64,
201    ) -> Self {
202        let profile_names: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
203        let mut env_vars = vec![("CXF_PROFILES".to_string(), profile_names.join(","))];
204
205        for profile in profiles {
206            env_vars.extend(profile.to_env_vars());
207        }
208
209        Self {
210            spec: &CXF_BRIDGE,
211            binary_path,
212            broker_url: String::new(),
213            broker_type: BrokerType::Generic,
214            username: None,
215            password: None,
216            start_timeout_ms,
217            env_vars,
218        }
219    }
220}
221
222pub struct BridgeProcess {
223    child: tokio::process::Child,
224    grpc_port: u16,
225}
226
227impl BridgeProcess {
228    pub fn grpc_port(&self) -> u16 {
229        self.grpc_port
230    }
231
232    /// Spawn the bridge process. Reads the gRPC port from stdout JSON line:
233    ///   {"status":"ready","port":PORT}
234    ///
235    /// Picks a free OS port and passes it to the bridge via `QUARKUS_GRPC_SERVER_PORT`
236    /// so Quarkus binds exactly to that port and PortAnnouncer can echo it back.
237    pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
238        use tokio::io::AsyncBufReadExt;
239        use tokio::process::Command;
240        use tokio::time::{Duration, timeout};
241
242        // Bind :0 to let the OS pick a free port, then release so the bridge can use it.
243        let free_port = {
244            let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
245            listener.local_addr()?.port()
246        };
247
248        // If CAMEL_BRIDGE_LOG_STDERR is set, redirect stderr to a file for debugging.
249        let stderr_stdio: std::process::Stdio =
250            if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
251                let log_filename = config
252                    .spec
253                    .log_file_template
254                    .replace("{pid}", &std::process::id().to_string());
255                let log_path = if log_dir.is_empty() {
256                    format!("/tmp/{log_filename}")
257                } else {
258                    format!("{log_dir}/{log_filename}")
259                };
260                match std::fs::File::create(&log_path) {
261                    Ok(f) => {
262                        eprintln!("[camel-bridge] stderr → {}", log_path);
263                        f.into()
264                    }
265                    Err(e) => {
266                        eprintln!(
267                            "[camel-bridge] failed to create log file {}: {}",
268                            log_path, e
269                        );
270                        std::process::Stdio::inherit()
271                    }
272                }
273            } else {
274                std::process::Stdio::inherit()
275            };
276
277        let mut command = Command::new(&config.binary_path);
278        command
279            .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
280            // Let the OS pick a random HTTP port — we only use gRPC.
281            // Without this, Quarkus binds HTTP on 8080 and fails if occupied.
282            .env("QUARKUS_HTTP_PORT", "0")
283            .stdout(std::process::Stdio::piped())
284            .stderr(stderr_stdio);
285
286        // Inject bridge-specific env vars (e.g. JMS broker URL/credentials via ::jms()).
287        for (key, value) in &config.env_vars {
288            command.env(key, value);
289        }
290
291        let mut child = command.spawn()?;
292
293        let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
294        let mut reader = tokio::io::BufReader::new(stdout).lines();
295
296        let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
297            while let Some(line) = reader.next_line().await? {
298                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
299                    && v.get("status").and_then(|s| s.as_str()) == Some("ready")
300                {
301                    if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
302                        return Ok(p as u16);
303                    }
304                    return Err(BridgeError::BadReadyMessage(line));
305                }
306            }
307            Err(BridgeError::StdoutClosed)
308        })
309        .await
310        .map_err(|_| {
311            BridgeError::Timeout(format!(
312                "{} failed to start: health check timeout after {}ms",
313                config.spec.name, config.start_timeout_ms
314            ))
315        })??;
316
317        // Keep draining the bridge's stdout in the background so the pipe
318        // buffer never fills up.  If the pipe blocks, the bridge process
319        // blocks on its next stdout write and silently stops responding.
320        tokio::spawn(async move {
321            while let Ok(Some(line)) = reader.next_line().await {
322                tracing::debug!(target: "camel_bridge::child", "{}", line);
323            }
324        });
325
326        Ok(BridgeProcess {
327            child,
328            grpc_port: port,
329        })
330    }
331
332    /// Gracefully stop: SIGTERM + wait for exit.
333    pub async fn stop(mut self) -> Result<(), BridgeError> {
334        use tokio::time::{Duration, sleep};
335
336        // Send SIGTERM first (graceful shutdown)
337        #[cfg(unix)]
338        {
339            let pid = self.child.id().unwrap_or(0);
340            if pid > 0 {
341                // SAFETY: libc::kill is called with the child process PID obtained from tokio.
342                unsafe {
343                    libc::kill(pid as i32, libc::SIGTERM);
344                }
345            }
346        }
347
348        // On non-Unix (Windows), fall through to kill immediately
349        #[cfg(not(unix))]
350        let _ = self.child.start_kill();
351
352        // Wait up to 5 seconds for graceful exit, then SIGKILL
353        tokio::select! {
354            result = self.child.wait() => {
355                result?;
356            }
357            _ = sleep(Duration::from_secs(5)) => {
358                let _ = self.child.start_kill();
359                self.child.wait().await?;
360            }
361        }
362        Ok(())
363    }
364}
365
366impl Drop for BridgeProcess {
367    fn drop(&mut self) {
368        // Best-effort only. Does NOT wait — cannot block in Drop.
369        let _ = self.child.start_kill();
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use super::*;
376
377    #[test]
378    fn broker_type_from_str_activemq() {
379        assert_eq!(
380            "activemq".parse::<BrokerType>().unwrap(),
381            BrokerType::ActiveMq
382        );
383        assert_eq!(
384            "ACTIVEMQ".parse::<BrokerType>().unwrap(),
385            BrokerType::ActiveMq
386        );
387    }
388
389    #[test]
390    fn broker_type_from_str_artemis() {
391        assert_eq!(
392            "artemis".parse::<BrokerType>().unwrap(),
393            BrokerType::Artemis
394        );
395    }
396
397    #[test]
398    fn broker_type_from_str_unknown_is_generic() {
399        assert_eq!("ibmmq".parse::<BrokerType>().unwrap(), BrokerType::Generic);
400    }
401
402    #[test]
403    fn broker_type_env_str() {
404        assert_eq!(BrokerType::ActiveMq.as_env_str(), "activemq");
405        assert_eq!(BrokerType::Artemis.as_env_str(), "artemis");
406        assert_eq!(BrokerType::Generic.as_env_str(), "generic");
407    }
408
409    #[test]
410    fn jms_constructor_uses_jms_spec() {
411        let cfg = BridgeProcessConfig::jms(
412            PathBuf::from("/tmp/jms-bridge"),
413            "tcp://localhost:61616".to_string(),
414            BrokerType::ActiveMq,
415            Some("user".to_string()),
416            Some("pass".to_string()),
417            1000,
418        );
419        assert_eq!(cfg.spec.name, "jms-bridge");
420    }
421
422    #[test]
423    fn xml_constructor_uses_xml_spec() {
424        let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
425        assert_eq!(cfg.spec.name, "xml-bridge");
426    }
427
428    #[test]
429    fn cxf_profiles_generates_cxf_profiles_env_var() {
430        let profiles = vec![
431            CxfProfileEnvVars {
432                name: "baleares".to_string(),
433                wsdl_path: "/a.wsdl".to_string(),
434                service_name: "Svc".to_string(),
435                port_name: "Port".to_string(),
436                address: None,
437                keystore_path: None,
438                keystore_password: None,
439                truststore_path: None,
440                truststore_password: None,
441                sig_username: None,
442                sig_password: None,
443                enc_username: None,
444                security_actions_out: None,
445                security_actions_in: None,
446                signature_algorithm: None,
447                signature_digest_algorithm: None,
448                signature_c14n_algorithm: None,
449                signature_parts: None,
450            },
451            CxfProfileEnvVars {
452                name: "extremadura".to_string(),
453                wsdl_path: "/b.wsdl".to_string(),
454                service_name: "Svc2".to_string(),
455                port_name: "Port2".to_string(),
456                address: Some("http://host:9090/ws".to_string()),
457                keystore_path: Some("/b.jks".to_string()),
458                keystore_password: Some("pass".to_string()),
459                truststore_path: None,
460                truststore_password: None,
461                sig_username: Some("cert".to_string()),
462                sig_password: Some("sig_pass".to_string()),
463                enc_username: None,
464                security_actions_out: Some("Timestamp Signature".to_string()),
465                security_actions_in: Some("Timestamp Signature".to_string()),
466                signature_algorithm: None,
467                signature_digest_algorithm: None,
468                signature_c14n_algorithm: None,
469                signature_parts: None,
470            },
471        ];
472
473        let cfg =
474            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
475
476        assert_eq!(cfg.spec.name, "cxf-bridge");
477        assert!(cfg.broker_url.is_empty());
478        assert_eq!(cfg.broker_type, BrokerType::Generic);
479        assert!(cfg.username.is_none());
480        assert!(cfg.password.is_none());
481
482        // Find CXF_PROFILES env var
483        let profiles_var = cfg
484            .env_vars
485            .iter()
486            .find(|(k, _)| k == "CXF_PROFILES")
487            .expect("CXF_PROFILES env var must exist");
488        assert_eq!(profiles_var.1, "baleares,extremadura");
489
490        // Check baleares profile vars (no security)
491        assert!(
492            cfg.env_vars
493                .iter()
494                .any(|(k, v)| k == "CXF_PROFILE_BALEARES_WSDL_PATH" && v == "/a.wsdl")
495        );
496        assert!(
497            cfg.env_vars
498                .iter()
499                .any(|(k, v)| k == "CXF_PROFILE_BALEARES_SERVICE_NAME" && v == "Svc")
500        );
501        assert!(
502            cfg.env_vars
503                .iter()
504                .any(|(k, v)| k == "CXF_PROFILE_BALEARES_PORT_NAME" && v == "Port")
505        );
506        assert!(
507            !cfg.env_vars
508                .iter()
509                .any(|(k, _)| k == "CXF_PROFILE_BALEARES_ADDRESS")
510        );
511
512        // Check extremadura profile vars (with security)
513        assert!(
514            cfg.env_vars
515                .iter()
516                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_WSDL_PATH" && v == "/b.wsdl")
517        );
518        assert!(
519            cfg.env_vars
520                .iter()
521                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_ADDRESS" && v == "http://host:9090/ws")
522        );
523        assert!(
524            cfg.env_vars
525                .iter()
526                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PATH" && v == "/b.jks")
527        );
528        assert!(
529            cfg.env_vars
530                .iter()
531                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PASSWORD" && v == "pass")
532        );
533        assert!(
534            cfg.env_vars
535                .iter()
536                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_USERNAME" && v == "cert")
537        );
538        assert!(
539            cfg.env_vars
540                .iter()
541                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_PASSWORD" && v == "sig_pass")
542        );
543        assert!(
544            cfg.env_vars
545                .iter()
546                .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SECURITY_ACTIONS_OUT"
547                    && v == "Timestamp Signature")
548        );
549    }
550
551    #[test]
552    fn cxf_profiles_single_profile_no_security() {
553        let profiles = vec![CxfProfileEnvVars {
554            name: "test".to_string(),
555            wsdl_path: "service.wsdl".to_string(),
556            service_name: "{http://example.com}Service".to_string(),
557            port_name: "{http://example.com}Port".to_string(),
558            address: None,
559            keystore_path: None,
560            keystore_password: None,
561            truststore_path: None,
562            truststore_password: None,
563            sig_username: None,
564            sig_password: None,
565            enc_username: None,
566            security_actions_out: None,
567            security_actions_in: None,
568            signature_algorithm: None,
569            signature_digest_algorithm: None,
570            signature_c14n_algorithm: None,
571            signature_parts: None,
572        }];
573
574        let cfg =
575            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
576
577        assert_eq!(cfg.spec.name, "cxf-bridge");
578        // CXF_PROFILES + 3 required vars (WSDL_PATH, SERVICE_NAME, PORT_NAME)
579        assert_eq!(cfg.env_vars.len(), 4);
580        assert_eq!(cfg.env_vars[0].0, "CXF_PROFILES");
581        assert_eq!(cfg.env_vars[0].1, "test");
582        assert_eq!(cfg.env_vars[1].0, "CXF_PROFILE_TEST_WSDL_PATH");
583        assert_eq!(cfg.env_vars[1].1, "service.wsdl");
584        assert_eq!(cfg.env_vars[2].0, "CXF_PROFILE_TEST_SERVICE_NAME");
585        assert_eq!(cfg.env_vars[2].1, "{http://example.com}Service");
586        assert_eq!(cfg.env_vars[3].0, "CXF_PROFILE_TEST_PORT_NAME");
587        assert_eq!(cfg.env_vars[3].1, "{http://example.com}Port");
588    }
589
590    #[test]
591    fn profile_env_vars_to_env_vars_includes_all_fields() {
592        let vars = CxfProfileEnvVars {
593            name: "full".to_string(),
594            wsdl_path: "/wsdl".to_string(),
595            service_name: "Svc".to_string(),
596            port_name: "Port".to_string(),
597            address: Some("http://host:8080".to_string()),
598            keystore_path: Some("/ks.jks".to_string()),
599            keystore_password: Some("ks_pass".to_string()),
600            truststore_path: Some("/ts.jks".to_string()),
601            truststore_password: Some("ts_pass".to_string()),
602            sig_username: Some("user".to_string()),
603            sig_password: Some("sig_pass".to_string()),
604            enc_username: None,
605            security_actions_out: Some("Timestamp Signature".to_string()),
606            security_actions_in: Some("Timestamp".to_string()),
607            signature_algorithm: None,
608            signature_digest_algorithm: None,
609            signature_c14n_algorithm: None,
610            signature_parts: None,
611        };
612
613        let env = vars.to_env_vars();
614        // 3 required + 1 address + 8 security = 12
615        assert_eq!(env.len(), 12);
616
617        let keys: Vec<&str> = env.iter().map(|(k, _)| k.as_str()).collect();
618        assert!(keys.contains(&"CXF_PROFILE_FULL_WSDL_PATH"));
619        assert!(keys.contains(&"CXF_PROFILE_FULL_SERVICE_NAME"));
620        assert!(keys.contains(&"CXF_PROFILE_FULL_PORT_NAME"));
621        assert!(keys.contains(&"CXF_PROFILE_FULL_ADDRESS"));
622        assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PATH"));
623        assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PASSWORD"));
624        assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PATH"));
625        assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PASSWORD"));
626        assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_USERNAME"));
627        assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_PASSWORD"));
628        assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_OUT"));
629        assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_IN"));
630    }
631}