Skip to main content

camel_bridge/
process.rs

1use std::path::PathBuf;
2use thiserror::Error;
3
4use crate::spec::{BridgeSpec, CXF_BRIDGE, JMS_BRIDGE, XML_BRIDGE};
5
/// Errors produced by the bridge code.
///
/// The `Display` text of each variant is defined by its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum BridgeError {
    /// An I/O operation failed. `#[from]` lets `?` convert a `std::io::Error`
    /// into this variant.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// An operation exceeded its time limit; the payload is a human-readable
    /// description. `BridgeProcess::start` returns this when no ready message
    /// arrives within `start_timeout_ms`.
    #[error("Bridge timed out: {0}")]
    Timeout(String),
    /// The child's stdout was unavailable, or reached end-of-stream before a
    /// ready message was seen.
    #[error("Bridge stdout closed before ready message")]
    StdoutClosed,
    /// A line with `"status":"ready"` was read but its `port` field was missing
    /// or unusable. The payload is the offending line.
    #[error("Bridge ready message malformed: {0}")]
    BadReadyMessage(String),
    /// A download failed; the payload describes the failure.
    /// NOTE(review): not constructed in the visible part of this file.
    #[error("Download failed: {0}")]
    Download(String),
    /// A computed checksum did not match the expected one.
    /// NOTE(review): not constructed in the visible part of this file.
    #[error("Checksum mismatch: expected {expected}, got {actual}")]
    ChecksumMismatch { expected: String, actual: String },
    /// A URL was rejected; the payload is shown after "URL not allowed:".
    /// NOTE(review): not constructed in the visible part of this file.
    #[error("URL not allowed: {0}")]
    UrlNotAllowed(String),
    /// A transport-level failure; the payload describes it.
    /// NOTE(review): not constructed in the visible part of this file.
    #[error("Transport error: {0}")]
    Transport(String),
    /// Invalid configuration. `BridgeProcess::start` wraps the message returned
    /// by `BridgeProcessConfig::validate` in this variant.
    #[error("Config error: {0}")]
    Config(String),
}
27
/// Broker flavour passed to the JMS bridge (see `BRIDGE_BROKER_TYPE`).
///
/// With `rename_all = "lowercase"` the variants deserialize from `"activemq"`,
/// `"artemis"` and `"generic"`.
#[derive(Debug, Clone, PartialEq, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BrokerType {
    /// ActiveMQ; the spelling `"active_mq"` is also accepted when deserializing.
    #[serde(alias = "active_mq")]
    ActiveMq,
    /// Artemis.
    Artemis,
    /// Any other broker; also the fallback used by the `FromStr` impl for
    /// unrecognised names.
    Generic,
}
36
37impl BrokerType {
38    pub fn as_env_str(&self) -> &'static str {
39        match self {
40            BrokerType::ActiveMq => "activemq",
41            BrokerType::Artemis => "artemis",
42            BrokerType::Generic => "generic",
43        }
44    }
45}
46
47impl std::str::FromStr for BrokerType {
48    type Err = std::convert::Infallible;
49
50    fn from_str(s: &str) -> Result<Self, Self::Err> {
51        Ok(match s.to_lowercase().as_str() {
52            "activemq" => BrokerType::ActiveMq,
53            "artemis" => BrokerType::Artemis,
54            _ => BrokerType::Generic,
55        })
56    }
57}
58
// Intentionally no Debug: contains keystore_password, truststore_password, sig_password. See BRG-004.
// (Deriving Debug would allow those secrets to be printed via `{:?}`.)
#[allow(missing_debug_implementations)]
/// Environment variables for a single CXF profile, used by the bridge Java side.
///
/// `to_env_vars` turns each field into a `CXF_PROFILE_<NAME>_<FIELD>` entry,
/// where `<NAME>` is `name` upper-cased. The three non-optional fields are
/// always emitted; an `Option` field is emitted only when it is `Some`.
pub struct CxfProfileEnvVars {
    /// Profile name; upper-cased to build the env-var prefix and listed
    /// verbatim in `CXF_PROFILES`.
    pub name: String,
    /// Emitted as `..._WSDL_PATH` (always).
    pub wsdl_path: String,
    /// Emitted as `..._SERVICE_NAME` (always).
    pub service_name: String,
    /// Emitted as `..._PORT_NAME` (always).
    pub port_name: String,
    /// Emitted as `..._ADDRESS` when set.
    pub address: Option<String>,
    /// Emitted as `..._KEYSTORE_PATH` when set.
    pub keystore_path: Option<String>,
    /// Secret. Emitted as `..._KEYSTORE_PASSWORD` when set.
    pub keystore_password: Option<String>,
    /// Emitted as `..._TRUSTSTORE_PATH` when set.
    pub truststore_path: Option<String>,
    /// Secret. Emitted as `..._TRUSTSTORE_PASSWORD` when set.
    pub truststore_password: Option<String>,
    /// Emitted as `..._SIG_USERNAME` when set.
    pub sig_username: Option<String>,
    /// Secret. Emitted as `..._SIG_PASSWORD` when set.
    pub sig_password: Option<String>,
    /// Emitted as `..._ENC_USERNAME` when set.
    pub enc_username: Option<String>,
    /// Emitted as `..._SECURITY_ACTIONS_OUT` when set.
    pub security_actions_out: Option<String>,
    /// Emitted as `..._SECURITY_ACTIONS_IN` when set.
    pub security_actions_in: Option<String>,
    /// Emitted as `..._SIGNATURE_ALGORITHM` when set.
    pub signature_algorithm: Option<String>,
    /// Emitted as `..._SIGNATURE_DIGEST_ALGORITHM` when set.
    pub signature_digest_algorithm: Option<String>,
    /// Emitted as `..._SIGNATURE_C14N_ALGORITHM` when set.
    pub signature_c14n_algorithm: Option<String>,
    /// Emitted as `..._SIGNATURE_PARTS` when set.
    pub signature_parts: Option<String>,
}
82
83impl CxfProfileEnvVars {
84    pub fn to_env_vars(&self) -> Vec<(String, String)> {
85        let prefix = format!("CXF_PROFILE_{}_", self.name.to_uppercase());
86        let mut vars = vec![
87            (format!("{}WSDL_PATH", prefix), self.wsdl_path.clone()),
88            (format!("{}SERVICE_NAME", prefix), self.service_name.clone()),
89            (format!("{}PORT_NAME", prefix), self.port_name.clone()),
90        ];
91
92        if let Some(ref v) = self.address {
93            vars.push((format!("{}ADDRESS", prefix), v.clone()));
94        }
95        if let Some(ref v) = self.keystore_path {
96            vars.push((format!("{}KEYSTORE_PATH", prefix), v.clone()));
97        }
98        if let Some(ref v) = self.keystore_password {
99            vars.push((format!("{}KEYSTORE_PASSWORD", prefix), v.clone()));
100        }
101        if let Some(ref v) = self.truststore_path {
102            vars.push((format!("{}TRUSTSTORE_PATH", prefix), v.clone()));
103        }
104        if let Some(ref v) = self.truststore_password {
105            vars.push((format!("{}TRUSTSTORE_PASSWORD", prefix), v.clone()));
106        }
107        if let Some(ref v) = self.sig_username {
108            vars.push((format!("{}SIG_USERNAME", prefix), v.clone()));
109        }
110        if let Some(ref v) = self.sig_password {
111            vars.push((format!("{}SIG_PASSWORD", prefix), v.clone()));
112        }
113        if let Some(ref v) = self.enc_username {
114            vars.push((format!("{}ENC_USERNAME", prefix), v.clone()));
115        }
116        if let Some(ref v) = self.security_actions_out {
117            vars.push((format!("{}SECURITY_ACTIONS_OUT", prefix), v.clone()));
118        }
119        if let Some(ref v) = self.security_actions_in {
120            vars.push((format!("{}SECURITY_ACTIONS_IN", prefix), v.clone()));
121        }
122        if let Some(ref v) = self.signature_algorithm {
123            vars.push((format!("{}SIGNATURE_ALGORITHM", prefix), v.clone()));
124        }
125        if let Some(ref v) = self.signature_digest_algorithm {
126            vars.push((format!("{}SIGNATURE_DIGEST_ALGORITHM", prefix), v.clone()));
127        }
128        if let Some(ref v) = self.signature_c14n_algorithm {
129            vars.push((format!("{}SIGNATURE_C14N_ALGORITHM", prefix), v.clone()));
130        }
131        if let Some(ref v) = self.signature_parts {
132            vars.push((format!("{}SIGNATURE_PARTS", prefix), v.clone()));
133        }
134
135        vars
136    }
137}
138
// Intentionally no Debug: contains password field. See BRG-004.
// (`env_vars` can carry secrets too, e.g. `BRIDGE_PASSWORD` added by `jms()`.)
#[allow(missing_debug_implementations)]
/// Everything `BridgeProcess::start` needs to launch one bridge child process.
///
/// Build it with one of the constructors (`jms`, `xml`, `cxf_profiles`), which
/// select the matching `BridgeSpec` and pre-compute `env_vars`.
pub struct BridgeProcessConfig {
    /// Static description of the bridge; `start` reads `spec.name` (for the
    /// timeout message) and `spec.log_file_template` (for the stderr log file).
    pub spec: &'static BridgeSpec,
    /// Path of the executable to spawn.
    pub binary_path: PathBuf,
    /// Broker URL as given to `jms()`; empty for the XML and CXF constructors.
    /// `start` does not read this field directly — the value reaches the child
    /// through `env_vars`.
    pub broker_url: String,
    /// Broker flavour as given to `jms()`; `Generic` for the XML and CXF
    /// constructors.
    pub broker_type: BrokerType,
    /// Broker user name, if any (`None` for the XML and CXF constructors).
    pub username: Option<String>,
    /// Broker password, if any (`None` for the XML and CXF constructors). Secret.
    pub password: Option<String>,
    /// How long `start` waits for the ready message, in milliseconds.
    /// `validate` rejects 0.
    pub start_timeout_ms: u64,
    /// Extra environment variables set on the child process, in order.
    pub env_vars: Vec<(String, String)>,
}
151
152impl BridgeProcessConfig {
153    /// Constructor for the JMS bridge.
154    pub fn jms(
155        binary_path: PathBuf,
156        broker_url: String,
157        broker_type: BrokerType,
158        username: Option<String>,
159        password: Option<String>,
160        start_timeout_ms: u64,
161    ) -> Self {
162        let mut env_vars = vec![
163            ("BRIDGE_BROKER_URL".to_string(), broker_url.clone()),
164            (
165                "BRIDGE_BROKER_TYPE".to_string(),
166                broker_type.as_env_str().to_string(),
167            ),
168        ];
169        if let Some(u) = &username {
170            env_vars.push(("BRIDGE_USERNAME".to_string(), u.clone()));
171        }
172        if let Some(p) = &password {
173            env_vars.push(("BRIDGE_PASSWORD".to_string(), p.clone()));
174        }
175        Self {
176            spec: &JMS_BRIDGE,
177            binary_path,
178            broker_url,
179            broker_type,
180            username,
181            password,
182            start_timeout_ms,
183            env_vars,
184        }
185    }
186
187    /// Constructor for the XML bridge.
188    pub fn xml(binary_path: PathBuf, start_timeout_ms: u64) -> Self {
189        Self {
190            spec: &XML_BRIDGE,
191            binary_path,
192            broker_url: String::new(),
193            broker_type: BrokerType::Generic,
194            username: None,
195            password: None,
196            start_timeout_ms,
197            env_vars: vec![],
198        }
199    }
200
201    /// Constructor for the CXF bridge with multi-profile support.
202    /// Generates `CXF_PROFILES=list` env var plus per-profile env vars.
203    pub fn cxf_profiles(
204        binary_path: PathBuf,
205        profiles: &[CxfProfileEnvVars],
206        start_timeout_ms: u64,
207    ) -> Self {
208        let profile_names: Vec<String> = profiles.iter().map(|p| p.name.clone()).collect();
209        let mut env_vars = vec![("CXF_PROFILES".to_string(), profile_names.join(","))];
210
211        for profile in profiles {
212            env_vars.extend(profile.to_env_vars());
213        }
214
215        Self {
216            spec: &CXF_BRIDGE,
217            binary_path,
218            broker_url: String::new(),
219            broker_type: BrokerType::Generic,
220            username: None,
221            password: None,
222            start_timeout_ms,
223            env_vars,
224        }
225    }
226
227    pub fn validate(&self) -> Result<(), String> {
228        if self.start_timeout_ms == 0 {
229            return Err("start_timeout_ms must be > 0".to_string());
230        }
231        Ok(())
232    }
233}
234
/// Handle to a running bridge child process.
///
/// Created by `BridgeProcess::start`; `stop` shuts the process down and waits
/// for it, while dropping the handle only requests a kill without waiting.
pub struct BridgeProcess {
    /// The spawned child process.
    child: tokio::process::Child,
    /// Port taken from the child's ready message.
    grpc_port: u16,
}
239
240impl BridgeProcess {
241    pub fn grpc_port(&self) -> u16 {
242        self.grpc_port
243    }
244
245    /// Spawn the bridge process. Reads the gRPC port from stdout JSON line:
246    ///   {"status":"ready","port":PORT}
247    ///
248    /// Picks a free OS port and passes it to the bridge via `QUARKUS_GRPC_SERVER_PORT`
249    /// so Quarkus binds exactly to that port and PortAnnouncer can echo it back.
250    pub async fn start(config: &BridgeProcessConfig) -> Result<Self, BridgeError> {
251        use tokio::io::AsyncBufReadExt;
252        use tokio::process::Command;
253        use tokio::time::{Duration, timeout};
254
255        config.validate().map_err(BridgeError::Config)?;
256
257        // Bind :0 to let the OS pick a free port, then release so the bridge can use it.
258        let free_port = {
259            let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
260            listener.local_addr()?.port()
261        };
262
263        // If CAMEL_BRIDGE_LOG_STDERR is set, redirect stderr to a file for debugging.
264        let stderr_stdio: std::process::Stdio =
265            if let Ok(log_dir) = std::env::var("CAMEL_BRIDGE_LOG_STDERR") {
266                let log_filename = config
267                    .spec
268                    .log_file_template
269                    .replace("{pid}", &std::process::id().to_string());
270                let log_path = if log_dir.is_empty() {
271                    format!("/tmp/{log_filename}")
272                } else {
273                    format!("{log_dir}/{log_filename}")
274                };
275                match std::fs::File::create(&log_path) {
276                    Ok(f) => {
277                        eprintln!("[camel-bridge] stderr → {}", log_path);
278                        f.into()
279                    }
280                    Err(e) => {
281                        eprintln!(
282                            "[camel-bridge] failed to create log file {}: {}",
283                            log_path, e
284                        );
285                        std::process::Stdio::inherit()
286                    }
287                }
288            } else {
289                std::process::Stdio::inherit()
290            };
291
292        let mut command = Command::new(&config.binary_path);
293        command
294            .env("QUARKUS_GRPC_SERVER_PORT", free_port.to_string())
295            // Let the OS pick a random HTTP port — we only use gRPC.
296            // Without this, Quarkus binds HTTP on 8080 and fails if occupied.
297            .env("QUARKUS_HTTP_PORT", "0")
298            .stdout(std::process::Stdio::piped())
299            .stderr(stderr_stdio);
300
301        // Inject bridge-specific env vars (e.g. JMS broker URL/credentials via ::jms()).
302        for (key, value) in &config.env_vars {
303            command.env(key, value);
304        }
305
306        let mut child = command.spawn()?;
307
308        let stdout = child.stdout.take().ok_or(BridgeError::StdoutClosed)?;
309        let mut reader = tokio::io::BufReader::new(stdout).lines();
310
311        let port = timeout(Duration::from_millis(config.start_timeout_ms), async {
312            while let Some(line) = reader.next_line().await? {
313                if let Ok(v) = serde_json::from_str::<serde_json::Value>(&line)
314                    && v.get("status").and_then(|s| s.as_str()) == Some("ready")
315                {
316                    if let Some(p) = v.get("port").and_then(|p| p.as_u64()) {
317                        return Ok(p as u16);
318                    }
319                    return Err(BridgeError::BadReadyMessage(line));
320                }
321            }
322            Err(BridgeError::StdoutClosed)
323        })
324        .await
325        .map_err(|_| {
326            BridgeError::Timeout(format!(
327                "{} failed to start: health check timeout after {}ms",
328                config.spec.name, config.start_timeout_ms
329            ))
330        })??;
331
332        // Keep draining the bridge's stdout in the background so the pipe
333        // buffer never fills up.  If the pipe blocks, the bridge process
334        // blocks on its next stdout write and silently stops responding.
335        tokio::spawn(async move {
336            while let Ok(Some(line)) = reader.next_line().await {
337                tracing::debug!(target: "camel_bridge::child", "{}", line);
338            }
339        });
340
341        Ok(BridgeProcess {
342            child,
343            grpc_port: port,
344        })
345    }
346
347    /// Gracefully stop: SIGTERM + wait for exit.
348    pub async fn stop(mut self) -> Result<(), BridgeError> {
349        use tokio::time::{Duration, sleep};
350
351        // Send SIGTERM first (graceful shutdown)
352        #[cfg(unix)]
353        {
354            let pid = self.child.id().unwrap_or(0);
355            if pid > 0 {
356                // SAFETY: libc::kill is called with the child process PID obtained from tokio.
357                unsafe {
358                    libc::kill(pid as i32, libc::SIGTERM);
359                }
360            }
361        }
362
363        // On non-Unix (Windows), fall through to kill immediately
364        #[cfg(not(unix))]
365        let _ = self.child.start_kill();
366
367        // Wait up to 5 seconds for graceful exit, then SIGKILL
368        tokio::select! {
369            result = self.child.wait() => {
370                result?;
371            }
372            _ = sleep(Duration::from_secs(5)) => {
373                let _ = self.child.start_kill();
374                self.child.wait().await?;
375            }
376        }
377        Ok(())
378    }
379}
380
381impl Drop for BridgeProcess {
382    fn drop(&mut self) {
383        // Best-effort only. Does NOT wait — cannot block in Drop.
384        let _ = self.child.start_kill();
385    }
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a profile with only the mandatory fields set; every optional
    /// field is `None`. Tests override individual fields with struct-update
    /// syntax.
    fn bare_profile(
        name: &str,
        wsdl_path: &str,
        service_name: &str,
        port_name: &str,
    ) -> CxfProfileEnvVars {
        CxfProfileEnvVars {
            name: name.to_string(),
            wsdl_path: wsdl_path.to_string(),
            service_name: service_name.to_string(),
            port_name: port_name.to_string(),
            address: None,
            keystore_path: None,
            keystore_password: None,
            truststore_path: None,
            truststore_password: None,
            sig_username: None,
            sig_password: None,
            enc_username: None,
            security_actions_out: None,
            security_actions_in: None,
            signature_algorithm: None,
            signature_digest_algorithm: None,
            signature_c14n_algorithm: None,
            signature_parts: None,
        }
    }

    /// True when `env` contains `key` with exactly `value`.
    fn has_pair(env: &[(String, String)], key: &str, value: &str) -> bool {
        env.iter().any(|(k, v)| k == key && v == value)
    }

    /// True when `env` contains `key`, whatever its value.
    fn has_key(env: &[(String, String)], key: &str) -> bool {
        env.iter().any(|(k, _)| k == key)
    }

    #[test]
    fn broker_type_from_str_activemq() {
        for spelling in ["activemq", "ACTIVEMQ"] {
            assert_eq!(
                spelling.parse::<BrokerType>().unwrap(),
                BrokerType::ActiveMq
            );
        }
    }

    #[test]
    fn broker_type_from_str_artemis() {
        let parsed: BrokerType = "artemis".parse().unwrap();
        assert_eq!(parsed, BrokerType::Artemis);
    }

    #[test]
    fn broker_type_from_str_unknown_is_generic() {
        let parsed: BrokerType = "ibmmq".parse().unwrap();
        assert_eq!(parsed, BrokerType::Generic);
    }

    #[test]
    fn broker_type_env_str() {
        let expectations = [
            (BrokerType::ActiveMq, "activemq"),
            (BrokerType::Artemis, "artemis"),
            (BrokerType::Generic, "generic"),
        ];
        for (broker_type, expected) in expectations {
            assert_eq!(broker_type.as_env_str(), expected);
        }
    }

    #[test]
    fn jms_constructor_uses_jms_spec() {
        let cfg = BridgeProcessConfig::jms(
            PathBuf::from("/tmp/jms-bridge"),
            "tcp://localhost:61616".to_string(),
            BrokerType::ActiveMq,
            Some("user".to_string()),
            Some("pass".to_string()),
            1000,
        );
        assert_eq!(cfg.spec.name, "jms-bridge");
    }

    #[test]
    fn xml_constructor_uses_xml_spec() {
        let cfg = BridgeProcessConfig::xml(PathBuf::from("/tmp/xml-bridge"), 1000);
        assert_eq!(cfg.spec.name, "xml-bridge");
    }

    #[test]
    fn cxf_profiles_generates_cxf_profiles_env_var() {
        let profiles = vec![
            // No security settings at all.
            bare_profile("baleares", "/a.wsdl", "Svc", "Port"),
            // Explicit address plus keystore/signature settings.
            CxfProfileEnvVars {
                address: Some("http://host:9090/ws".to_string()),
                keystore_path: Some("/b.jks".to_string()),
                keystore_password: Some("pass".to_string()),
                sig_username: Some("cert".to_string()),
                sig_password: Some("sig_pass".to_string()),
                security_actions_out: Some("Timestamp Signature".to_string()),
                security_actions_in: Some("Timestamp Signature".to_string()),
                ..bare_profile("extremadura", "/b.wsdl", "Svc2", "Port2")
            },
        ];

        let cfg =
            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);
        let env = cfg.env_vars.as_slice();

        assert_eq!(cfg.spec.name, "cxf-bridge");
        assert!(cfg.broker_url.is_empty());
        assert_eq!(cfg.broker_type, BrokerType::Generic);
        assert!(cfg.username.is_none());
        assert!(cfg.password.is_none());

        // Find CXF_PROFILES env var
        let profiles_var = env
            .iter()
            .find(|(k, _)| k == "CXF_PROFILES")
            .expect("CXF_PROFILES env var must exist");
        assert_eq!(profiles_var.1, "baleares,extremadura");

        // Check baleares profile vars (no security)
        assert!(has_pair(env, "CXF_PROFILE_BALEARES_WSDL_PATH", "/a.wsdl"));
        assert!(has_pair(env, "CXF_PROFILE_BALEARES_SERVICE_NAME", "Svc"));
        assert!(has_pair(env, "CXF_PROFILE_BALEARES_PORT_NAME", "Port"));
        assert!(!has_key(env, "CXF_PROFILE_BALEARES_ADDRESS"));

        // Check extremadura profile vars (with security)
        assert!(has_pair(env, "CXF_PROFILE_EXTREMADURA_WSDL_PATH", "/b.wsdl"));
        assert!(has_pair(
            env,
            "CXF_PROFILE_EXTREMADURA_ADDRESS",
            "http://host:9090/ws"
        ));
        assert!(has_pair(
            env,
            "CXF_PROFILE_EXTREMADURA_KEYSTORE_PATH",
            "/b.jks"
        ));
        assert!(has_pair(
            env,
            "CXF_PROFILE_EXTREMADURA_KEYSTORE_PASSWORD",
            "pass"
        ));
        assert!(has_pair(
            env,
            "CXF_PROFILE_EXTREMADURA_SIG_USERNAME",
            "cert"
        ));
        assert!(has_pair(
            env,
            "CXF_PROFILE_EXTREMADURA_SIG_PASSWORD",
            "sig_pass"
        ));
        assert!(has_pair(
            env,
            "CXF_PROFILE_EXTREMADURA_SECURITY_ACTIONS_OUT",
            "Timestamp Signature"
        ));
    }

    #[test]
    fn cxf_profiles_single_profile_no_security() {
        let profiles = vec![bare_profile(
            "test",
            "service.wsdl",
            "{http://example.com}Service",
            "{http://example.com}Port",
        )];

        let cfg =
            BridgeProcessConfig::cxf_profiles(PathBuf::from("/tmp/cxf-bridge"), &profiles, 15_000);

        assert_eq!(cfg.spec.name, "cxf-bridge");

        // CXF_PROFILES + 3 required vars (WSDL_PATH, SERVICE_NAME, PORT_NAME),
        // in exactly this order.
        let expected = [
            ("CXF_PROFILES", "test"),
            ("CXF_PROFILE_TEST_WSDL_PATH", "service.wsdl"),
            ("CXF_PROFILE_TEST_SERVICE_NAME", "{http://example.com}Service"),
            ("CXF_PROFILE_TEST_PORT_NAME", "{http://example.com}Port"),
        ];
        assert_eq!(cfg.env_vars.len(), 4);
        for ((actual_key, actual_value), (key, value)) in cfg.env_vars.iter().zip(expected) {
            assert_eq!(actual_key, key);
            assert_eq!(actual_value, value);
        }
    }

    #[test]
    fn profile_env_vars_to_env_vars_includes_all_fields() {
        let vars = CxfProfileEnvVars {
            address: Some("http://host:8080".to_string()),
            keystore_path: Some("/ks.jks".to_string()),
            keystore_password: Some("ks_pass".to_string()),
            truststore_path: Some("/ts.jks".to_string()),
            truststore_password: Some("ts_pass".to_string()),
            sig_username: Some("user".to_string()),
            sig_password: Some("sig_pass".to_string()),
            security_actions_out: Some("Timestamp Signature".to_string()),
            security_actions_in: Some("Timestamp".to_string()),
            ..bare_profile("full", "/wsdl", "Svc", "Port")
        };

        let env = vars.to_env_vars();
        // 3 required + 1 address + 8 security = 12
        assert_eq!(env.len(), 12);

        let expected_keys = [
            "CXF_PROFILE_FULL_WSDL_PATH",
            "CXF_PROFILE_FULL_SERVICE_NAME",
            "CXF_PROFILE_FULL_PORT_NAME",
            "CXF_PROFILE_FULL_ADDRESS",
            "CXF_PROFILE_FULL_KEYSTORE_PATH",
            "CXF_PROFILE_FULL_KEYSTORE_PASSWORD",
            "CXF_PROFILE_FULL_TRUSTSTORE_PATH",
            "CXF_PROFILE_FULL_TRUSTSTORE_PASSWORD",
            "CXF_PROFILE_FULL_SIG_USERNAME",
            "CXF_PROFILE_FULL_SIG_PASSWORD",
            "CXF_PROFILE_FULL_SECURITY_ACTIONS_OUT",
            "CXF_PROFILE_FULL_SECURITY_ACTIONS_IN",
        ];
        for key in expected_keys {
            assert!(has_key(&env, key));
        }
    }

    #[test]
    fn test_start_timeout_zero_rejected() {
        let config = BridgeProcessConfig::jms(
            PathBuf::from("/usr/bin/echo"),
            "tcp://localhost:61616".to_string(),
            BrokerType::ActiveMq,
            None,
            None,
            0,
        );
        assert!(config.validate().is_err());
    }
}