Skip to main content

camel_bridge/
process.rs

1use std::path::PathBuf;
2use thiserror::Error;
3
4use crate::spec::{BridgeSpec, CXF_BRIDGE, JMS_BRIDGE, XML_BRIDGE};
5
6#[derive(Debug, Error)]
7pub enum BridgeError {
8    #[error("IO error: {0}")]
9    Io(#[from] std::io::Error),
10    #[error("Bridge timed out: {0}")]
11    Timeout(String),
12    #[error("Bridge stdout closed before ready message")]
13    StdoutClosed,
14    #[error("Bridge ready message malformed: {0}")]
15    BadReadyMessage(String),
16    #[error("Download failed: {0}")]
17    Download(String),
18    #[error("Checksum mismatch: expected {expected}, got {actual}")]
19    ChecksumMismatch { expected: String, actual: String },
20    #[error("URL not allowed: {0}")]
21    UrlNotAllowed(String),
22    #[error("Transport error: {0}")]
23    Transport(String),
24}
25
26#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
27#[serde(rename_all = "lowercase")]
28pub enum BrokerType {
29    #[serde(alias = "active_mq")]
30    ActiveMq,
31    Artemis,
32    Generic,
33}
34
35impl BrokerType {
36    pub fn as_env_str(&self) -> &'static str {
37        match self {
38            BrokerType::ActiveMq => "activemq",
39            BrokerType::Artemis => "artemis",
40            BrokerType::Generic => "generic",
41        }
42    }
43}
44
45impl std::str::FromStr for BrokerType {
46    type Err = std::convert::Infallible;
47
48    fn from_str(s: &str) -> Result<Self, Self::Err> {
49        Ok(match s.to_lowercase().as_str() {
50            "activemq" => BrokerType::ActiveMq,
51            "artemis" => BrokerType::Artemis,
52            _ => BrokerType::Generic,
53        })
54    }
55}
56
57/// Environment variables for a single CXF profile, used by the bridge Java side.
58pub struct CxfProfileEnvVars {
59    pub name: String,
60    pub wsdl_path: String,
61    pub service_name: String,
62    pub port_name: String,
63    pub address: Option<String>,
64    pub keystore_path: Option<String>,
65    pub keystore_password: Option<String>,
66    pub truststore_path: Option<String>,
67    pub truststore_password: Option<String>,
68    pub sig_username: Option<String>,
69    pub sig_password: Option<String>,
70    pub enc_username: Option<String>,
71    pub security_actions_out: Option<String>,
72    pub security_actions_in: Option<String>,
73    pub signature_algorithm: Option<String>,
74    pub signature_digest_algorithm: Option<String>,
75    pub signature_c14n_algorithm: Option<String>,
76    pub signature_parts: Option<String>,
77}
78
79impl CxfProfileEnvVars {
80    pub fn to_env_vars(&self) -> Vec<(String, String)> {
81        let prefix = format!("CXF_PROFILE_{}_", self.name.to_uppercase());
82        let mut vars = vec![
83            (format!("{}WSDL_PATH", prefix), self.wsdl_path.clone()),
84            (
85                format!("{}SERVICE_NAME", prefix),
86                self.service_name.clone(),
87            ),
88            (format!("{}PORT_NAME", prefix), self.port_name.clone()),
89        ];
90
91        if let Some(ref v) = self.address {
92            vars.push((format!("{}ADDRESS", prefix), v.clone()));
93        }
94        if let Some(ref v) = self.keystore_path {
95            vars.push((format!("{}KEYSTORE_PATH", prefix), v.clone()));
96        }
97        if let Some(ref v) = self.keystore_password {
98            vars.push((format!("{}KEYSTORE_PASSWORD", prefix), v.clone()));
99        }
100        if let Some(ref v) = self.truststore_path {
101            vars.push((format!("{}TRUSTSTORE_PATH", prefix), v.clone()));
102        }
103        if let Some(ref v) = self.truststore_password {
104            vars.push((format!("{}TRUSTSTORE_PASSWORD", prefix), v.clone()));
105        }
106        if let Some(ref v) = self.sig_username {
107            vars.push((format!("{}SIG_USERNAME", prefix), v.clone()));
108        }
109        if let Some(ref v) = self.sig_password {
110            vars.push((format!("{}SIG_PASSWORD", prefix), v.clone()));
111        }
112        if let Some(ref v) = self.enc_username {
113            vars.push((format!("{}ENC_USERNAME", prefix), v.clone()));
114        }
115        if let Some(ref v) = self.security_actions_out {
116            vars.push((format!("{}SECURITY_ACTIONS_OUT", prefix), v.clone()));
117        }
118        if let Some(ref v) = self.security_actions_in {
119            vars.push((format!("{}SECURITY_ACTIONS_IN", prefix), v.clone()));
120        }
121        if let Some(ref v) = self.signature_algorithm {
122            vars.push((format!("{}SIGNATURE_ALGORITHM", prefix), v.clone()));
123        }
124        if let Some(ref v) = self.signature_digest_algorithm {
125            vars.push((format!("{}SIGNATURE_DIGEST_ALGORITHM", prefix), v.clone()));
126        }
127        if let Some(ref v) = self.signature_c14n_algorithm {
128            vars.push((format!("{}SIGNATURE_C14N_ALGORITHM", prefix), v.clone()));
129        }
130        if let Some(ref v) = self.signature_parts {
131            vars.push((format!("{}SIGNATURE_PARTS", prefix), v.clone()));
132        }
133
134        vars
135    }
136}
137
138pub struct BridgeProcessConfig {
139    pub spec: &'static BridgeSpec,
140    pub binary_path: PathBuf,
141    pub broker_url: String,
142    pub broker_type: BrokerType,
143    pub username: Option<String>,
144    pub password: Option<String>,
145    pub start_timeout_ms: u64,
146    pub env_vars: Vec<(String, String)>,
147}
148
149impl BridgeProcessConfig {
150    /// Constructor for the JMS bridge.
151    pub fn jms(
152        binary_path: PathBuf,
153        broker_url: String,
154        broker_type: BrokerType,
155        username: Option<String>,
156        password: Option<String>,
157        start_timeout_ms: u64,
158    ) -> Self {
159        let mut env_vars = vec![
160            ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
161            (
162                "BRIDGE_BROKER_TYPE".to_string(),
163                broker_type.as_env_str().to_string(),
164            ),
165        ];
166        if let Some(u) = &username {
167            env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
168        }
169        if let Some(p) = &password {
170            env_vars.push(("BRIDGE_PASSWORD".to_string(), p.clone()));
171        }
172        Self {
173            spec: &JMS_BRIDGE,
174            binary_path,
175            broker_url,
176            broker_type,
177            username,
178            password,
179            start_timeout_ms,
180            env_vars,
181        }
182    }
183
184    /// Constructor for the XML bridge.
185    pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
186        Self {
187            spec: &XML_BRIDGE,
188            binary_path,
189            broker_url: String::new(),
190            broker_type: BrokerType::Generic,
191            username: None,
192            password: None,
193            start_timeout_ms,
194            env_vars: vec![],
195        }
196    }
197
198    /// Constructor for the CXF bridge with multi-profile support.
199    /// Generates `CXF_PROFILES=list` env var plus per-profile env vars.
200    pub fn cxf_profiles(
201        binary_path: PathBuf,
202        profiles: &[CxfProfileEnvVars],
203        start_timeout_ms: u64,
204    ) -> Self {
205        let profile_names: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
206        let mut env_vars = vec![(
207            "CXF_PROFILES".to_string(),
208            profile_names.join(","),
209        )];
210
211        for profile in profiles {
212            env_vars.extend(profile.to_env_vars());
213        }
214
215        Self {
216            spec: &CXF_BRIDGE,
217            binary_path,
218            broker_url: String::new(),
219            broker_type: BrokerType::Generic,
220            username: None,
221            password: None,
222            start_timeout_ms,
223            env_vars,
224        }
225    }
226}
227
228pub struct BridgeProcess {
229    child: tokio::process::Child,
230    grpc_port: u16,
231}
232
233impl BridgeProcess {
234    pub fn grpc_port(&self) -> u16 {
235        self.grpc_port
236    }
237
238    /// Spawn the bridge process. Reads the gRPC port from stdout JSON line:
239    ///   {"status":"ready","port":PORT}
240    ///
241    /// Picks a free OS port and passes it to the bridge via `QUARKUS_GRPC_SERVER_PORT`
242    /// so Quarkus binds exactly to that port and PortAnnouncer can echo it back.
243    pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
244        use tokio::io::AsyncBufReadExt;
245        use tokio::process::Command;
246        use tokio::time::{Duration, timeout};
247
248        // Bind :0 to let the OS pick a free port, then release so the bridge can use it.
249        let free_port = {
250            let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
251            listener.local_addr()?.port()
252        };
253
254        // If CAMEL_BRIDGE_LOG_STDERR is set, redirect stderr to a file for debugging.
255        let stderr_stdio: std::process::Stdio =
256            if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
257                let log_filename = config
258                    .spec
259                    .log_file_template
260                    .replace("{pid}", &std::process::id().to_string());
261                let log_path = if log_dir.is_empty() {
262                    format!("/tmp/{log_filename}")
263                } else {
264                    format!("{log_dir}/{log_filename}")
265                };
266                match std::fs::File::create(&log_path) {
267                    Ok(f) => {
268                        eprintln!("[camel-bridge] stderr → {}", log_path);
269                        f.into()
270                    }
271                    Err(e) => {
272                        eprintln!(
273                            "[camel-bridge] failed to create log file {}: {}",
274                            log_path, e
275                        );
276                        std::process::Stdio::inherit()
277                    }
278                }
279            } else {
280                std::process::Stdio::inherit()
281            };
282
283        let mut command = Command::new(&config.binary_path);
284        command
285            .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
286            // Let the OS pick a random HTTP port — we only use gRPC.
287            // Without this, Quarkus binds HTTP on 8080 and fails if occupied.
288            .env("QUARKUS_HTTP_PORT", "0")
289            .stdout(std::process::Stdio::piped())
290            .stderr(stderr_stdio);
291
292        // Inject bridge-specific env vars (e.g. JMS broker URL/credentials via ::jms()).
293        for (key, value) in &config.env_vars {
294            command.env(key, value);
295        }
296
297        let mut child = command.spawn()?;
298
299        let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
300        let mut reader = tokio::io::BufReader::new(stdout).lines();
301
302        let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
303            while let Some(line) = reader.next_line().await? {
304                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
305                    && v.get("status").and_then(|s| s.as_str()) == Some("ready")
306                {
307                    if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
308                        return Ok(p as u16);
309                    }
310                    return Err(BridgeError::BadReadyMessage(line));
311                }
312            }
313            Err(BridgeError::StdoutClosed)
314        })
315        .await
316        .map_err(|_| {
317            BridgeError::Timeout(format!(
318                "{} failed to start: health check timeout after {}ms",
319                config.spec.name, config.start_timeout_ms
320            ))
321        })??;
322
323        // Keep draining the bridge's stdout in the background so the pipe
324        // buffer never fills up.  If the pipe blocks, the bridge process
325        // blocks on its next stdout write and silently stops responding.
326        tokio::spawn(async move {
327            while let Ok(Some(line)) = reader.next_line().await {
328                tracing::debug!(target: "camel_bridge::child", "{}", line);
329            }
330        });
331
332        Ok(BridgeProcess {
333            child,
334            grpc_port: port,
335        })
336    }
337
338    /// Gracefully stop: SIGTERM + wait for exit.
339    pub async fn stop(mut self) -> Result<(), BridgeError> {
340        use tokio::time::{Duration, sleep};
341
342        // Send SIGTERM first (graceful shutdown)
343        #[cfg(unix)]
344        {
345            let pid = self.child.id().unwrap_or(0);
346            if pid > 0 {
347                // SAFETY: libc::kill is called with the child process PID obtained from tokio.
348                unsafe {
349                    libc::kill(pid as i32, libc::SIGTERM);
350                }
351            }
352        }
353
354        // On non-Unix (Windows), fall through to kill immediately
355        #[cfg(not(unix))]
356        let _ = self.child.start_kill();
357
358        // Wait up to 5 seconds for graceful exit, then SIGKILL
359        tokio::select! {
360            result = self.child.wait() => {
361                result?;
362            }
363            _ = sleep(Duration::from_secs(5)) => {
364                let _ = self.child.start_kill();
365                self.child.wait().await?;
366            }
367        }
368        Ok(())
369    }
370}
371
372impl Drop for BridgeProcess {
373    fn drop(&mut self) {
374        // Best-effort only. Does NOT wait — cannot block in Drop.
375        let _ = self.child.start_kill();
376    }
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382
383    #[test]
384    fn broker_type_from_str_activemq() {
385        assert_eq!(
386            "activemq".parse::<BrokerType>().unwrap(),
387            BrokerType::ActiveMq
388        );
389        assert_eq!(
390            "ACTIVEMQ".parse::<BrokerType>().unwrap(),
391            BrokerType::ActiveMq
392        );
393    }
394
395    #[test]
396    fn broker_type_from_str_artemis() {
397        assert_eq!(
398            "artemis".parse::<BrokerType>().unwrap(),
399            BrokerType::Artemis
400        );
401    }
402
403    #[test]
404    fn broker_type_from_str_unknown_is_generic() {
405        assert_eq!("ibmmq".parse::<BrokerType>().unwrap(), BrokerType::Generic);
406    }
407
408    #[test]
409    fn broker_type_env_str() {
410        assert_eq!(BrokerType::ActiveMq.as_env_str(), "activemq");
411        assert_eq!(BrokerType::Artemis.as_env_str(), "artemis");
412        assert_eq!(BrokerType::Generic.as_env_str(), "generic");
413    }
414
415    #[test]
416    fn jms_constructor_uses_jms_spec() {
417        let cfg = BridgeProcessConfig::jms(
418            PathBuf::from("/tmp/jms-bridge"),
419            "tcp://localhost:61616".to_string(),
420            BrokerType::ActiveMq,
421            Some("user".to_string()),
422            Some("pass".to_string()),
423            1000,
424        );
425        assert_eq!(cfg.spec.name, "jms-bridge");
426    }
427
428    #[test]
429    fn xml_constructor_uses_xml_spec() {
430        let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
431        assert_eq!(cfg.spec.name, "xml-bridge");
432    }
433
434    #[test]
435    fn cxf_profiles_generates_cxf_profiles_env_var() {
436        let profiles = vec![
437            CxfProfileEnvVars {
438                name: "baleares".to_string(),
439                wsdl_path: "/a.wsdl".to_string(),
440                service_name: "Svc".to_string(),
441                port_name: "Port".to_string(),
442                address: None,
443                keystore_path: None,
444                keystore_password: None,
445                truststore_path: None,
446                truststore_password: None,
447                sig_username: None,
448                sig_password: None,
449                enc_username: None,
450                security_actions_out: None,
451                security_actions_in: None,
452                signature_algorithm: None,
453                signature_digest_algorithm: None,
454                signature_c14n_algorithm: None,
455                signature_parts: None,
456            },
457            CxfProfileEnvVars {
458                name: "extremadura".to_string(),
459                wsdl_path: "/b.wsdl".to_string(),
460                service_name: "Svc2".to_string(),
461                port_name: "Port2".to_string(),
462                address: Some("http://host:9090/ws".to_string()),
463                keystore_path: Some("/b.jks".to_string()),
464                keystore_password: Some("pass".to_string()),
465                truststore_path: None,
466                truststore_password: None,
467                sig_username: Some("cert".to_string()),
468                sig_password: Some("sig_pass".to_string()),
469                enc_username: None,
470                security_actions_out: Some("Timestamp Signature".to_string()),
471                security_actions_in: Some("Timestamp Signature".to_string()),
472                signature_algorithm: None,
473                signature_digest_algorithm: None,
474                signature_c14n_algorithm: None,
475                signature_parts: None,
476            },
477        ];
478
479        let cfg =
480            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
481
482        assert_eq!(cfg.spec.name, "cxf-bridge");
483        assert!(cfg.broker_url.is_empty());
484        assert_eq!(cfg.broker_type, BrokerType::Generic);
485        assert!(cfg.username.is_none());
486        assert!(cfg.password.is_none());
487
488        // Find CXF_PROFILES env var
489        let profiles_var = cfg
490            .env_vars
491            .iter()
492            .find(|(k, _)| k == "CXF_PROFILES")
493            .expect("CXF_PROFILES env var must exist");
494        assert_eq!(profiles_var.1, "baleares,extremadura");
495
496        // Check baleares profile vars (no security)
497        assert!(cfg
498            .env_vars
499            .iter()
500            .any(|(k, v)| k == "CXF_PROFILE_BALEARES_WSDL_PATH" && v == "/a.wsdl"));
501        assert!(cfg
502            .env_vars
503            .iter()
504            .any(|(k, v)| k == "CXF_PROFILE_BALEARES_SERVICE_NAME" && v == "Svc"));
505        assert!(cfg
506            .env_vars
507            .iter()
508            .any(|(k, v)| k == "CXF_PROFILE_BALEARES_PORT_NAME" && v == "Port"));
509        assert!(!cfg
510            .env_vars
511            .iter()
512            .any(|(k, _)| k == "CXF_PROFILE_BALEARES_ADDRESS"));
513
514        // Check extremadura profile vars (with security)
515        assert!(cfg
516            .env_vars
517            .iter()
518            .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_WSDL_PATH" && v == "/b.wsdl"));
519        assert!(cfg
520            .env_vars
521            .iter()
522            .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_ADDRESS" && v == "http://host:9090/ws"));
523        assert!(cfg
524            .env_vars
525            .iter()
526            .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PATH" && v == "/b.jks"));
527        assert!(cfg
528            .env_vars
529            .iter()
530            .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_KEYSTORE_PASSWORD" && v == "pass"));
531        assert!(cfg
532            .env_vars
533            .iter()
534            .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_USERNAME" && v == "cert"));
535        assert!(cfg
536            .env_vars
537            .iter()
538            .any(|(k, v)| k == "CXF_PROFILE_EXTREMADURA_SIG_PASSWORD" && v == "sig_pass"));
539        assert!(cfg.env_vars.iter().any(
540           |(k, v)| k == "CXF_PROFILE_EXTREMADURA_SECURITY_ACTIONS_OUT"
541                && v == "Timestamp Signature"
542        ));
543    }
544
545    #[test]
546    fn cxf_profiles_single_profile_no_security() {
547        let profiles = vec![CxfProfileEnvVars {
548            name: "test".to_string(),
549            wsdl_path: "service.wsdl".to_string(),
550            service_name: "{http://example.com}Service".to_string(),
551            port_name: "{http://example.com}Port".to_string(),
552            address: None,
553            keystore_path: None,
554            keystore_password: None,
555            truststore_path: None,
556            truststore_password: None,
557            sig_username: None,
558            sig_password: None,
559            enc_username: None,
560            security_actions_out: None,
561            security_actions_in: None,
562            signature_algorithm: None,
563            signature_digest_algorithm: None,
564            signature_c14n_algorithm: None,
565            signature_parts: None,
566        }];
567
568        let cfg =
569            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
570
571        assert_eq!(cfg.spec.name, "cxf-bridge");
572        // CXF_PROFILES + 3 required vars (WSDL_PATH, SERVICE_NAME, PORT_NAME)
573        assert_eq!(cfg.env_vars.len(), 4);
574        assert_eq!(cfg.env_vars[0].0, "CXF_PROFILES");
575        assert_eq!(cfg.env_vars[0].1, "test");
576        assert_eq!(cfg.env_vars[1].0, "CXF_PROFILE_TEST_WSDL_PATH");
577        assert_eq!(cfg.env_vars[1].1, "service.wsdl");
578        assert_eq!(cfg.env_vars[2].0, "CXF_PROFILE_TEST_SERVICE_NAME");
579        assert_eq!(cfg.env_vars[2].1, "{http://example.com}Service");
580        assert_eq!(cfg.env_vars[3].0, "CXF_PROFILE_TEST_PORT_NAME");
581        assert_eq!(cfg.env_vars[3].1, "{http://example.com}Port");
582    }
583
584    #[test]
585    fn profile_env_vars_to_env_vars_includes_all_fields() {
586        let vars = CxfProfileEnvVars {
587            name: "full".to_string(),
588            wsdl_path: "/wsdl".to_string(),
589            service_name: "Svc".to_string(),
590            port_name: "Port".to_string(),
591            address: Some("http://host:8080".to_string()),
592            keystore_path: Some("/ks.jks".to_string()),
593            keystore_password: Some("ks_pass".to_string()),
594            truststore_path: Some("/ts.jks".to_string()),
595            truststore_password: Some("ts_pass".to_string()),
596            sig_username: Some("user".to_string()),
597            sig_password: Some("sig_pass".to_string()),
598            enc_username: None,
599            security_actions_out: Some("Timestamp Signature".to_string()),
600            security_actions_in: Some("Timestamp".to_string()),
601            signature_algorithm: None,
602            signature_digest_algorithm: None,
603            signature_c14n_algorithm: None,
604            signature_parts: None,
605        };
606
607        let env = vars.to_env_vars();
608        // 3 required + 1 address + 8 security = 12
609        assert_eq!(env.len(), 12);
610
611        let keys: Vec<&str> = env.iter().map(|(k, _)| k.as_str()).collect();
612        assert!(keys.contains(&"CXF_PROFILE_FULL_WSDL_PATH"));
613        assert!(keys.contains(&"CXF_PROFILE_FULL_SERVICE_NAME"));
614        assert!(keys.contains(&"CXF_PROFILE_FULL_PORT_NAME"));
615        assert!(keys.contains(&"CXF_PROFILE_FULL_ADDRESS"));
616        assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PATH"));
617        assert!(keys.contains(&"CXF_PROFILE_FULL_KEYSTORE_PASSWORD"));
618        assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PATH"));
619        assert!(keys.contains(&"CXF_PROFILE_FULL_TRUSTSTORE_PASSWORD"));
620        assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_USERNAME"));
621        assert!(keys.contains(&"CXF_PROFILE_FULL_SIG_PASSWORD"));
622        assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_OUT"));
623        assert!(keys.contains(&"CXF_PROFILE_FULL_SECURITY_ACTIONS_IN"));
624    }
625}